I am using Spark SQL in an AWS Glue script to transform some data in S3. Here is the script logic:
Data format: CSV
Programming language: Python
1) Pull the data from S3 via the Glue Catalog into a Glue DynamicFrame
2) Extract the Spark DataFrame from the DynamicFrame using toDF()
3) Register the Spark DataFrame as a Spark SQL table using createOrReplaceTempView()
4) Use a SQL query to transform (this is where I am having issues)
5) Convert the final DataFrame back to a Glue DynamicFrame
6) Store the final DataFrame in S3 using glueContext.write_dynamic_frame.from_options()
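For context, here is a condensed sketch of that flow (database, table, and bucket names are placeholders, not my real ones):

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# 1) Pull from S3 via the Glue Catalog into a DynamicFrame
dyf = glueContext.create_dynamic_frame.from_catalog(database="my_db", table_name="sales")

# 2) Extract the Spark DataFrame
sdf_sales = dyf.toDF()

# 3) Register it as a Spark SQL table
sdf_sales.createOrReplaceTempView("sales")

# 4) Transform with SQL (this is the failing step; queries below)
result_df = spark.sql("select demand_amt from sales where cxvalue > 100000")

# 5) Convert back to a DynamicFrame
result_dyf = DynamicFrame.fromDF(result_df, glueContext, "result_dyf")

# 6) Write to S3
glueContext.write_dynamic_frame.from_options(
    frame=result_dyf,
    connection_type="s3",
    connection_options={"path": "s3://my-bucket/output/"},
    format="csv")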
Problem
When I use a comparison in SQL such as
WHERE <some_column> > <some_int>
or
(case when <some_column> > <some_int> then 1 else 0 end) as <some_new_col>
I get the following errors:
pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` >
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` >
100000)' (struct<int:int,string:string> and int).; line 1 pos 35;\n'Project
['demand_amt]\n+- 'Filter (cxvalue#4 > 100000)\n +- SubqueryAlias sales\n +-
LogicalRDD [sales_id#0, customer_name#1, customer_loc#2, demand_amt#3L,
cxvalue#4]\n"
pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` =
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` =
100000)' (struct<int:int,string:string> and int).; line 1 pos 33;\n'Project
[customer_name#1, CASE WHEN (cxvalue#4 = 100000) THEN demand_amt#3 ELSE 0 END AS
small#12, CASE WHEN cxvalue#4 IN (200000,300000,400000) THEN demand_amt#3 ELSE 0
END AS medium#13]\n+- SubqueryAlias sales\n +- LogicalRDD [sales_id#0,
customer_name#1, customer_loc#2, demand_amt#3, cxvalue#4]\n"
This tells me Spark is treating the column as both numeric and string (the struct<int:int,string:string> in the message), and that the problem is specific to Spark, not AWS. SUM() with GROUP BY works fine; only comparisons fail.
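A quick way to confirm this (a sketch, where dyf is the DynamicFrame from step 1) is to print both schemas:

dyf.printSchema()         # on the DynamicFrame, cxvalue shows up as a choice type
dyf.toDF().printSchema()  # on the Spark DataFrame, cxvalue shows up as struct<int:int,string:string>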
I have tried the following steps:
1) Tried to change the column type using the Spark method - Failed
df = df.withColumn(<column>, df[<column>].cast(DoubleType()))  # df is the Spark DataFrame
Glue does not allow changing the data type of a Spark DataFrame column this way.
2) Used Glue's resolveChoice method as explained in https://github.com/aws-samples/aws-glue-samples/blob/master/examples/resolve_choice.md (sketch after this list). The resolveChoice method worked, but the SQL then failed with the same error.
3) Used cast(<column> as <data_type>) in the SQL query - Failed
4) Spun up a Spark cluster on Google Cloud (just to rule out anything AWS-related) and used plain Spark with the same logic - Failed with the same error
5) On the same Spark cluster, with the same data set, used the same logic but enforced the schema using StructType and StructField while creating a new Spark DataFrame (sketch after this list) - Passed
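Here is a minimal sketch of attempt 2, following the resolve_choice example linked above (the column name and spec match my data; the rest is boilerplate):

# Resolve the choice column to a single type on the DynamicFrame before toDF()
dyf_resolved = dyf.resolveChoice(specs=[("cxvalue", "cast:double")])
sdf_sales = dyf_resolved.toDF()
sdf_sales.createOrReplaceTempView("sales")
# The comparison queries below still failed for me with the same error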
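And a sketch of attempt 5, the one that passed. The file path and SparkSession setup are assumptions for illustration; the point is that every column type is declared up front instead of inferred:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType

spark = SparkSession.builder.getOrCreate()

# Declare the schema explicitly so no column ends up as a choice/struct
schema = StructType([
    StructField("sales_id", LongType(), True),
    StructField("customer_name", StringType(), True),
    StructField("customer_loc", StringType(), True),
    StructField("demand_amt", LongType(), True),
    StructField("cxvalue", LongType(), True),
])

sdf_sales = spark.read.csv("gs://my-bucket/sales.csv", schema=schema, header=True)
sdf_sales.createOrReplaceTempView("sales")
# With cxvalue as a plain long, WHERE cxvalue > 100000 resolves fine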
Here is the sample data:
+--------+-------------+------------+----------+-------+
|sales_id|customer_name|customer_loc|demand_amt|cxvalue|
+--------+-------------+------------+----------+-------+
| 1| ABC| Denver CO| 1200| 300000|
| 2| BCD| Boston MA| 212| 120000|
| 3| CDE| Phoenix AZ| 332| 100000|
| 4| BCD| Boston MA| 211| 120000|
| 5| DEF| Portland OR| 2121|1000000|
| 6| CDE| Phoenix AZ| 32| 100000|
| 7| ABC| Denver CO| 3227| 300000|
| 8| DEF| Portland OR| 2121|1000000|
| 9| BCD| Boston MA| 21| 120000|
| 10| ABC| Denver CO| 1200|300000 |
+--------+-------------+------------+----------+-------+
Here are the sample code and queries where things fail:
sdf_sales.createOrReplaceTempView("sales")
tbl1="sales"
sql2="""select customer_name, (case when cxvalue < 100000 then 1 else 0) as small,
(case when cxvalue in (200000, 300000, 400000 ) then demand_amt else 0 end) as medium
from {0}
""".format(tbl1)
sql4="select demand_amt from {0} where cxvalue>100000".format(tbl1)
However, this query works fine, with a successful Glue job:
sql3="""select customer_name, sum(demand_amt) as total_spent from {0} GROUP BY customer_name""".format(tbl1)
Challenge: I wish Glue somehow allowed me to change the Spark DataFrame schema. Any suggestions would be appreciated.