This is what my input data looks like:
20170101,2024270,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2024333,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2023709,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170201,1234709,1000,1000,1000,1000,1000,1000,1000,2000,2000
I want to convert this to a key-value RDD, where the key is an Integer and the value is a JSON object, so that I can write it to Elasticsearch:
(
  2024270, {
    "metrics": {
      "date" : 20170201,
      "style_id" : 1234709,
      "revenue" : 1000,
      "list_count" : 1000,
      "pdp_count" : 1000,
      "add_to_cart_count" : 1000
    }
  }
)
In Python, I am able to do this with the following code:
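def format_metrics(line):
    # Split on '^' as in the original code; the sample rows above are
    # comma-separated, so use ',' if that is the real delimiter.
    tokens = line.split('^')
    try:
        return (tokens[1], {
            'metrics': {
                'date': tokens[0],
                'mrp': float(tokens[2]),
                'revenue': float(tokens[3]),
                'quantity': int(tokens[4]),
                'product_discount': float(tokens[5]),
                'coupon_discount': float(tokens[6]),
                'total_discount': float(tokens[7]),
                'list_count': int(tokens[8]),
                'add_to_cart_count': int(tokens[9]),
                'pdp_count': int(tokens[10])
            }
        }) if len(tokens) > 1 else ('', dict())
    except (IndexError, ValueError):
        # Short or malformed line: fall back to an empty record.
        return ('', dict())

# Define the function first, then map it over the input lines.
metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics)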
But I cannot figure out how to achieve the same in Scala, as I am new to it. I managed to get the output below, but I am not able to wrap the JSON in a "metrics" block. Any pointers would be really helpful.
ordersDF.withColumn("key", $"style_id")
  .withColumn("json", to_json(struct($"date", $"style_id", $"mrp")))
  .select("key", "json")
  .show(false)
// Exiting paste mode, now interpreting.
+-------+-------------------------------------------------+
|key |json |
+-------+-------------------------------------------------+
|2024270|{"date":20170101,"style_id":2024270,"mrp":1000.0}|
|2024333|{"date":20170101,"style_id":2024333,"mrp":1000.0}|
|2023709|{"date":20170101,"style_id":2023709,"mrp":1000.0}|
|1234709|{"date":20170201,"style_id":1234709,"mrp":1000.0}|
+-------+-------------------------------------------------+
json column as metrics and call toJSON on the resulting DataFrame. toJSON doesn't provide the exact required output anyway.
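One possible way to get the "metrics" wrapper is to nest one struct inside another before calling to_json: the inner struct holds the fields and is aliased as "metrics", and the outer struct supplies the enclosing JSON object. The sketch below is only an illustration under some assumptions: the object name MetricsJson, the helper wrapInMetrics, the local SparkSession, and the small stand-in for ordersDF (built from two of the sample rows, with date and style_id as integers and mrp as a double) are all hypothetical and not from the original post.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

object MetricsJson {

  // Hypothetical helper: adds a "key" column and a "json" column whose
  // content is wrapped in a top-level "metrics" object.
  def wrapInMetrics(df: DataFrame): DataFrame =
    df.withColumn("key", col("style_id"))
      .withColumn(
        "json",
        // Inner struct = the fields; aliasing it as "metrics" and wrapping it
        // in an outer struct makes to_json emit {"metrics": {...}}.
        to_json(struct(struct(col("date"), col("style_id"), col("mrp")).as("metrics")))
      )
      .select("key", "json")

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("metrics-json")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for ordersDF, using two of the sample rows.
    val ordersDF = Seq(
      (20170101, 2024270, 1000.0),
      (20170201, 1234709, 1000.0)
    ).toDF("date", "style_id", "mrp")

    val keyed = wrapInMetrics(ordersDF)
    keyed.show(false)

    // Key-value RDD: (style_id, JSON string). getInt assumes the key column
    // is an IntegerType, as it is in this stand-in DataFrame.
    val kvRdd = keyed.rdd.map(row => (row.getInt(0), row.getString(1)))
    kvRdd.collect().foreach(println)

    spark.stop()
  }
}
```

In spark-shell, where spark and ordersDF already exist, only the body of wrapInMetrics should be needed. The remaining columns (revenue, list_count, and so on) can be added to the inner struct in the same way.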