
I am trying to split a string (strictly, the strings in one column of a dataframe) and return the pieces to the dataframe as a list. I am on Scala 2.11. I would prefer a Scala or PySpark solution that uses a UDF, because a lot of other work happens inside the UDF.

Let us say that I have a dataframe:

val df = List(("123", "a*b*c*d*e*f*x*y*z"), ("124", "g*h*i*j*k*l*m*n*o")).toDF("A", "B")

The result I want, produced inside a UDF:

 A       B
123    ((a, b, c),
        (d, e, f),
        (x, y, z))
124    ((g, h, i),
        (j, k, l), 
        (m, n, o))

I wrote a UDF to split the string and return lists, but I do not know how to define or pass a schema so that the results come back into the dataframe as three columns.

def testUdf =  udf( (s: String) => { 
  val a = s.split("\\*").take(3).toList
  val b = s.split("\\*").drop(3).take(3).toList
  val c = s.split("\\*").drop(6).take(3).toList
  val abc = (a, b, c).zipped.toList.asInstanceOf[List[String]]
  // println (abc) // This does not work
} )
val df2 = df.select($"A", testUdf($"B").as("B")) // does not work because of type mismatch. 

I tried defining a schema like this, but I do not know how to pass it to the UDF above:

   val schema = StructType(List(
     StructField("C1", StringType),
     StructField("C2", StringType),
     StructField("C3", StringType)
   ))

After this, I hope to follow the procedure outlined in "Explode multiple columns in Spark SQL table" to explode the dataframe.

Help would be greatly appreciated.

3 Answers

2

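The UDF you defined is a function from String to Unit, because its last statement is a val definition and not an expression. Make the zipped result the last expression of the block so that it is returned. Also note that asInstanceOf[] does not change the runtime type: the elements of the list are still tuples. The code below will give you a list of lists: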

def testUdf =  udf( (s: String) => { 
  val a = s.split("\\*").take(3).toList
  val b = s.split("\\*").drop(3).take(3).toList
  val c = s.split("\\*").drop(6).take(3).toList
  (a, b, c).zipped.toList.map(t=>List(t._1,t._2,t._3))
} )
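As a point of comparison, here is a minimal sketch of the splitting logic on its own, without Spark, that keeps the groups in their original order. It uses grouped from the standard collections library. The names SplitSketch and splitIntoGroups are made up for this illustration and do not come from the answer. A function of this shape (String => List[List[String]]) could presumably be wrapped in udf, and Spark would be expected to expose it as an array of string arrays, but that part is not shown or tested here.

```scala
object SplitSketch {
  // Split on a literal '*' and cut the tokens into consecutive chunks of `size`.
  // If the token count is not a multiple of `size`, the last chunk is shorter.
  def splitIntoGroups(s: String, size: Int = 3): List[List[String]] =
    s.split("\\*").toList.grouped(size).toList
}
```

Calling SplitSketch.splitIntoGroups("a*b*c*d*e*f*x*y*z") gives the three groups (a, b, c), (d, e, f) and (x, y, z) as a List[List[String]].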

1 Comment

I did add "abc" at the end as shown. The statement that defines df2 executes, but df2.show fails with scala.MatchError: (a,d,x) (of class scala.Tuple3). Without the final "abc", it fails with java.lang.UnsupportedOperationException: Schema for type Unit is not supported.
2

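The way you generated the arrays before calling zipped does not put the elements in the wanted order: zipping three consecutive slices pairs elements by position, giving (a, d, x) instead of (a, b, c). One way to generate the elements in the wanted order is to use a 2-dimensional Array to pre-transpose the elements before applying zipped.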

The following UDF will 1) split a string column into an array, which is transposed into a 2-D array, 2) zip the rows of the 2-D array into an array of tuples, and 3) convert the array of tuples to a tuple of tuples (i.e. a column of type struct of structs):

val df = Seq(
  ("123", "a*b*c*d*e*f*x*y*z"),
  ("124", "g*h*i*j*k*l*m*n*o")
).toDF("A", "B")

import org.apache.spark.sql.functions._

def splitUdf = udf( (s: String) => {
  val arr = s.split("\\*")
  val arr2d = Array.ofDim[String](3, 3)

  for {
    r <- 0 until 3
    c <- 0 until 3
  } arr2d(r)(c) = arr(c * 3 + r)

  val arrTup = (arr2d(0), arr2d(1), arr2d(2)).zipped.toArray

  (arrTup(0), arrTup(1), arrTup(2))
} )

val df2 = df.select($"A", splitUdf($"B").as("B"))

df2.show(false)
// +---+-------------------------+
// |A  |B                        |
// +---+-------------------------+
// |123|[[a,b,c],[d,e,f],[x,y,z]]|
// |124|[[g,h,i],[j,k,l],[m,n,o]]|
// +---+-------------------------+
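The ordering issue described above can be checked without Spark. The sketch below is only an illustration, and the name ZipOrderSketch is invented for it. It uses transpose on plain lists to mimic what zipping three consecutive slices does, and shows that transposing a second time restores the original groups.

```scala
object ZipOrderSketch {
  val tokens: List[String] = "a*b*c*d*e*f*x*y*z".split("\\*").toList

  // Three consecutive slices, like a, b and c in the question's UDF.
  val slices: List[List[String]] = tokens.grouped(3).toList

  // Zipping the slices pairs elements by position, which amounts to a transpose.
  val zippedLike: List[List[String]] = slices.transpose

  // Transposing once more brings back the original grouping.
  val restored: List[List[String]] = zippedLike.transpose
}
```

Here zippedLike starts with List(a, d, x), the same tuple that appears in the MatchError reported in the comment on the first answer, while restored starts with List(a, b, c).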


1

The problem is that your UDF returns Unit: the value of the last statement in the block is the return value, and here that statement is a val definition. I would suggest the following procedure:

val df = List(("123", "a*b*c*d*e*f*x*y*z"), ("124", "g*h*i*j*k*l*m*n*o")).toDF("A", "B")

def testUdf = udf((s: String) => {
  val Array(s1, s2, s3, s4, s5, s6, s7, s8, s9) = s.split(s"\\*")
  Seq(
    (s1, s2, s3),
    (s4, s5, s6),
    (s7, s8, s9)
  )
})

val df2 = df.select($"A", explode(testUdf($"B")).as("B"))

df2.show()

+---+-------+
|  A|      B|
+---+-------+
|123|[a,b,c]|
|123|[d,e,f]|
|123|[x,y,z]|
|124|[g,h,i]|
|124|[j,k,l]|
|124|[m,n,o]|
+---+-------+

3 Comments

I appreciate the answer. The major issue is that I cannot get to the point where I can use explode. When I use the solution given by Arnon, I get "Schema for type Any not supported".
@Shiva And what does not work if you use my solution?
From the original dataframe (given on the first line), I cannot get to the dataframe that you use. When I use Arnon's answer, I get "Schema for type Unit is not supported", not in this example but in the actual dataset that I am using. I am getting List[List[Any]], and I cannot pass it out of the UDF to the dataframe (before explode can work).
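One possible reading of the List[List[Any]] problem is that the inner lists mix element types, so the compiler infers Any, for which Spark reports no schema. The sketch below is a guess at that situation and not the commenter's actual code: the object AnyToStringSketch and its sample values are hypothetical. It only shows, in plain Scala, how converting every element to String gives a concrete element type.

```scala
object AnyToStringSketch {
  // Hypothetical mixed-type rows standing in for the commenter's real data.
  val mixed: List[List[Any]] = List(List("a", 1, true), List("b", 2, false))

  // Converting each element to String fixes the element type as String.
  val asStrings: List[List[String]] = mixed.map(_.map(_.toString))
}
```

If the values must keep their own types, a tuple or case class per row may be a better fit than stringifying, as in the answers above that return tuples.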
