
Follow-up to this question

I have streaming JSON data in the following format:

|  A    | B                                        |
|-------|------------------------------------------|
|  ABC  |  [{C:1, D:1}, {C:2, D:4}]                | 
|  XYZ  |  [{C:3, D:6}, {C:9, D:11}, {C:5, D:12}]  |

I need to transform it into the following format:

|   A   |  C  |  D   |
|-------|-----|------|
|  ABC  |  1  |  1   |
|  ABC  |  2  |  4   |
|  XYZ  |  3  |  6   |
|  XYZ  |  9  |  11  |
|  XYZ  |  5  |  12  | 
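To make the target reshape concrete, here is a minimal plain-Scala model of it (no Spark involved). It assumes each element of `B` is a map with integer values under the keys `"C"` and `"D"`; the names `ReshapeSketch`, `In`, `Out` and `reshape` are hypothetical. Each input row produces one output row per element of `B`, with `A` carried along, and no grouping is needed.

```scala
// Plain-Scala model of the reshape; not Spark code.
object ReshapeSketch {
  // Assumption: the elements of B are maps with Int values under "C" and "D".
  final case class In(a: String, b: Seq[Map[String, Int]])
  final case class Out(a: String, c: Int, d: Int)

  // One output row per element of B, copying A into each of them.
  def reshape(rows: Seq[In]): Seq[Out] =
    rows.flatMap { row =>
      row.b.map(m => Out(row.a, m("C"), m("D")))
    }

  // The sample input from the first table.
  val sample: Seq[In] = Seq(
    In("ABC", Seq(Map("C" -> 1, "D" -> 1), Map("C" -> 2, "D" -> 4))),
    In("XYZ", Seq(Map("C" -> 3, "D" -> 6), Map("C" -> 9, "D" -> 11), Map("C" -> 5, "D" -> 12)))
  )

  def main(args: Array[String]): Unit =
    reshape(sample).foreach(println)
}
```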

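To achieve this, I performed the transformations suggested in the answer to the previous question:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." syntax; assumes a SparkSession named `spark`

// df0 is the streaming DataFrame with columns A and B
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")
val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum")
// Exploding the map Bn yields key/value columns, renamed here to B and C
val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")
val df4 = df3.withColumn("dummy", concat($"SeqNum", lit("||"), $"A"))
val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C"))
val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")
```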

Now I need to save the data to Elasticsearch:

```scala
df6.writeStream
  .outputMode("complete")
  .format("es")
  .option("es.resource", "index/type")
  .option("es.nodes", "localhost")
  .option("es.port", 9200)
  .start()
  .awaitTermination()
```

In Complete mode I get an error saying that Elasticsearch supports only the Append output mode. In Append mode, however, writeStream fails because a streaming aggregation cannot be output in Append mode. Writing to the console in Complete mode works. How can I write this data to Elasticsearch?

Any help will be appreciated.

1 Answer

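There is no need for a pivot or an aggregation here. If column B is indeed `Array[Map[String, String]]` (`array<map<string, string>>` in SQL types), all you need is a simple `withColumn` followed by a `select`:

```scala
import org.apache.spark.sql.functions.explode
import spark.implicits._ // for the $"..." syntax; assumes a SparkSession named `spark`

df
  .withColumn("B", explode($"B"))                     // one row per map in B
  .select($"A", $"B"("C") as "C", $"B"("D") as "D") // pull C and D out of each map
```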
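The reason this avoids the output-mode conflict is that `explode` plus `select` is a per-row transformation: the rows produced for one micro-batch never change later, which is what Append mode requires, so the flattened DataFrame can presumably be written with `.outputMode("append")` instead of `"complete"`. A grouped aggregation, by contrast, can revise a key's result when later data arrives. The plain-Scala sketch below illustrates that intuition only; it does not reproduce Spark's implementation, and the micro-batches are modeled as hypothetical `Seq`s with made-up names (`AppendSketch`, `perRow`, `countByKey`).

```scala
// Intuition model of why per-row steps suit Append mode and aggregations do not.
object AppendSketch {
  // Stateless step, like explode + select: output depends only on the current row.
  def perRow[A, B](f: A => Seq[B])(batch: Seq[A]): Seq[B] = batch.flatMap(f)

  // Grouped aggregation, like groupBy(...).agg(...): output depends on all rows seen for a key.
  def countByKey(rows: Seq[String]): Map[String, Int] =
    rows.groupBy(identity).map { case (k, vs) => k -> vs.size }

  val batch1: Seq[String] = Seq("ABC", "XYZ")
  val batch2: Seq[String] = Seq("XYZ")

  // Appending each batch's output equals processing all input at once.
  def perRowIsAppendSafe: Boolean = {
    val f: String => Seq[String] = a => Seq(a + ":1", a + ":2")
    perRow(f)(batch1) ++ perRow(f)(batch2) == perRow(f)(batch1 ++ batch2)
  }

  // The count for "XYZ" emitted after batch1 is superseded after batch2,
  // so a row already appended would become wrong.
  def aggregationChangesEarlierRows: Boolean =
    countByKey(batch1)("XYZ") != countByKey(batch1 ++ batch2)("XYZ")
}
```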