
This is what my input data looks like:

20170101,2024270,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2024333,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2023709,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170201,1234709,1000,1000,1000,1000,1000,1000,1000,2000,2000

I want to convert it to a key/value RDD, where the key is an Integer and the value is a JSON object, so that I can write it to Elasticsearch:

(
  1234709, {
    "metrics": {
      "date": 20170201,
      "style_id": 1234709,
      "revenue": 1000,
      "list_count": 1000,
      "pdp_count": 1000,
      "add_to_cart_count": 1000
    }
  }
)

In Python, I can do this with the following code:

def format_metrics(line):
    # Split on the delimiter used by the source files ('^' here; the sample above is comma-separated)
    tokens = line.split('^')
    try:
        return (tokens[1], {
                    'metrics': {
                        'date': tokens[0],
                        'mrp': float(tokens[2]),
                        'revenue': float(tokens[3]),
                        'quantity': int(tokens[4]),
                        'product_discount': float(tokens[5]),
                        'coupon_discount': float(tokens[6]),
                        'total_discount': float(tokens[7]),
                        'list_count': int(tokens[8]),
                        'add_to_cart_count': int(tokens[9]),
                        'pdp_count': int(tokens[10])
                    }
                })
    except (IndexError, ValueError):
        # Short or malformed line: return an empty placeholder record
        return ('', dict())


# format_metrics must be defined before it is passed to map()
metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics)

However, I can't figure out how to do the same in Scala, as I'm new to it. I managed to get the output below, but I can't wrap the JSON in a "metrics" block. Any pointers would be really helpful.

import org.apache.spark.sql.functions.{struct, to_json}

ordersDF.withColumn("key", $"style_id")
        .withColumn("json", to_json(struct($"date", $"style_id", $"mrp")))
        .select("key", "json")
        .show(false)

// Exiting paste mode, now interpreting.

+-------+-------------------------------------------------+
|key    |json                                             |
+-------+-------------------------------------------------+
|2024270|{"date":20170101,"style_id":2024270,"mrp":1000.0}|
|2024333|{"date":20170101,"style_id":2024333,"mrp":1000.0}|
|2023709|{"date":20170101,"style_id":2023709,"mrp":1000.0}|
|1234709|{"date":20170201,"style_id":1234709,"mrp":1000.0}|
+-------+-------------------------------------------------+
  • I want to convert it to a key/value RDD because I need to join it with another RDD (dimensional info), which will also contain data in JSON format, and then write the output to ES. Commented Sep 11, 2017 at 4:39
  • Rename the json column as metrics and call toJSON on the resulting DataFrame. Commented Sep 11, 2017 at 8:52
  • Thanks @philantrovert it worked Commented Sep 11, 2017 at 11:55
  • @RajivChodisetti,you should have let philantrovert answer the question. Commented Sep 11, 2017 at 12:49
  • @RameshMaharjan It's alright. toJSON doesn't provide the exact required output anyway. Commented Sep 11, 2017 at 13:29

1 Answer


I tried what @philantrovert suggested, and it worked.

scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv")
ordersDF: org.apache.spark.sql.DataFrame = [date: int, style_id: int ... 9 more fields]

scala> :paste
// Entering paste mode (ctrl-D to finish)

ordersDF.withColumn("key", $"style_id")
        .withColumn("metrics", to_json(struct($"date", $"style_id", $"mrp")))
        .select("key", "metrics")
        .toJSON
        .show(false)

// Exiting paste mode, now interpreting.

+-----------------------------------------------------------------------------------+
|value                                                                              |
+-----------------------------------------------------------------------------------+
|{"key":2024270,"metrics":"{\"date\":20170101,\"style_id\":2024270,\"mrp\":1000.0}"}|
|{"key":2024333,"metrics":"{\"date\":20170101,\"style_id\":2024333,\"mrp\":1000.0}"}|
|{"key":2023709,"metrics":"{\"date\":20170101,\"style_id\":2023709,\"mrp\":1000.0}"}|
|{"key":1234709,"metrics":"{\"date\":20170201,\"style_id\":1234709,\"mrp\":1000.0}"}|
+-----------------------------------------------------------------------------------+
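Note that in this output the value of "metrics" is a JSON-encoded string (with escaped quotes), not a nested object, which is the limitation mentioned in the comments. If you need a real nested object, one option is to nest one struct inside another before calling to_json, so that a single to_json call serialises both levels. This is a minimal sketch, not tested against your revenue_schema; the object name NestedMetricsJson and the method withMetricsJson are hypothetical, and only date, style_id and mrp are included to match the snippets above:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

object NestedMetricsJson {
  // Hypothetical helper: builds (key, json) where json looks like {"metrics":{...}}.
  // The inner struct is aliased "metrics" and wrapped in an outer struct, so
  // to_json emits "metrics" as a nested object rather than an escaped string.
  def withMetricsJson(df: DataFrame): DataFrame =
    df.select(
      col("style_id").as("key"),
      to_json(
        struct(
          // Add the remaining metric columns here as needed
          struct(col("date"), col("style_id"), col("mrp")).as("metrics")
        )
      ).as("json")
    )
}
```

To get the key/value RDD for the join, something like `import spark.implicits._` followed by `NestedMetricsJson.withMetricsJson(ordersDF).as[(Int, String)].rdd` should give an `RDD[(Int, String)]`; for tuple types, `as` maps columns by position, so the names key and json do not matter.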

I also tried another way, using the json4s library, and that worked too:

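import org.apache.spark.sql.Row
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Column indexes follow the input layout: 0 = date, 1 = style_id, 2 = mrp,
// 3 = revenue, 4 = quantity, 5-7 = discounts, 8-10 = counts.
// Assumes revenue_schema types mrp..total_discount as FloatType and the rest as IntegerType.
def convertRowToJSON(row: Row): (Int, String) = {
    val json =
    ("metrics" ->
      ("date" -> row.getInt(0)) ~
      ("style_id" -> row.getInt(1)) ~
      ("mrp" -> row.getFloat(2)) ~
      ("revenue" -> row.getFloat(3)) ~
      ("quantity" -> row.getInt(4)) ~
      ("product_discount" -> row.getFloat(5)) ~
      ("coupon_discount" -> row.getFloat(6)) ~
      ("total_discount" -> row.getFloat(7)) ~
      ("list_count" -> row.getInt(8)) ~
      ("add_to_cart_count" -> row.getInt(9)) ~
      ("pdp_count" -> row.getInt(10))
      )
    // Key on style_id; compact(render(...)) already returns a String
    (row.getInt(1), compact(render(json)))
}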

scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv").map(convertRowToJSON)
ordersDF: org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

scala> ordersDF.show(false)
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1     |_2                                                                                                                                                                                                                                                |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2024270|{"metrics":{"date":"2024270","style_id":2024270,"mrp":1000.0,"revenue":1000.0,"quantity":2024270,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024270,"add_to_cart_count":2024270,"pdp_count":2024270}}|
|2024333|{"metrics":{"date":"2024333","style_id":2024333,"mrp":1000.0,"revenue":1000.0,"quantity":2024333,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024333,"add_to_cart_count":2024333,"pdp_count":2024333}}|
|2023709|{"metrics":{"date":"2023709","style_id":2023709,"mrp":1000.0,"revenue":1000.0,"quantity":2023709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2023709,"add_to_cart_count":2023709,"pdp_count":2023709}}|
|1234709|{"metrics":{"date":"1234709","style_id":1234709,"mrp":1000.0,"revenue":1000.0,"quantity":1234709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1234709,"add_to_cart_count":1234709,"pdp_count":1234709}}|
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
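If you would rather not pull in a JSON library, the Python format_metrics function can also be translated fairly directly into plain Scala and used on an RDD of text lines. This is a dependency-free sketch that builds the JSON by hand, which is only safe because every field is numeric; the object name FormatMetrics, the delimiter parameter, and returning None for malformed lines (instead of ('', {})) are my own assumptions, not part of the code above:

```scala
import scala.util.Try

object FormatMetrics {
  // Names of columns 2..10, in the same order as the Python format_metrics.
  private val fieldNames = Seq(
    "mrp", "revenue", "quantity", "product_discount", "coupon_discount",
    "total_discount", "list_count", "add_to_cart_count", "pdp_count")

  // Fields emitted as JSON integers; all the others are emitted as doubles.
  private val intFields = Set("quantity", "list_count", "add_to_cart_count", "pdp_count")

  // Renders one numeric token; throws (caught by the caller's Try) on bad input.
  private def renderNumber(name: String, raw: String): String =
    if (intFields(name)) raw.toInt.toString
    else {
      val d = raw.toDouble
      require(!d.isNaN && !d.isInfinite, s"$name is not a finite number")
      d.toString
    }

  /** Turns one delimited line into (style_id, {"metrics":{...}}), or None if it is malformed. */
  def formatMetrics(line: String, delimiter: Char = ','): Option[(Int, String)] =
    Try {
      val tokens = line.split(delimiter).map(_.trim)
      require(tokens.length >= 11, s"expected 11 fields, got ${tokens.length}")
      val styleId = tokens(1).toInt
      val fields =
        Seq(s""""date":${tokens(0).toInt}""", s""""style_id":$styleId""") ++
          fieldNames.zip(tokens.drop(2)).map { case (name, raw) =>
            s""""$name":${renderNumber(name, raw)}"""
          }
      (styleId, fields.mkString("""{"metrics":{""", ",", "}}"))
    }.toOption
}
```

With Spark, something like `sc.textFile(path).flatMap(line => FormatMetrics.formatMetrics(line))` would give an `RDD[(Int, String)]` and silently drop malformed lines, since an empty Option contributes no elements to flatMap.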

1 Comment

json4s is pretty good too. Nice answer. You can accept your own answer and close this question.
