
I have a dataset where one of the fields contains an array, as below:

{ "name" : "James", "subjects" : [ "english", "french", "botany" ] },
{ "name" : "neo", "subjects" : [ "english", "physics" ] },
{ "name" : "john", "subjects" : [ "spanish", "mathematics" ] }

Now I want to filter using the Dataset.filter function by passing a Column object. I tried the isin function on Column and the array_contains function from functions, but neither worked.

Is there a way to create Column object that will filter the dataset where an array field contains one of the values?

  • What did you try exactly? Why do you want to use a Column-based expression? Commented Apr 30, 2017 at 15:48

3 Answers


There are multiple ways to do this, once you've imported the implicit Encoders:

import sparkSession.implicits._

First, you can turn your DataFrame, which is a Dataset[Row], into a strongly typed Dataset[Student], which allows you to use familiar (at least if you know Scala) Scala idioms:

case class Student(name: String, subjects: Seq[String])

sparkSession.read.json("my.json")
    .as[Student]
    .filter(_.subjects.contains("english"))

You can also use a purely Column-based approach on your DataFrame with array_contains from the helpful Spark functions library:

sparkSession.read.json("my.json").filter(array_contains($"subjects", "english"))

Finally, although it may not be helpful to you here, keep in mind that you can also use explode from the same functions library to give each subject its own row in the column:

sparkSession.read.json("my.json")
  .select($"name", explode($"subjects").as("subjects"))
  .filter($"subjects" === "english")
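Since the question asks for rows whose array contains one of several values, and a single array_contains call checks only one value, one sketch (assuming the implicits import above and a hypothetical list of wanted values) is to OR together one array_contains predicate per value:

```scala
import org.apache.spark.sql.functions.array_contains

// Hypothetical set of values; a row is kept if its subjects array
// contains ANY of them.
val wanted = Seq("english", "spanish")

// Build one array_contains predicate per value, then OR them
// into a single Column usable with Dataset.filter.
val containsAny = wanted
  .map(v => array_contains($"subjects", v))
  .reduce(_ || _)

sparkSession.read.json("my.json").filter(containsAny)
```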

5 Comments

I also thought about creating a custom case class like your Student, but eventually decided not to, since it's certainly not a Student but rather an Assignment or some kind of "mapping" class. That's why I stuck to using a tuple. WDYT?
It looks to me like a Student with the classes (subjects) each takes. It could also be a Professor with the classes (subjects) each teaches. Either way, I think the model name matters less than the broader concept of taking advantage of strong typing with a case class when you have a simple data model like this. I've found case classes to resonate very strongly with people new to Scala as well.
Right. I'm not questioning the use of case classes, but just the name of the case class. case classes are just named tuples and that's why I'm wondering what better name could there be for Student case class for this use case.
I never said you were questioning the use of case classes. I'm confused why you are morally opposed to Student when it is the most obvious choice for the reason I gave. Maybe this is a cultural thing. Regardless, my main point is that the name of the case class is really, really unimportant to the bigger picture.
Also, to say "case classes are just named tuples" is a vast oversimplification, but that is outside the scope of this thread.
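For reference, the tuple-based variant discussed in this thread (assuming the implicits import above and the inferred column order name, subjects) would look like:

```scala
// Tuple-based alternative to the Student case class: no named model,
// at the cost of positional accessors (_1 = name, _2 = subjects).
sparkSession.read.json("my.json")
  .as[(String, Seq[String])]
  .filter(_._2.contains("english"))
```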

Spark SQL's DataFrameReader supports the so-called JSON Lines text format (aka newline-delimited JSON), where:

Each Line is a Valid JSON Value

You can use the json operator to read the dataset.

// on command line
$ cat subjects.jsonl
{ "name" : "James", "subjects" : [ "english", "french", "botany" ] }
{ "name" : "neo", "subjects" : [ "english", "physics" ] }
{ "name" : "john", "subjects" : [ "spanish", "mathematics" ] }

// in spark-shell
scala> val subjects = spark.read.json("subjects.jsonl")
subjects: org.apache.spark.sql.DataFrame = [name: string, subjects: array<string>]

scala> subjects.show(truncate = false)
+-----+-------------------------+
|name |subjects                 |
+-----+-------------------------+
|James|[english, french, botany]|
|neo  |[english, physics]       |
|john |[spanish, mathematics]   |
+-----+-------------------------+

scala> subjects.printSchema
root
 |-- name: string (nullable = true)
 |-- subjects: array (nullable = true)
 |    |-- element: string (containsNull = true)

With that, you should have a look at the functions library, where you can find collection functions that deal with array-based inputs, e.g. array_contains or explode.

That's what you can find in the answer from @Vidya.


What is missing is my beloved Dataset.flatMap, which, given the subjects Dataset, could be used as follows:

scala> subjects
  .as[(String, Seq[String])]  // convert to Dataset[(String, Seq[String])] for more type-safety
  .flatMap { case (student, subjects) => subjects.map(s => (student, s)) }  // typed expand
  .filter(_._2.toLowerCase == "english")  // filter out non-english subjects
  .show
+-----+-------+
|   _1|     _2|
+-----+-------+
|James|english|
|  neo|english|
+-----+-------+

That, however, doesn't read as nicely as the for-comprehension version.

val subjectsDF = subjects.as[(String, Seq[String])]
val englishStudents = for {
  (student, ss) <- subjectsDF  // flatMap
  subject <- ss                // map
  if subject.toLowerCase == "english"
} yield (student, subject)

scala> englishStudents.show
+-----+-------+
|   _1|     _2|
+-----+-------+
|James|english|
|  neo|english|
+-----+-------+

Moreover, as of Spark 2.2 (soon to be released), DataFrameReader.json has a variant that reads JSON from a Dataset[String].

scala> spark.version
res0: String = 2.3.0-SNAPSHOT

import org.apache.spark.sql.Dataset
val subjects: Dataset[String] = Seq(
  """{ "name" : "James", "subjects" : [ "english", "french", "botany" ] }""",
  """{ "name" : "neo", "subjects" : [ "english", "physics" ] }""",
  """{ "name" : "john", "subjects" : [ "spanish", "mathematics" ]}""").toDS

scala> spark.read.option("inferSchema", true).json(subjects).show(truncate = false)
+-----+-------------------------+
|name |subjects                 |
+-----+-------------------------+
|James|[english, french, botany]|
|neo  |[english, physics]       |
|john |[spanish, mathematics]   |
+-----+-------------------------+

2 Comments

The flatMap approach is appealing to people who are really comfortable with Scala and can reason about what (T) ⇒ TraversableOnce[U] in the Scaladoc means, but it can be intimidating to people coming to Spark in Scala from a Python or R background. Besides, the DataFrame abstraction makes sense to them and others, so it feels overengineered to use fancy Scala rather than higher-order abstractions for simple cases like this. Still, it is nice to have flatMap in your back pocket for when those cases arise.
Correct. Where I think flatMap shines (over DataFrame-based alternatives) is with for-comprehension where this "exploding" can be nicely expressed. I don't honestly use it very often though. This and the other answers with flatMap are mainly supposed to get myself used to them ;-)

As I understand it, you are trying to find the records in a DataFrame whose array column contains a particular string: in this case, the records whose subjects include "english".

Let's first create a sample DataFrame:

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for .toDS below

val json_data = """[{ "name" : "James", "subjects" : [ "english", "french", "botany" ] },
{ "name" : "neo", "subjects" : [ "english", "physics" ] },
{ "name" : "john", "subjects" : [ "spanish", "mathematics" ] }]"""
val df = spark.read.json(Seq(json_data).toDS)

Now let's try to find the records that contain the subject "english". Here we can use the array_contains function from the functions library (available since Spark 1.5):

df.filter(array_contains($"subjects", "english")).show(truncate=false)

// Output

+-----+-------------------------+
|name |subjects                 |
+-----+-------------------------+
|James|[english, french, botany]|
|neo  |[english, physics]       |
+-----+-------------------------+
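If you are on Spark 2.4 or later, the higher-order SQL function exists (used here via expr) is an alternative worth knowing: unlike array_contains, its per-element lambda can express more than exact equality, e.g. a case-insensitive match. A sketch, assuming the df defined above:

```scala
import org.apache.spark.sql.functions.expr

// exists(array, lambda) is true if any element satisfies the lambda.
// Here: match "english" regardless of case.
df.filter(expr("exists(subjects, s -> lower(s) = 'english')"))
  .show(truncate = false)
```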

You can find more details in the Spark functions API documentation (Scala and Python).

I hope this helps.

