I am new to AWS Glue and I am facing performance issues with the following code:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, split, expr, map_from_entries

spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
# Define S3 path with wildcard to match all .ind files
s3_ind_path = f"s3a://{source_bucket}/{source_prefix}/*.ind"
# Read all .ind files as whole text
whole_df = spark.sparkContext.wholeTextFiles(s3_ind_path).toDF(["path", "content"])
# Step 1: Split the raw content into key-value string array
whole_df = whole_df.withColumn("kv_pairs", split(col("content"), "\\|"))
# Step 2: Convert array of key:value strings into a map
whole_df = whole_df.withColumn(
    "fields_map",
    map_from_entries(
        expr("transform(kv_pairs, x -> struct(split(x, ':', 2)[0] as key, split(x, ':', 2)[1] as value))")
    )
)
# Step 3: Extract multiple fields from the map
fields_to_extract = [
    "SSN_TIN", "TIDTYPE", "FNAME", "MNAME", "LNAME", "ENTNAME", "BRK_ACCT",
    "DIR_ACCT", "NONBRKAC", "SPONSOR", "REP_ID", "RR2", "REG_TY", "LOB",
    "PROC_DTE", "DT_REC", "SCANDTS", "XTRACID", "NASU_ID", "DC_SRCRF",
    "CHK_AMT", "CHK_NUML", "DEPOSDT", "RCVDDATE", "STK_CNUM", "STK_SHR", "DOC-TY-ID"
]
for field in fields_to_extract:
    whole_df = whole_df.withColumn(field, col("fields_map").getItem(field))
# Optional: Drop intermediate columns if not needed
whole_df = whole_df.drop("kv_pairs", "fields_map", "content")
client_df = whole_df.select("SSN_TIN", "TIDTYPE", "FNAME", "LNAME", "ENTNAME")
client_df.cache()
print(f"Total rows in client_df: {client_df.count()}")
print("converting clients to dynamic frame")
client_dynamic_frame = DynamicFrame.fromDF(client_df, glueContext, "dynamic_frame")
print("Inserting clients")
glueContext.write_dynamic_frame.from_options(
    frame=client_dynamic_frame,
    connection_type="JDBC",
    connection_options={
        "connectionName": connection_name,
        "database": tgt_database,
        "dbtable": "staging.bpm_migration_client1",
        "useConnectionProperties": "true"
    }
)
print("client insert complete")
I am reading 100,000 files from an S3 bucket and writing their contents to a PostgreSQL database, and this is taking more than an hour to complete. Is there any way I can make it faster? Any help will be appreciated.
This is a sample of the data in each file:
~FIMSID:2461696948|DOC_TYPE_ID:000000869|DOC_TYPE_NAME:Annuity Applic Enroll Forms|REP_ID:TEST|ROR:111|SCANDTS:12/4/2018|SPONSOR:H|SSN_TIN:123456789|PROC_DTE:2018-12-05|SCANDATE:2018-12-05|DOCSIZE:+000048020|NUMPAGES:+000000001|SRC_SYS:+000000213|BPMWRK:ANNUITIES - FL|DC_SRCRF:234909631_FL_198585_Allianz_Annuity_Application_Details_Allianz_Annuity_Application_Detail.pdf|DIR_ACCT:3333333333|DOC_CLS:DIRECT ANNUITY|DOC_ORG:ELECTRONIC|DT_REC:12/4/2018 8:12:10 PM|FNAME:FTST|XXXX:12345|KKKKK:123456332|LNAME:LTST|LOB:ANNUITY|MAILTYPE:ELECTRONIC|MIMETYPE:application/pdf|REPFNAME:TSTR|REPLNAME:TSTL|REPRGN:REGION 2|REPSTAT:REGULAR|SPCLHAND:ARCHIVEONLY|SRCSYSHD:FIRELIGHT|TIDTYPE:SSN||||IMAGEFILE:1234562.pdf|ANNOTS||~
Why wholeTextFiles and not a more efficient way to read? You select fields one at a time, and you don't seem to be using all of them - restrict what you're reading and do it in one shot instead of a loop. And don't cache if you're only writing the data through - it just adds the overhead of building the cache. A sketch of what that could look like is below.
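A minimal sketch, assuming each .ind file holds a single delimited line (as in your sample) so spark.read.text can stand in for wholeTextFiles; source_bucket, source_prefix, connection_name, tgt_database and glueContext are the same variables as in your code:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, split, expr, map_from_entries

spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

# Only the fields that actually end up in the Postgres table
fields_to_extract = ["SSN_TIN", "TIDTYPE", "FNAME", "LNAME", "ENTNAME"]

# spark.read.text reads the files record by record instead of materialising
# every file as one (path, content) pair the way wholeTextFiles does
raw_df = spark.read.text(f"s3a://{source_bucket}/{source_prefix}/*.ind")

# Parse the line into a key/value map once, then pull all needed fields
# in a single select instead of a withColumn loop
client_df = (
    raw_df
    .withColumn("kv_pairs", split(col("value"), "\\|"))
    .withColumn(
        "fields_map",
        map_from_entries(
            expr("transform(kv_pairs, x -> struct(split(x, ':', 2)[0] as key, split(x, ':', 2)[1] as value))")
        )
    )
    .select([col("fields_map").getItem(f).alias(f) for f in fields_to_extract])
)

# No cache(): the result is written out exactly once
client_dynamic_frame = DynamicFrame.fromDF(client_df, glueContext, "client_dynamic_frame")
glueContext.write_dynamic_frame.from_options(
    frame=client_dynamic_frame,
    connection_type="JDBC",
    connection_options={
        "connectionName": connection_name,
        "database": tgt_database,
        "dbtable": "staging.bpm_migration_client1",
        "useConnectionProperties": "true"
    }
)

The list comprehension in select replaces the 27-iteration withColumn loop, so the plan contains a single projection with just the columns you write.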