
I use Spark 1.5.

I have a DataFrame A_DF as follows:

+--------------------+--------------------+
|                  id|        interactions|
+--------------------+--------------------+
|        id1         |30439831,30447866...|
|        id2         |37597858,34499875...|
|        id3         |30447866,32896718...|
|        id4         |33029476,31988037...|
|        id5         |37663606,37627579...|
|        id6         |37663606,37627579...|
|        id7         |36922232,37675077...|
|        id8         |37359529,37668820...|
|        id9         |37675077,37707778...|
+--------------------+--------------------+

where interactions is a String. I want to explode this column by first splitting each interactions string on commas into an array of substrings, which I try to do as follows:

val splitArr = udf { (s: String) => s.split(",").map(_.trim) }

val B_DF = A_DF.explode(splitArr($"interactions"))

but I am getting the following error:

error: missing arguments for method explode in class DataFrame;
follow this method with `_' if you want to treat it as a partially applied function
A_DF.explode(splitArr($"interactions"))

which I don't understand. So I tried something even more complicated:

val B_DF = A_DF.explode($"interactions") { case (Row(interactions: String) =>
        interactions.split(",").map(_.trim))
     }

for which I get an inspection warning that reads:

Expression of Type Array[String] does not conform to expected type TraversableOnce[A_]

Any ideas?

Comment: explode does not take a UDF; it takes a normal Scala function. It should be more like this: A_DF.explode("interactions", "interaction") { (s: String) => s.split(",").map(_.trim) } (commented Dec 2, 2016 at 11:54)

Answer

Dataset.explode is deprecated as of Spark 2.0.0. Unless you have a reason, stay away from it. You've been warned.

If you do have a reason to use DataFrame.explode, see the signatures:

explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]): DataFrame

explode[A <: Product](input: Column*)(f: (Row) ⇒ TraversableOnce[A])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[A]): DataFrame

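In either case, explode takes two parameter lists (followed by an implicit TypeTag), so calling it with only the first list, as in A_DF.explode(splitArr($"interactions")), is what causes your first error.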
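To see where the first error comes from without Spark, here is a minimal sketch with a hypothetical explodeLike method that copies the shape of the first signature (two parameter lists) but works on a plain Scala Seq. Iterable stands in for TraversableOnce so the sketch compiles unchanged on Scala 2.12 and 2.13; it is an illustration, not Spark's implementation.

```scala
// Hypothetical stand-in for DataFrame.explode[A, B](inputColumn, outputColumn)(f):
// same two parameter lists, but it works on a plain Seq instead of a DataFrame.
object TwoParameterLists {
  def explodeLike[A, B](values: Seq[A], outputColumn: String)(f: A => Iterable[B]): Seq[(String, B)] =
    values.flatMap(a => f(a).map(b => (outputColumn, b)))

  val interactions: Seq[String] = Seq("30439831,30447866")

  // Both parameter lists supplied: this compiles and runs.
  val exploded: Seq[(String, String)] =
    explodeLike[String, String](interactions, "parts")(s => s.split(",").toList)

  // Supplying only the first list, e.g. `explodeLike(interactions, "parts")`,
  // is rejected at compile time with a missing-arguments error, like the question's.
  // The trailing `_` suggested by that error turns the remaining list into a function value.
  val pending: (String => Iterable[String]) => Seq[(String, String)] =
    explodeLike[String, String](interactions, "parts") _

  val viaPending: Seq[(String, String)] = pending(s => s.split(",").toList)
}
```

The function passed in the second list is what actually produces the rows, which is why Spark cannot do anything with the first list on its own.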

(This is Spark 2.1.0-SNAPSHOT)

scala> spark.version
res1: String = 2.1.0-SNAPSHOT

scala> val A_DF = Seq(("id1", "30439831,30447866")).toDF("id", "interactions")
A_DF: org.apache.spark.sql.DataFrame = [id: string, interactions: string]

scala> A_DF.explode(split($"interactions", ","))
<console>:26: error: missing argument list for method explode in class Dataset
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `explode _` or `explode(_)(_)(_)` instead of `explode`.
       A_DF.explode(split($"interactions", ","))
                   ^

You could do it as follows (the warning is about the deprecation of explode, since I use 2.1.0-SNAPSHOT):

scala> A_DF.explode[String, String]("interactions", "parts")(_.split(",")).show
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----------------+--------+
| id|     interactions|   parts|
+---+-----------------+--------+
|id1|30439831,30447866|30439831|
|id1|30439831,30447866|30447866|
+---+-----------------+--------+

You could use the other explode as follows:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> case class Interaction(id: String, part: String)
defined class Interaction

scala> A_DF.explode[Interaction]($"id", $"interactions") { case Row(id: String, ins: String) => ins.split(",").map { it => Interaction(id, it) } }.show
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----------------+---+--------+
| id|     interactions| id|    part|
+---+-----------------+---+--------+
|id1|30439831,30447866|id1|30439831|
|id1|30439831,30447866|id1|30447866|
+---+-----------------+---+--------+

Use the explode function (functions.explode) instead and you should be fine, as described in the scaladoc (quoted below):


Given that this is deprecated, as an alternative, you can explode columns either using functions.explode():

ds.select(explode(split('words, " ")).as("word"))

or flatMap():

ds.flatMap(_.words.split(" "))
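The flatMap route comes down to simple row-to-many-rows logic. Below is a plain-Scala model of it (no Spark needed) with made-up rows shaped like A_DF's (id, interactions) columns. On a typed Dataset, the same lambda could be passed to A_DF.as[(String, String)].flatMap, which additionally needs spark.implicits._ in scope for the tuple encoder; that Spark variant is not shown here.

```scala
// Plain-Scala model (no Spark) of what flatMap/explode do to A_DF:
// each (id, interactions) row becomes one row per comma-separated part.
object FlatMapModel {
  // Hypothetical sample rows shaped like A_DF's (id, interactions) columns.
  val rows: Seq[(String, String)] = Seq(
    ("id1", "30439831,30447866"),
    ("id2", "37597858, 34499875")
  )

  // flatMap concatenates the per-row lists into a single sequence of rows;
  // trim removes the space after the comma in the second row.
  val exploded: Seq[(String, String)] = rows.flatMap { case (id, interactions) =>
    interactions.split(",").toList.map(part => (id, part.trim))
  }
}
```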

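You could then use the explode function on your A_DF as follows (the alias belongs on the exploded column, not on the array inside explode, otherwise the new column gets a default name):

A_DF.select($"id", explode(split('interactions, ",")).as("part"))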
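For reference, here is an end-to-end sketch of the functions.explode approach, assuming Spark 2.x with spark-sql on the classpath and a local SparkSession (on Spark 1.5 you would build the DataFrame through a SQLContext instead). The sample rows are made up to mirror A_DF. It also restores the trimming the question's UDF did, as a separate step, because Spark rejects a generator such as explode nested inside another expression like trim.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{explode, split, trim}

// Sketch assuming Spark 2.x with spark-sql on the classpath, run locally.
object ExplodeInteractions {
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("explode-interactions") // hypothetical app name
    .getOrCreate()

  import spark.implicits._

  // Hypothetical sample rows shaped like A_DF from the question.
  val A_DF: DataFrame = Seq(
    ("id1", "30439831,30447866"),
    ("id2", "37597858, 34499875")
  ).toDF("id", "interactions")

  // split yields array<string>; explode emits one row per array element.
  // The alias is applied to the exploded column, outside explode(...).
  val B_DF: DataFrame = A_DF
    .select($"id", explode(split($"interactions", ",")).as("part"))
    // Trim in a second step, since explode cannot be nested inside trim.
    .withColumn("part", trim($"part"))
}
```

Calling ExplodeInteractions.B_DF.show() would then list one (id, part) row per interaction.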