
I am new to Spark and cannot figure out the solution to the following problem.

I have a JSON file that I need to parse, compute a couple of metrics from, and then write back out in JSON format.

Here is the code I am using:

import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._

object quick2 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession
      .builder
      .appName("quick1")
      .master("local[*]")
      .getOrCreate()

    val rawData = spark.read.json("/home/umesh/Documents/Demo2/src/main/resources/sampleQuick.json")

    val mat1 = rawData.select(rawData("mal_name"),rawData("cust_id")).distinct().orderBy("cust_id").toJSON.cache()
    val mat2 = rawData.select(rawData("file_md5"),rawData("mal_name")).distinct().orderBy(asc("file_md5")).toJSON.cache()

    mat1.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat1/")

    mat2.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat2/")
  }
}

The code above writes valid JSON. However, the metrics can contain several rows for the same key, for example:

file_md5   mal_name
1          a
1          b
2          c
3          d
3          e

With the code above, every object is written on its own line, like this:

{"file_md5":"1","mal_name":"a"}
{"file_md5":"1","mal_name":"b"}
{"file_md5":"2","mal_name":"c"}
{"file_md5":"3","mal_name":"d"}

and so on.

But I want to combine the values that share a common key, so that the output looks like this:

{"file_md5":"1","mal_name":["a","b"]}

Can somebody please suggest what I should do here, or whether there is a better way to approach this problem?

Thanks!
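To make the requested transformation concrete, here is a minimal sketch in plain Scala collections (no Spark) that groups the sample rows from the table by file_md5 and emits one JSON object per line. The object name GroupSketch is hypothetical, and the hand-built JSON assumes the values need no escaping, which holds for this sample only.

```scala
// Plain Scala collections only (no Spark): a sketch of the grouping the
// question asks for, using the sample rows from the table above.
object GroupSketch {
  // Sample (file_md5, mal_name) pairs copied from the question's table.
  val rows: Seq[(String, String)] = Seq(
    ("1", "a"), ("1", "b"), ("2", "c"), ("3", "d"), ("3", "e")
  )

  // Group the names by md5, keeping keys and names in first-seen order.
  def group(rows: Seq[(String, String)]): Seq[(String, Seq[String])] =
    rows.map(_._1).distinct.map { md5 =>
      md5 -> rows.collect { case (`md5`, name) => name }
    }

  // Render one JSON object per line. Assumes values need no escaping,
  // which is true for this sample but not for arbitrary strings.
  def toJsonLines(grouped: Seq[(String, Seq[String])]): Seq[String] =
    grouped.map { case (md5, names) =>
      val arr = names.map(n => "\"" + n + "\"").mkString("[", ",", "]")
      s"""{"file_md5":"$md5","mal_name":$arr}"""
    }

  def main(args: Array[String]): Unit =
    toJsonLines(group(rows)).foreach(println)
}
```

Running main prints one line per distinct file_md5, e.g. {"file_md5":"1","mal_name":["a","b"]} for the first key.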

2 Answers

  1. Use collect_list or collect_set on the mal_name column, depending on whether you want to keep duplicate names (collect_list) or drop them (collect_set).
  2. You can save the DataFrame/Dataset directly as a JSON file.
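import org.apache.spark.sql.functions.{collect_list, collect_set}
import spark.implicits._

// Group by file_md5 and gather each group's mal_name values into an array.
// collect_set drops duplicate names; use collect_list to keep them.
rawData.groupBy($"file_md5")
  .agg(collect_set($"mal_name").alias("mal_name"))
  .write
  .format("json")
  .save("json/file/location/to/save")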
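A self-contained sketch of the same approach, assuming Spark 2.x or later is on the classpath. It builds a small in-memory DataFrame mirroring the question's table instead of reading sampleQuick.json; the object name GroupMalNames and the output path /tmp/mal_by_md5 are hypothetical placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, collect_set}

object GroupMalNames {
  // collect_set drops duplicate names within a group; collect_list keeps them.
  def grouped(df: DataFrame, keepDuplicates: Boolean): DataFrame = {
    val agg =
      if (keepDuplicates) collect_list(col("mal_name"))
      else collect_set(col("mal_name"))
    // alias keeps the output column named "mal_name" instead of a
    // generated name such as "collect_set(mal_name)".
    df.groupBy(col("file_md5")).agg(agg.alias("mal_name"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("group-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical in-memory sample mirroring the question's table.
    val rawData = Seq(("1", "a"), ("1", "b"), ("2", "c"), ("3", "d"), ("3", "e"))
      .toDF("file_md5", "mal_name")

    // Writes one JSON object per line into a single part file;
    // "/tmp/mal_by_md5" is a placeholder output directory.
    grouped(rawData, keepDuplicates = false)
      .coalesce(1)
      .write
      .mode("overwrite")
      .json("/tmp/mal_by_md5")

    spark.stop()
  }
}
```

With keepDuplicates = false each name appears at most once per file_md5; the order of the names inside each array is not guaranteed.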

As suggested by @mrsrinivas, I changed my code as follows:

import spark.implicits._  // needed for the $"..." column syntax

val mat2 = rawData.select(rawData("file_md5"), rawData("mal_name")).distinct().orderBy(asc("file_md5")).cache()
val labeledDf = mat2.toDF("file_md5", "mal_name")
labeledDf.groupBy($"file_md5").agg(collect_list($"mal_name")).coalesce(1).write.format("json").save("/home/umesh/Documents/Demo2/src/test/run8/")

I am keeping this question open in case there are more suggestions.
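One caveat with this version: an orderBy before groupBy does not control the order of names inside each collected array, because collect_list's result order depends on the row order after the shuffle. A minimal sketch, assuming alphabetically sorted names are acceptable, applies sort_array to each array and sorts the output rows by key afterwards; SortedGroups is a hypothetical name.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, sort_array}

object SortedGroups {
  // collect_list's element order depends on row order after the shuffle,
  // so sorting the input first is not enough. sort_array sorts each
  // collected array explicitly (ascending by default).
  def groupSorted(df: DataFrame): DataFrame =
    df.select(col("file_md5"), col("mal_name"))
      .distinct()                                      // drop repeated (md5, name) pairs
      .groupBy(col("file_md5"))
      .agg(sort_array(collect_list(col("mal_name"))).alias("mal_name"))
      .orderBy(col("file_md5"))                        // sort output rows by key
}
```

The result can then be written with .coalesce(1).write.json(...) exactly as before.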

2 Comments

Using .agg(collect_list($"mal_name").alias("mal_name")) names the output column mal_name, so you don't end up with a new column under the generated name collect_list(mal_name).
Hi, I tried alias but it gave an error; when I tried as instead, it worked:
val labeledDf = mat1.groupBy($"file_md5").agg(collect_set($"mal_name").as("mal_name"))
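One plausible cause of the alias error (an assumption, since the error message isn't shown) is an import such as import org.apache.spark.sql.functions.{alias, collect_list}: the functions object has no alias member, so that line does not compile. alias and as are both methods on Column, and with correct imports they should yield the same column name. The hypothetical object AliasVsAs below shows both forms side by side.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_set}

object AliasVsAs {
  // Column.alias and Column.as are both methods on Column, so nothing
  // extra needs to be imported from functions to use them.
  def withAlias(df: DataFrame): DataFrame =
    df.groupBy(col("file_md5")).agg(collect_set(col("mal_name")).alias("mal_name"))

  def withAs(df: DataFrame): DataFrame =
    df.groupBy(col("file_md5")).agg(collect_set(col("mal_name")).as("mal_name"))
}
```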
