I am using AWS Lambda with Python and S3 to launch SageMaker training jobs.

# lambda_bootstrap_train.py
import boto3
import time
import json
import os

sm = boto3.client('sagemaker', region_name='us-east-2')
s3 = boto3.client('s3', region_name='us-east-2')
ssm = boto3.client('ssm', region_name='us-east-2')

FUSED_BUCKET = '*********'
MODEL_BUCKET = '*********'
CODE_BUCKET = '*********'
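# Cached at module level so a warm Lambda container only uploads the script once.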
_training_script_uri = None

def get_sagemaker_role():
    role = os.environ.get('SAGEMAKER_ROLE_ARN')
    if role and role != 'arn:aws:iam::...:role/SageMakerRole':
        return role
    try:
        return ssm.get_parameter(Name='/chainify/sagemaker/role-arn')['Parameter']['Value']
    except ssm.exceptions.ParameterNotFound:
        raise ValueError("Set SAGEMAKER_ROLE_ARN or SSM /chainify/sagemaker/role-arn")

def upload_training_script():
    global _training_script_uri
    if _training_script_uri:
        return _training_script_uri

    script_path = os.path.join(os.path.dirname(__file__), 'lstm_train.py')
    if not os.path.exists(script_path):
        raise FileNotFoundError("lstm_train.py not in package")

    prefix = f"training_code/{int(time.time())}/"
    key = f"{prefix}lstm_train.py"

    with open(script_path, 'rb') as f:
        s3.put_object(Bucket=CODE_BUCKET, Key=key, Body=f)

    _training_script_uri = f"s3://{CODE_BUCKET}/{prefix}"
    print(f"Uploaded training script to {_training_script_uri}")
    return _training_script_uri

def lambda_handler(event, context):
    try:
        ROLE = get_sagemaker_role()
    except ValueError as e:
        return {'status': 'error', 'error': str(e)}

    symbols = event.get('symbols', [])
    if not symbols:
        objs = s3.list_objects_v2(Bucket=FUSED_BUCKET, Prefix='fused/')
        # Dedupe: several fused files can share a symbol prefix
        symbols = sorted({obj['Key'].split('/')[1].split('_')[0] for obj in objs.get('Contents', [])})

    print(f"Training LSTM for {len(symbols)} symbols: {symbols}")

    code_s3_uri = upload_training_script()

    lstm_jobs = []
    for symbol in symbols:
        job_name = f"lstm-{symbol.lower()}-{int(time.time())}"
        try:
            config = {
                'TrainingJobName': job_name,
                'AlgorithmSpecification': {
                    'TrainingImage': '763104351884.dkr.ecr.us-east-2.amazonaws.com/pytorch-training:2.0-cpu-py310',
                    'TrainingInputMode': 'File'
                },
                'RoleArn': ROLE,
                'InputDataConfig': [{
                    'ChannelName': 'training',
                    'DataSource': {
                        'S3DataSource': {
                            'S3Uri': f"s3://{FUSED_BUCKET}/fused/{symbol}_fused.parquet",
                            'S3DataType': 'S3Prefix'
                        }
                    },
                    'ContentType': 'application/x-parquet'
                }],
                'OutputDataConfig': {'S3OutputPath': f"s3://{MODEL_BUCKET}/{symbol}/lstm/"},
                'ResourceConfig': {'InstanceType': 'ml.m5.xlarge', 'InstanceCount': 1, 'VolumeSizeInGB': 50},
                'StoppingCondition': {'MaxRuntimeInSeconds': 7200},
                'HyperParameters': {
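                    # Reserved script-mode keys: the SageMaker training toolkit downloads
                    # sagemaker_submit_directory from S3 and runs sagemaker_program from it.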
                    'sagemaker_program': 'lstm_train.py',
                    'sagemaker_submit_directory': code_s3_uri,
                    'sagemaker_container_log_level': '20',
                    'epochs': '100',
                    'seq_len': '60'
                },
                'Tags': [{'Key': 'project', 'Value': 'chainify-v2'}]
            }

            sm.create_training_job(**config)
            lstm_jobs.append(job_name)
            print(f"Started: {job_name}")
        except Exception as e:
            print(f"Failed {symbol}: {e}")

    return {
        'status': 'training_started',
        'lstm_jobs': lstm_jobs
    }

The second script, which the Lambda uploads to S3 as the training entry point:

# lstm_train.py
import argparse
import os
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

class TSDataset(Dataset):
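    # Sliding-window dataset: item i is seq_len consecutive feature rows plus
    # the (label_day, label_week) targets of the row that follows the window.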
    def __init__(self, df, seq_len=60):
        self.seq_len = seq_len
        features = [c for c in df.columns if c not in ['label_day', 'label_week']]
        self.X = df[features].values
        self.y = df[['label_day', 'label_week']].values

    def __len__(self): 
        return len(self.X) - self.seq_len

    def __getitem__(self, i):
        return torch.FloatTensor(self.X[i:i+self.seq_len]), torch.FloatTensor(self.y[i+self.seq_len])

class LSTM(nn.Module):
    def __init__(self, input_size, hidden=128, layers=2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden, layers, batch_first=True)
        self.fc = nn.Linear(hidden, 2)
        self.sig = nn.Sigmoid()

    def forward(self, x):
        out, _ = self.lstm(x)
        return self.sig(self.fc(out[:, -1]))

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=50)
    parser.add_argument('--seq_len', type=int, default=60)
    parser.add_argument('--model_dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--training', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    args = parser.parse_args()

    files = [os.path.join(args.training, f) for f in os.listdir(args.training) if f.endswith('.parquet')]
    df = pd.concat([pd.read_parquet(f) for f in files])

    dataset = TSDataset(df, args.seq_len)
    loader = DataLoader(dataset, batch_size=64, shuffle=True)

    model = LSTM(dataset.X.shape[1])
    opt = torch.optim.Adam(model.parameters())
    crit = nn.BCELoss()

    for epoch in range(args.epochs):
        for x, y in loader:
            opt.zero_grad()
            loss = crit(model(x), y)
            loss.backward()
            opt.step()

    torch.save(model.state_dict(), f"{args.model_dir}/model.pth")

if __name__ == "__main__":
    main()

My training jobs fail with this message:

Failure reason AlgorithmError: Framework Error: Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/sagemaker_training/trainer.py", line 88, in train      
    entrypoint() 
  File "/opt/conda/lib/python3.10/site-packages/sagemaker_pytorch_container/training.py", line 176, in main 
    train(environment.Environment()) 
  File "/opt/conda/lib/python3.10/site-packages/sagemaker_pytorch_container/training.py", line 106, in train
    entry_point.run(uri=training_environment.module_dir,
  File "/opt/conda/lib/python3.10/site-packages/sagemaker_training/entry_point.py", line 96, in run 
    files.download_and_extract(uri=uri, path=environment.code_dir)
  File "/opt/conda/lib/python3.10/site-packages/sagemaker_training/files.py", line 138, in download_and_extract 
    s3_download(uri, dst) 
  File "/opt/conda/lib/python3.10/site-packages/sagemaker_training/files.py", line 174, in s3_download 
    s3.Bucket(bucket).download_file(key, dst)
  File "/opt/conda/lib/python3.10/site-packages/boto3/s3/inject.py", line 2

1 Answer

This was a simple fix: the script on S3 needed to be packaged as a tar.gz archive, as SageMaker requires. The sagemaker_submit_directory hyperparameter must point at that archive itself, not at a bare .py file, because the training toolkit calls files.download_and_extract on that URI before running the entry point, which is exactly where the traceback above dies.
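
Here is a minimal sketch of the corrected upload_training_script, reusing the s3 client, CODE_BUCKET, and _training_script_uri cache from the question. The in-memory tarfile build and the sourcedir.tar.gz name are my choices, not anything SageMaker mandates beyond the URI ending in a .tar.gz object:

import io
import tarfile

def upload_training_script():
    global _training_script_uri
    if _training_script_uri:
        return _training_script_uri

    script_path = os.path.join(os.path.dirname(__file__), 'lstm_train.py')
    if not os.path.exists(script_path):
        raise FileNotFoundError("lstm_train.py not in package")

    # Build the archive in memory; Lambda only guarantees writable /tmp.
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w:gz') as tar:
        tar.add(script_path, arcname='lstm_train.py')

    key = f"training_code/{int(time.time())}/sourcedir.tar.gz"
    s3.put_object(Bucket=CODE_BUCKET, Key=key, Body=buf.getvalue())

    # Point sagemaker_submit_directory at the archive itself, not the prefix.
    _training_script_uri = f"s3://{CODE_BUCKET}/{key}"
    print(f"Uploaded training bundle to {_training_script_uri}")
    return _training_script_uri

The rest of the Lambda is unchanged: sagemaker_program still names lstm_train.py, which now sits at the top level of the archive.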
