I am using AWS Lambda with Python, S3, and SageMaker. The first script runs in Lambda and launches one SageMaker training job per symbol; the second script is the training entry point that the Lambda uploads to S3.
# lambda_bootstrap_train.py
import boto3
import time
import json
import os
sm = boto3.client('sagemaker', region_name='us-east-2')
s3 = boto3.client('s3', region_name='us-east-2')
ssm = boto3.client('ssm', region_name='us-east-2')
FUSED_BUCKET = '*********'
MODEL_BUCKET = '*********'
CODE_BUCKET = '*********'
_training_script_uri = None
def get_sagemaker_role():
    role = os.environ.get('SAGEMAKER_ROLE_ARN')
    # Treat the unedited template placeholder the same as an unset variable.
    if role and role != 'arn:aws:iam::...:role/SageMakerRole':
        return role
try:
return ssm.get_parameter(Name='/chainify/sagemaker/role-arn')['Parameter']['Value']
except ssm.exceptions.ParameterNotFound:
raise ValueError("Set SAGEMAKER_ROLE_ARN or SSM /chainify/sagemaker/role-arn")
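# One-time setup (a sketch, not part of the handler flow): seed the SSM
# parameter that the fallback above reads. The ARN below is a made-up example.
#
#   ssm.put_parameter(
#       Name='/chainify/sagemaker/role-arn',
#       Value='arn:aws:iam::123456789012:role/ExampleSageMakerRole',
#       Type='String',
#   )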
def upload_training_script():
global _training_script_uri
if _training_script_uri:
return _training_script_uri
script_path = os.path.join(os.path.dirname(__file__), 'lstm_train.py')
if not os.path.exists(script_path):
raise FileNotFoundError("lstm_train.py not in package")
prefix = f"training_code/{int(time.time())}/"
key = f"{prefix}lstm_train.py"
with open(script_path, 'rb') as f:
s3.put_object(Bucket=CODE_BUCKET, Key=key, Body=f)
_training_script_uri = f"s3://{CODE_BUCKET}/{prefix}"
print(f"Uploaded training script to {_training_script_uri}")
return _training_script_uri
def lambda_handler(event, context):
try:
ROLE = get_sagemaker_role()
except ValueError as e:
return {'status': 'error', 'error': str(e)}
symbols = event.get('symbols', [])
if not symbols:
objs = s3.list_objects_v2(Bucket=FUSED_BUCKET, Prefix='fused/')
        # Derive symbols from keys shaped like 'fused/AAPL_fused.parquet'.
        symbols = [obj['Key'].split('/')[1].split('_')[0] for obj in objs.get('Contents', [])]
print(f"Training LSTM for {len(symbols)} symbols: {symbols}")
code_s3_uri = upload_training_script()
lstm_jobs = []
for symbol in symbols:
job_name = f"lstm-{symbol.lower()}-{int(time.time())}"
try:
config = {
'TrainingJobName': job_name,
'AlgorithmSpecification': {
'TrainingImage': '763104351884.dkr.ecr.us-east-2.amazonaws.com/pytorch-training:2.0-cpu-py310',
'TrainingInputMode': 'File'
},
'RoleArn': ROLE,
'InputDataConfig': [{
'ChannelName': 'training',
'DataSource': {
'S3DataSource': {
'S3Uri': f"s3://{FUSED_BUCKET}/fused/{symbol}_fused.parquet",
'S3DataType': 'S3Prefix'
}
},
'ContentType': 'application/x-parquet'
}],
'OutputDataConfig': {'S3OutputPath': f"s3://{MODEL_BUCKET}/{symbol}/lstm/"},
'ResourceConfig': {'InstanceType': 'ml.m5.xlarge', 'InstanceCount': 1, 'VolumeSizeInGB': 50},
'StoppingCondition': {'MaxRuntimeInSeconds': 7200},
'HyperParameters': {
'sagemaker_program': 'lstm_train.py',
'sagemaker_submit_directory': code_s3_uri,
'sagemaker_container_log_level': '20',
'epochs': '100',
'seq_len': '60'
},
'Tags': [{'Key': 'project', 'Value': 'chainify-v2'}]
}
sm.create_training_job(**config)
lstm_jobs.append(job_name)
print(f"Started: {job_name}")
except Exception as e:
print(f"Failed {symbol}: {e}")
return {
'status': 'training_started',
'lstm_jobs': lstm_jobs
}
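For a quick check I invoke the function with an event like {"symbols": ["AAPL"]} and poll the returned job names; a minimal sketch (the job name below is made up):
# check_jobs.py
import boto3
sm = boto3.client('sagemaker', region_name='us-east-2')
for name in ['lstm-aapl-1700000000']:  # hypothetical name from the handler response
    desc = sm.describe_training_job(TrainingJobName=name)
    print(name, desc['TrainingJobStatus'], desc.get('FailureReason', ''))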
The second script, which the Lambda uploads to S3 as the training entry point:
# lstm_train.py
import argparse
import os
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
class TSDataset(Dataset):
def __init__(self, df, seq_len=60):
self.seq_len = seq_len
features = [c for c in df.columns if c not in ['label_day', 'label_week']]
self.X = df[features].values
self.y = df[['label_day', 'label_week']].values
def __len__(self):
return len(self.X) - self.seq_len
def __getitem__(self, i):
return torch.FloatTensor(self.X[i:i+self.seq_len]), torch.FloatTensor(self.y[i+self.seq_len])
class LSTM(nn.Module):
def __init__(self, input_size, hidden=128, layers=2):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden, layers, batch_first=True)
self.fc = nn.Linear(hidden, 2)
self.sig = nn.Sigmoid()
def forward(self, x):
out, _ = self.lstm(x)
return self.sig(self.fc(out[:, -1]))
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=50)
parser.add_argument('--seq_len', type=int, default=60)
parser.add_argument('--model_dir', type=str, default=os.environ['SM_MODEL_DIR'])
parser.add_argument('--training', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
args = parser.parse_args()
files = [os.path.join(args.training, f) for f in os.listdir(args.training) if f.endswith('.parquet')]
df = pd.concat([pd.read_parquet(f) for f in files])
dataset = TSDataset(df, args.seq_len)
loader = DataLoader(dataset, batch_size=64, shuffle=True)
model = LSTM(dataset.X.shape[1])
opt = torch.optim.Adam(model.parameters())
crit = nn.BCELoss()
    for epoch in range(args.epochs):
        for x, y in loader:
            opt.zero_grad()
            loss = crit(model(x), y)
            loss.backward()
            opt.step()
        # Log once per epoch so progress shows up in CloudWatch.
        print(f"epoch {epoch + 1}/{args.epochs} loss {loss.item():.4f}")
torch.save(model.state_dict(), f"{args.model_dir}/model.pth")
if __name__ == "__main__":
main()
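Locally both classes pass a quick smoke test on synthetic data (a sketch; the feature and label column names just follow the script's conventions):
# smoke_test.py
import numpy as np
import pandas as pd
from lstm_train import LSTM, TSDataset

df = pd.DataFrame(np.random.rand(200, 7).astype('float32'),
                  columns=[f'f{i}' for i in range(5)] + ['label_day', 'label_week'])
ds = TSDataset(df, seq_len=60)
x, y = ds[0]
assert x.shape == (60, 5) and y.shape == (2,)   # one window, one label pair
model = LSTM(input_size=5)
out = model(x.unsqueeze(0))   # batch of one window
assert out.shape == (1, 2)    # two sigmoid scores in [0, 1]
print('smoke test passed')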
My training jobs all fail with this message:
Failure reason AlgorithmError: Framework Error: Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/sagemaker_training/trainer.py", line 88, in train
entrypoint()
File "/opt/conda/lib/python3.10/site-packages/sagemaker_pytorch_container/training.py", line 176, in main
train(environment.Environment())
File "/opt/conda/lib/python3.10/site-packages/sagemaker_pytorch_container/training.py", line 106, in train
entry_point.run(uri=training_environment.module_dir,
File "/opt/conda/lib/python3.10/site-packages/sagemaker_training/entry_point.py", line 96, in run
files.download_and_extract(uri=uri, path=environment.code_dir)
File "/opt/conda/lib/python3.10/site-packages/sagemaker_training/files.py", line 138, in download_and_extract
s3_download(uri, dst)
File "/opt/conda/lib/python3.10/site-packages/sagemaker_training/files.py", line 174, in s3_download
s3.Bucket(bucket).download_file(key, dst)
File "/opt/conda/lib/python3.10/site-packages/boto3/s3/inject.py", line 2