
Any pointers on the below?

Input df (col1 is of type string):

+----------------------------------+
|                              col1|
+----------------------------------+
|[{a:1,g:2},{b:3,h:4},{c:5,i:6}]   |
|[{d:7,j:8},{e:9,k:10},{f:11,l:12}]|
+----------------------------------+

Expected output (again, col1 is of type string):

+-------------+
|        col1 |
+-------------+
|  {a:1,g:2}  |
|  {b:3,h:4}  |
|  {c:5,i:6}  |
|  {d:7,j:8}  |
|  {e:9,k:10} |
|  {f:11,l:12}|
+-------------+

Thanks!

4 Answers

2

You can use the Spark SQL explode function with a UDF:

import spark.implicits._
val df = spark.createDataset(Seq("[{a},{b},{c}]","[{d},{e},{f}]")).toDF("col1")
df.show()

+-------------+
|         col1|
+-------------+
|[{a},{b},{c}]|
|[{d},{e},{f}]|
+-------------+

import org.apache.spark.sql.functions._
val stringToSeq = udf{s: String => s.drop(1).dropRight(1).split(",")}
df.withColumn("col1", explode(stringToSeq($"col1"))).show()

+----+
|col1|
+----+
| {a}|
| {b}|
| {c}|
| {d}|
| {e}|
| {f}|
+----+

Edit: for your new input data, the custom UDF can evolve as follows:

val stringToSeq = udf{s: String =>
  val extractor = "[^{]*:[^}]*".r
  extractor.findAllIn(s).map(m => s"{$m}").toSeq
}

New output:

+-----------+
|       col1|
+-----------+
|  {a:1,g:2}|
|  {b:3,h:4}|
|  {c:5,i:6}|
|  {d:7,j:8}|
| {e:9,k:10}|
|{f:11,l:12}|
+-----------+

2 Comments

Using native Spark functions, like a regexp_replace followed by a split, would most likely perform better than a UDF (see the sketch after these comments).
To be more precise, how would this change if the input were a little different, e.g. val df = spark.createDataset(Seq("[{a:1,g:2},{b:3,h:4},{c:5,i:6}]","[{d:7,j:8},{e:9,k:10},{f:11,l:12}]")).toDF("col1")?
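As a rough sketch of what the first comment suggests, the same result can be obtained without a UDF by stripping the brackets with regexp_replace and then splitting on "," (using the simple df from the answer above, with spark.implicits._ in scope):

import org.apache.spark.sql.functions.{explode, split, regexp_replace}

df.select(
  explode(
    split(regexp_replace($"col1", "^\\[|\\]$", ""), ",")   // drop the [ and ], then split on ","
  ).as("col1")
).show()

Catalyst can optimize these built-in functions, whereas a UDF is a black box to the optimizer, which is why this tends to perform better.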
2

Spark provides a quite rich trim function which can be used to remove the leading and trailing characters, [] in your case. As @LeoC already mentioned, the required functionality can be implemented through the built-in functions, which will perform much better:

import spark.implicits._
import org.apache.spark.sql.functions.{trim, explode, split}

val df = Seq(
  ("[{a},{b},{c}]"),
  ("[{d},{e},{f}]")
).toDF("col1")

df.select(
  explode(
    split(
      trim($"col1", "[]"), ","))).show

// +---+
// |col|
// +---+
// |{a}|
// |{b}|
// |{c}|
// |{d}|
// |{e}|
// |{f}|
// +---+

EDIT:

For the new dataset the logic remains the same, except that you need to split on a character other than ,. You can achieve this with regexp_replace, replacing }, with }| so that you can later split on | instead of ,:

import org.apache.spark.sql.functions.{trim, explode, split, regexp_replace}

val df = Seq(
  ("[{a:1,g:2},{b:3,h:4},{c:5,i:6}]"),
  ("[{d:7,j:8},{e:9,k:10},{f:11,l:12}]")
).toDF("col1")

df.select(
  explode(
    split(
      regexp_replace(trim($"col1", "[]"), "},", "}|"), // gives: {a:1,g:2}|{b:3,h:4}|{c:5,i:6}
    "\\|")
  )
).show(false)

// +-----------+
// |col        |
// +-----------+
// |{a:1,g:2}  |
// |{b:3,h:4}  |
// |{c:5,i:6}  |
// |{d:7,j:8}  |
// |{e:9,k:10} |
// |{f:11,l:12}|
// +-----------+

Note: with split(..., "\\|") we escape |, which is a special regex character.
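To see why the escape is needed, here is a quick illustration with plain Scala's String.split, which also treats its argument as a regex:

// "|" by itself is an empty alternation, so it matches the empty string and splits between every character
"{a}|{b}".split("|")    // Array({, a, }, |, {, b, })
// escaping it splits on the literal pipe, as intended
"{a}|{b}".split("\\|")  // Array({a}, {b})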

3 Comments

Thanks, but apologies for the confusion; please see the updated example. The split by ',' is still causing the issue.
Note: this solution is valid for Spark 2.3.x or greater (see the pre-2.3 sketch below).
That is valid, @baitmbarek.
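For what it's worth, on versions before 2.3 (where the two-argument trim is not available) the brackets can be stripped with regexp_replace instead; a sketch using the same df and imports as in the EDIT above:

df.select(
  explode(
    split(
      regexp_replace(
        regexp_replace($"col1", "^\\[|\\]$", ""), // strip the leading [ and trailing ] instead of trim
        "\\},", "}|"),
      "\\|")
  )
).show(false)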
1

You can do:

import spark.implicits._

val newDF = df.as[String].flatMap(line => line.replaceAll("\\[", "").replaceAll("\\]", "").split(","))
newDF.show()

Output:

+-----+
|value|
+-----+
|  {a}|
|  {b}|
|  {c}|
|  {d}|
|  {e}|
|  {f}|
+-----+

Just as a note, this process will name the output column value; you can easily rename it (if needed) using select, withColumn, etc.
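For example, sticking with newDF from above, the column can be renamed back to col1 like this:

// rename the default "value" column produced by flatMap over a Dataset[String]
newDF.withColumnRenamed("value", "col1").show()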

2 Comments

Thanks, but apologies for the confusion; please see the updated example. The split by ',' is still causing the issue.
Try this: df.select(trim('col1, "[]")).as[String].flatMap(record =>{ val output = record.split("\\{").filter(_.size>0).map(rcd=> "{" + (if(rcd.trim().endsWith(",")) rcd.slice(0, rcd.size-1) else rcd) ) output }).show(truncate=false)
1

Finally what worked:

import spark.implicits._
val df = spark.createDataset(Seq("[{a:1,g:2},{b:3,h:4},{c:5,i:6}]","[{d:7,j:8},{e:9,k:10},{f:11,l:12}]")).toDF("col1")
df.show()

val toStr = udf((value: String) => value.split("},\\{").map(_.toString))          // split on "},{" into the individual objects (the separating braces are consumed)
val addParanthesis = udf((value: String) => "{" + value + "}")                     // wrap each object back in braces
val removeParanthesis = udf((value: String) => value.slice(2, value.length() - 2)) // strip the leading "[{" and trailing "}]"

import org.apache.spark.sql.functions._
df
.withColumn("col0", removeParanthesis(col("col1")))
.withColumn("col2", toStr(col("col0")))
.withColumn("col3", explode(col("col2")))
.withColumn("col4", addParanthesis(col("col3")))
.show()

Output:

+--------------------+--------------------+--------------------+---------+-----------+
|                col1|                col0|                col2|     col3|       col4|
+--------------------+--------------------+--------------------+---------+-----------+
|[{a:1,g:2},{b:3,h...|a:1,g:2},{b:3,h:4...|[a:1,g:2, b:3,h:4...|  a:1,g:2|  {a:1,g:2}|
|[{a:1,g:2},{b:3,h...|a:1,g:2},{b:3,h:4...|[a:1,g:2, b:3,h:4...|  b:3,h:4|  {b:3,h:4}|
|[{a:1,g:2},{b:3,h...|a:1,g:2},{b:3,h:4...|[a:1,g:2, b:3,h:4...|  c:5,i:6|  {c:5,i:6}|
|[{d:7,j:8},{e:9,k...|d:7,j:8},{e:9,k:1...|[d:7,j:8, e:9,k:1...|  d:7,j:8|  {d:7,j:8}|
|[{d:7,j:8},{e:9,k...|d:7,j:8},{e:9,k:1...|[d:7,j:8, e:9,k:1...| e:9,k:10| {e:9,k:10}|
|[{d:7,j:8},{e:9,k...|d:7,j:8},{e:9,k:1...|[d:7,j:8, e:9,k:1...|f:11,l:12|{f:11,l:12}|
+--------------------+--------------------+--------------------+---------+-----------+

1 Comment

I think that many UDFs are not needed for a single string-processing step.
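As a sketch of that suggestion, the three UDFs could be collapsed into a single (hypothetical) explodeObjects UDF that drops the brackets and splits only on commas sitting between } and {, so the braces never need to be removed and re-added (same df and imports as above):

val explodeObjects = udf { s: String =>
  // drop the surrounding [ and ], then split on "," only where it separates "}" from "{"
  s.drop(1).dropRight(1).split("(?<=\\}),(?=\\{)")
}

df.withColumn("col1", explode(explodeObjects($"col1"))).show(false)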
