I am new to AWS Glue and I am facing performance issues with the following code:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, expr, map_from_entries, split

# Allow duplicate map keys; keep the last occurrence of each key
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

# Define S3 path with wildcard to match all .ind files
s3_ind_path = f"s3a://{source_bucket}/{source_prefix}/*.ind"

# Read all .ind files as whole text
whole_df = spark.sparkContext.wholeTextFiles(s3_ind_path).toDF(["path", "content"])

# Step 1: Split the raw content into key-value string array
whole_df = whole_df.withColumn("kv_pairs", split(col("content"), "\\|"))

# Step 2: Convert array of key:value strings into a map
whole_df = whole_df.withColumn(
    "fields_map",
    map_from_entries(
        expr("transform(kv_pairs, x -> struct(split(x, ':', 2)[0] as key, split(x, ':', 2)[1] as value))")
    )
)
# Step 3: Extract multiple fields from the map
fields_to_extract = [
    "SSN_TIN", "TIDTYPE", "FNAME", "MNAME", "LNAME", "ENTNAME", "BRK_ACCT",
    "DIR_ACCT", "NONBRKAC", "SPONSOR", "REP_ID", "RR2", "REG_TY", "LOB",
    "PROC_DTE", "DT_REC", "SCANDTS", "XTRACID", "NASU_ID", "DC_SRCRF",
    "CHK_AMT", "CHK_NUML", "DEPOSDT", "RCVDDATE", "STK_CNUM", "STK_SHR", "DOC-TY-ID"
]
for field in fields_to_extract:
    whole_df = whole_df.withColumn(field, col("fields_map").getItem(field))
# Optional: Drop intermediate columns if not needed
whole_df = whole_df.drop("kv_pairs", "fields_map", "content")


client_df = whole_df.select("SSN_TIN", "TIDTYPE", "FNAME", "LNAME", "ENTNAME")


client_df.cache()
print(f"Total rows in client_df: {client_df.count()}")


print("converting clients to dynamic frame")
client_dynamic_frame = DynamicFrame.fromDF(client_df, glueContext, "dynamic_frame")
print("Inserting clients")

glueContext.write_dynamic_frame.from_options(
    frame=client_dynamic_frame,
    connection_type="JDBC",
    connection_options={
        "connectionName": connection_name,
        "database": tgt_database,
        "dbtable": "staging.bpm_migration_client1",
        "useConnectionProperties": "true",
    },
)

print("client insert complete")

I am reading about 100,000 files from an S3 bucket and writing their contents to a PostgreSQL database. This is taking more than an hour to complete. Is there any way I can make it faster? Any help will be appreciated.

This is sample data from one of the files:

~FIMSID:2461696948|DOC_TYPE_ID:000000869|DOC_TYPE_NAME:Annuity Applic Enroll Forms|REP_ID:TEST|ROR:111|SCANDTS:12/4/2018|SPONSOR:H|SSN_TIN:123456789|PROC_DTE:2018-12-05|SCANDATE:2018-12-05|DOCSIZE:+000048020|NUMPAGES:+000000001|SRC_SYS:+000000213|BPMWRK:ANNUITIES - FL|DC_SRCRF:234909631_FL_198585_Allianz_Annuity_Application_Details_Allianz_Annuity_Application_Detail.pdf|DIR_ACCT:3333333333|DOC_CLS:DIRECT ANNUITY|DOC_ORG:ELECTRONIC|DT_REC:12/4/2018 8:12:10 PM|FNAME:FTST|XXXX:12345|KKKKK:123456332|LNAME:LTST|LOB:ANNUITY|MAILTYPE:ELECTRONIC|MIMETYPE:application/pdf|REPFNAME:TSTR|REPLNAME:TSTL|REPRGN:REGION 2|REPSTAT:REGULAR|SPCLHAND:ARCHIVEONLY|SRCSYSHD:FIRELIGHT|TIDTYPE:SSN||||IMAGEFILE:1234562.pdf|ANNOTS||~
Comments:

  • This question is far too broad, more so because you provided no information on the nature and size of the data. It's possible that an hour is to be expected due to size, or that massive optimisation is possible due to very sparse data, for example. Is there a reason you're using wholeTextFiles and not a more efficient way to read? You select fields one at a time, and don't seem to be using all of them: restrict what you're reading and do it in one shot instead of a loop (see the first sketch below). Don't use caching if you're only writing through; it just adds the overhead of a cache. Commented Aug 24 at 23:20
  • @Grismar: Updated the question with sample data. This is part of my code; the remaining fields will be referenced later in the code. Thanks! Commented Aug 25 at 2:25
  • 1. Just use Polars or DuckDB; why are you using Spark for such a simple task? 2. The problem is your PG DB, not Spark. Did you run this without the write-to-DB part? (See the second and third sketches below.) Commented Aug 27 at 10:51
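
Following up on the first comment, here is a minimal sketch of the "read and extract in one shot" idea. It assumes each .ind file is a single line, as in the sample, and uses Spark's built-in str_to_map in place of the transform/map_from_entries combination; the shortened field list is just for illustration.

from pyspark.sql.functions import col, expr

# One row per line instead of one (path, content) pair per file;
# wholeTextFiles is a known bottleneck with very many small files.
raw_df = spark.read.text(f"s3a://{source_bucket}/{source_prefix}/*.ind")

fields_to_extract = ["SSN_TIN", "TIDTYPE", "FNAME", "LNAME", "ENTNAME"]

# str_to_map splits on '|' and then on the first ':' of each pair in a
# single pass; the record's '~' wrapper is stripped first. The delimiters
# are treated as regexes, hence the escaped pipe. No cache is taken,
# since the frame is only written through once.
client_df = raw_df.select(
    expr(r"str_to_map(regexp_replace(value, '~', ''), '\\|', ':')").alias("m")
).select(*[col("m").getItem(f).alias(f) for f in fields_to_extract])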
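
The last comment asks whether the database, rather than Spark, is the slow part. Below is a sketch of how to check, followed by Spark's native JDBC writer, which exposes batching options that write_dynamic_frame.from_options does not surface; jdbc_url, db_user and db_password are placeholders for whatever the Glue connection resolves to.

# Probe: write the same frame to Parquet first. If this finishes quickly,
# the time is going into the JDBC write, not the S3 read.
client_df.write.mode("overwrite").parquet(f"s3a://{source_bucket}/tmp/client_probe/")

# Native JDBC write with explicit batching; each partition opens one
# PostgreSQL connection, so keep the partition count modest.
(
    client_df.repartition(16)
    .write.format("jdbc")
    .option("url", jdbc_url)  # e.g. jdbc:postgresql://host:5432/dbname
    .option("dbtable", "staging.bpm_migration_client1")
    .option("user", db_user)
    .option("password", db_password)
    .option("batchsize", 10000)  # default is 1000 rows per round trip
    .mode("append")
    .save()
)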
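
And the Polars route from the same comment, as a sketch only: it assumes the objects are listed with boto3 and fetched concurrently (a serial loop over 100,000 S3 objects would be slow on its own), and that a SQLAlchemy or ADBC driver is installed for write_database; pg_uri is a placeholder connection string.

from concurrent.futures import ThreadPoolExecutor

import boto3
import polars as pl

FIELDS = ["SSN_TIN", "TIDTYPE", "FNAME", "LNAME", "ENTNAME"]  # shortened
s3 = boto3.client("s3")  # low-level clients are thread-safe

def parse_object(key: str) -> dict:
    # One record per file: strip the '~' wrapper, split on '|',
    # then split each pair on the first ':' only.
    body = s3.get_object(Bucket=source_bucket, Key=key)["Body"].read().decode()
    pairs = (kv.split(":", 1) for kv in body.strip("~\n").split("|"))
    fields = {kv[0]: kv[1] for kv in pairs if len(kv) == 2}
    return {f: fields.get(f) for f in FIELDS}

keys = [
    obj["Key"]
    for page in s3.get_paginator("list_objects_v2").paginate(
        Bucket=source_bucket, Prefix=source_prefix
    )
    for obj in page.get("Contents", [])
    if obj["Key"].endswith(".ind")
]

with ThreadPoolExecutor(max_workers=32) as pool:
    rows = list(pool.map(parse_object, keys))

df = pl.DataFrame(rows)
df.write_database("staging.bpm_migration_client1", pg_uri, if_table_exists="append")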
