HuggingFaceProcessor with ProcessingStep results in import errors (similar to issues/2656)
Describe the bug

Using the HuggingFaceProcessor with ProcessingStep results in import errors similar to: https://github.com/aws/sagemaker-python-sdk/issues/2656
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 2: SAGEMAKER_INPUTS_DIR: command not found
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 3: SAGEMAKER_OUTPUTS_DIR: command not found
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 5: import: command not found
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 6: import: command not found
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 7: import: command not found
2024-07-25T15:47:57.616Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 8: import: command not found
...
To reproduce

Define the HuggingFace processor:

huggingface_processor = HuggingFaceProcessor(
    role=role,
    transformers_version='4.4',
    pytorch_version='1.6.0',
    instance_count=1,
    instance_type=f'{SAGEMAKER_GPU_INSTANCE_TYPE}',
    command=["python3"]
)

step_evaluate = ProcessingStep(
    name="...",
    processor=huggingface_processor,
    inputs=[ ... ],
    outputs=[ ... ],
    code="src/evaluate.py"
)

pipeline = Pipeline(
    name="...",
    steps=[step_evaluate]
)

Execute the pipeline:

pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()
Expected behavior

The error should not happen, and evaluate.py should be invoked by the container.
Screenshots or logs

Attached.

System information

A description of your system. Please provide:

- SageMaker Python SDK version:
  $ pip show sagemaker
  Name: sagemaker
  Version: 2.226.1
- Framework name (eg. PyTorch) or algorithm (eg. KMeans):
  $ pip show transformers
  Name: transformers
  Version: 4.42.4
  $ pip show torch
  Name: torch
  Version: 2.1.2

log-events-viewer-result.csv

Additional context

Related? https://github.com/aws/sagemaker-python-sdk/issues/2656
Hi @solanki-ravi, thanks for reaching out!
Given these job logs:
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 2: SAGEMAKER_INPUTS_DIR: command not found
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 3: SAGEMAKER_OUTPUTS_DIR: command not found
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 5: import: command not found
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 6: import: command not found
2024-07-25T15:47:57.615Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 7: import: command not found
2024-07-25T15:47:57.616Z | /opt/ml/processing/input/entrypoint/evaluate.py: line 8: import: command not found
The "command not found" errors are thrown from your src/evaluate.py script. As this is a customer script, we are not able to reproduce the issue without it.
Could you check your script to see what commands are used on lines 2-8 of src/evaluate.py?
Thanks @qidewenwhen. evaluate.py and the pipeline invocation code are inline here.
--- evaluate.py ---

#!/usr/bin/env python3

SAGEMAKER_INPUTS_DIR = '/opt/ml/processing/input'
SAGEMAKER_OUTPUTS_DIR = '/opt/ml/processing/output'

import subprocess

# Install the required libraries before the remaining imports
subprocess.check_call(["pip", "install", "-r", "/opt/ml/processing/input/requirements/requirements.txt"])

import argparse
import os
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertForSequenceClassification
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import json
import logging
import tarfile
import numpy as np

logging.basicConfig(level=logging.INFO)


class CustomDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)


def compute_metrics(pred, labels):
    preds = pred.argmax(-1)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='macro')
    acc = accuracy_score(labels, preds)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_dir", type=str, default=f'{SAGEMAKER_INPUTS_DIR}/model')
    parser.add_argument("--test_file", type=str, default=f'{SAGEMAKER_INPUTS_DIR}/test/test')
    parser.add_argument("--output_dir", type=str, default=f'{SAGEMAKER_OUTPUTS_DIR}')
    parser.add_argument("--max_length", type=int, default=512)
    args = parser.parse_args()

    # Load dataset
    logging.info(f'Reading test data from: {args.test_file}.')
    test_df = pd.read_csv(args.test_file, header=None)
    logging.info(f'Successfully read test data from: {args.test_file}.')
    test_texts, test_labels = test_df.iloc[:, 1].tolist(), test_df.iloc[:, 0].tolist()
    logging.info(f'Test texts sample: {test_texts[:5]}')
    logging.info(f'Test labels sample: {test_labels[:5]}')
    logging.info(f'Number of test samples: {len(test_texts)}')

    # Extract the tar.gz model file
    model_tar_path = os.path.join(args.model_dir, "model.tar.gz")
    model_extracted_dir = os.path.join(args.model_dir, "extracted")
    logging.info(f'Extracting model.tar.gz, model_tar_path: {model_tar_path}, model_extracted_dir: {model_extracted_dir}')
    os.makedirs(model_extracted_dir, exist_ok=True)
    with tarfile.open(model_tar_path) as tar:
        tar.extractall(path=model_extracted_dir)

    # Tokenize data
    tokenizer = BertTokenizer.from_pretrained(model_extracted_dir)
    test_encodings = tokenizer(test_texts, truncation=True, padding=True, max_length=args.max_length)

    # Create dataset
    test_dataset = CustomDataset(test_encodings, test_labels)

    # Load model
    model = BertForSequenceClassification.from_pretrained(model_extracted_dir)
    model.eval()

    # Create data loader
    test_loader = DataLoader(test_dataset, batch_size=16)

    # Perform evaluation
    all_preds = []
    all_labels = []
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    with torch.no_grad():
        for batch in test_loader:
            inputs = {key: val.to(device) for key, val in batch.items() if key != 'labels'}
            labels = batch['labels'].to(device)
            outputs = model(**inputs)
            logits = outputs.logits
            all_preds.append(logits.cpu().numpy())
            all_labels.append(labels.cpu().numpy())

    all_preds = np.concatenate(all_preds, axis=0)
    all_labels = np.concatenate(all_labels, axis=0)

    # Compute metrics
    metrics = compute_metrics(all_preds, all_labels)
    logging.info(f"Evaluation metrics: {metrics}")

    # Save metrics
    os.makedirs(args.output_dir, exist_ok=True)
    with open(os.path.join(args.output_dir, "evaluation_metrics.json"), "w") as f:
        json.dump(metrics, f)
and invocation code:
from sagemaker.huggingface import HuggingFaceProcessor
from sagemaker.workflow.steps import ProcessingInput, ProcessingOutput, ProcessingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.image_uris import retrieve
from sagemaker.processing import ScriptProcessor
from sagemaker import get_execution_role

# Define the HuggingFace processor
# https://sagemaker.readthedocs.io/en/stable/frameworks/huggingface/sagemaker.huggingface.html#hugging-face-processor
huggingface_processor = HuggingFaceProcessor(
    role=role,
    transformers_version='4.4.2',
    pytorch_version='1.6.0',
    #py_version='py36',
    instance_count=1,
    instance_type=SAGEMAKER_GPU_INSTANCE_TYPE,
    code_location='src/',
    command=["python3", "evaluate.py"]
)

step_evaluate_bert = ProcessingStep(
    name="Newsgroups20-BERT-Evaluate",
    processor=huggingface_processor,
    inputs=[
        ProcessingInput(
            source='s3://<s3 bucket path ...>/model.tar.gz',
            #source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/input/model"
        ),
        ProcessingInput(
            source='s3://<s3 bucket path ...>/output/test/test',
            #source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/test"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/output",
            destination=f'{SAGEMAKER_S3_BUCKET}/output/evaluation/'
        )
    ],
    code="src/evaluate.py"
)

# Update the pipeline to include the evaluation step
pipeline = Pipeline(
    name="Newsgroups20Pipeline-Bert-Process-Train-Evaluate",
    #steps=[step_process, step_train, step_evaluate]
    steps=[step_evaluate_bert]
)
To keep troubleshooting simple, I created this notebook (attached), which results in the same error with no dependency on my code. @qidewenwhen
evaluate.py

import os

if __name__ == '__main__':
    # Print all environment variables
    for key, value in os.environ.items():
        print(f'{key}: {value}')
output logs:

2024-08-05T18:41:42.365-04:00 | /opt/ml/processing/input/entrypoint/evaluate.py: line 1: import: command not found
2024-08-05T18:41:42.365-04:00 | /opt/ml/processing/input/entrypoint/evaluate.py: line 6: syntax error near unexpected token `value'
2024-08-05T18:41:42.365-04:00 | /opt/ml/processing/input/entrypoint/evaluate.py: line 6: `    for key, value in os.environ.items():'
Hi @solanki-ravi, sorry for the long delay; I did not receive a notification.
And thanks so much for the repro notebook, which is very helpful!
Given the error, it seems evaluate.py is being interpreted as a shell script rather than a Python script; that's why it complains "import: command not found".
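For anyone who wants to confirm this locally, here is a minimal sketch (assuming any Python file named evaluate.py in the current directory): forcing bash to execute a Python file reproduces the same symptom seen in the job logs.

# Minimal local check (assumption: evaluate.py exists in the current directory):
# running a Python file through bash produces the same "import: command not found" errors.
import subprocess

result = subprocess.run(["bash", "evaluate.py"], capture_output=True, text=True)
print(result.stderr)  # e.g. "evaluate.py: line 1: import: command not found"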
I checked the pipeline definition in the notebook.
"Steps": [{
"Name": "Evaluate",
"Type": "Processing",
"Arguments": {
"AppSpecification": {
"ContainerEntrypoint": ["/bin/bash", "/opt/ml/processing/input/entrypoint/evaluate.py"]
},
...
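(For reference, a minimal sketch of how to inspect this part of the definition yourself, assuming pipeline is the Pipeline object built in the notebook; Pipeline.definition() returns the definition as a JSON string.)

# Sketch: dump the AppSpecification of the first step from the generated pipeline definition.
# Assumes `pipeline` is the sagemaker.workflow.pipeline.Pipeline object from the notebook.
import json

definition = json.loads(pipeline.definition())
print(json.dumps(definition["Steps"][0]["Arguments"]["AppSpecification"], indent=2))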
The ContainerEntrypoint starts with /bin/bash, even though you've explicitly specified python3 in command:

huggingface_processor = HuggingFaceProcessor(
    role=role,
    transformers_version='4.4.2',
    pytorch_version='1.6.0',
    #py_version='py36',
    instance_count=1,
    instance_type=SAGEMAKER_GPU_INSTANCE_TYPE,
    code_location='src/',
    command=["python3", "evaluate.py"]  # <<<<<<<<<<
)
I looked into the Processor code and found that HuggingFaceProcessor extends FrameworkProcessor, which hard-codes the entrypoint command to /bin/bash:
https://github.com/aws/sagemaker-python-sdk/blob/1e679b415301ade807653308701b7fed4eefd9ec/src/sagemaker/processing.py#L1422-L1425
In the _set_entrypoint method, the bash command is used, and it is also noted there that the user-supplied command is ignored in favor of self.framework_entrypoint_command:
https://github.com/aws/sagemaker-python-sdk/blob/1e679b415301ade807653308701b7fed4eefd9ec/src/sagemaker/processing.py#L1998-L2011
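Paraphrasing the linked code (an illustrative sketch, not the verbatim SDK source), the effect is roughly:

# Illustrative paraphrase of FrameworkProcessor._set_entrypoint (not verbatim SDK source):
# the user-supplied `command` (e.g. ["python3"]) is ignored, and the entrypoint always
# becomes ["/bin/bash", "<container path of the uploaded script>"].
class FrameworkProcessor(ScriptProcessor):
    framework_entrypoint_command = ["/bin/bash"]

    def _set_entrypoint(self, command, user_script_name):
        # `command` is ignored in favor of self.framework_entrypoint_command
        user_script_location = "/opt/ml/processing/input/entrypoint/" + user_script_name  # illustrative path
        self.entrypoint = self.framework_entrypoint_command + [user_script_location]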
Given the above, it seems the framework containers, including HuggingFace, expect bash entry scripts only.
As my team does not own these processors, we will need the corresponding team for further assistance if you hope to expand the usage to Python code. I'll relabel this issue so that it can be redirected to the right team.
For now, we can work around HuggingFaceProcessor (FrameworkProcessor) only accepting a bash entrypoint by invoking the Python script from a custom uploaded bash script. Since command=['python3'] is ignored and bash is used anyway, we can pass the custom bash script as code instead. Just remember to upload your src folder containing the entry script (in this case inference_model.py) to S3 beforehand; its S3 location is referenced below as src_s3_path:
1 - the bash script

# Write a bash script into the src folder that you upload to S3
bash_script_path = os.path.join(os.path.abspath('./src'), 'run_inference.sh')
with open(bash_script_path, 'w') as f:
    f.write("#!/bin/bash\npython3 /opt/ml/processing/input/code/src/inference_model.py")
os.chmod(bash_script_path, 0o755)  # Make executable
2 - your src folder should now look something like:

src/
    inference_model.py
    run_inference.sh
3 - upload the src/ dir to s3 and get the resulting s3 path src_s3_path
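For example, a minimal sketch of this upload step (assuming bucket holds your S3 bucket name and the chosen S3 prefix is just an example; S3Uploader.upload returns the destination S3 URI):

# Sketch of step 3: upload the local src/ folder to S3 and keep the returned URI
# for the pipeline step below. `bucket` is assumed to be defined elsewhere.
from sagemaker.s3 import S3Uploader

src_s3_path = S3Uploader.upload(local_path="src", desired_s3_uri=f"s3://{bucket}/code/src")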
4 - create the Pipeline step
# Imports assumed by this snippet (added here for completeness)
import os
from sagemaker.huggingface import HuggingFaceProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, CacheConfig
from sagemaker.workflow.functions import Join


def inference_model_step(
    bucket,
    src_s3_path,  # you must already have uploaded your src folder to s3; this is its s3 location
    model_path,
    transform_dataset_output,
    execution_name_param,
    cache_config=CacheConfig(enable_caching=False, expire_after="30d")
):
    processor = HuggingFaceProcessor(
        role=SAGEMAKER_ROLE_ARN,  # assumed to be defined elsewhere
        instance_count=1,
        instance_type="ml.g5.4xlarge",
        transformers_version="4.36.0",
        pytorch_version="2.1.0",
        py_version="py310",
        env={'PYTHONPATH': '/opt/ml/processing/input/code'},
        base_job_name="inference-model-job"
    )
    step = ProcessingStep(
        name="InferenceModel",
        processor=processor,
        cache_config=cache_config,
        code=os.path.join(src_s3_path, 'run_inference.sh'),  # Use the bash script instead of the Python script
        inputs=[
            ProcessingInput(
                source=model_path,
                destination='/opt/ml/processing/input/model',
                input_name="model"
            ),
            ProcessingInput(
                source=transform_dataset_output,
                destination='/opt/ml/processing/input/transform_datasets',
                input_name="transform_datasets"
            ),
            ProcessingInput(
                source=src_s3_path,  # this pulls your src code from the s3 location and places it on the instance
                destination='/opt/ml/processing/input/code/src',
                input_name='src_dir'
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name="responses",
                source='/opt/ml/processing/output/responses',
                destination=Join(
                    on='/',
                    values=[
                        f"s3://{bucket}",
                        'pipeline_outputs/finetuned_model_responses',
                        execution_name_param
                    ]
                )
            )
        ]
    )
    return step
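And a hypothetical usage of the helper when assembling the pipeline (the names and S3 URIs below are placeholders, not values from this issue):

# Hypothetical wiring of the workaround step into a pipeline; all names/URIs are placeholders.
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.pipeline import Pipeline

step = inference_model_step(
    bucket="my-bucket",
    src_s3_path="s3://my-bucket/code/src",                    # where src/ was uploaded in step 3
    model_path="s3://my-bucket/model/model.tar.gz",
    transform_dataset_output="s3://my-bucket/output/transform",
    execution_name_param=ExecutionVariables.PIPELINE_EXECUTION_ID,
)

pipeline = Pipeline(name="InferenceWorkaroundPipeline", steps=[step])
pipeline.upsert(role_arn=SAGEMAKER_ROLE_ARN)
execution = pipeline.start()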