
Using Spark 1.5 and Scala 2.10.6

I'm trying to filter a DataFrame on a field "tags" that is an array of strings, looking for all rows that have the tag 'private'.

val report = df.select("*")
  .where(df("tags").contains("private"))

I'm getting:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'Contains(tags, private)' due to data type mismatch: argument 1 requires string type, however, 'tags' is of array type.;

Is the filter method better suited?

UPDATED:

The data is coming from the Cassandra adapter, but a minimal example that shows what I'm trying to do, and that also produces the above error, is:

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.DataFrame

  // Builds a two-row DataFrame with a string column and an array-of-strings column.
  def testData(sc: SparkContext): DataFrame = {
    val stringRDD = sc.parallelize(Seq("""
      { "name": "ed",
        "tags": ["red", "private"]
      }""",
      """{ "name": "fred",
        "tags": ["public", "blue"]
      }""")
    )
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    sqlContext.read.json(stringRDD)
  }

  def run(sc: SparkContext) {
    val df1 = testData(sc)
    df1.show()
    val report = df1.select("*")
      .where(df1("tags").contains("private")) // throws the AnalysisException above
    report.show()
  }

UPDATED: the tags array can be any length and the 'private' tag can be in any position.

UPDATED: one solution that works is a UDF:

import scala.collection.mutable
import org.apache.spark.sql.functions.udf

// The array column arrives in the UDF as a WrappedArray.
val filterPriv = udf { (tags: mutable.WrappedArray[String]) => tags.contains("private") }
val report = df1.filter(filterPriv(df1("tags")))
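
A follow-up note on this: if tags can be null for some rows, the UDF above will throw a NullPointerException. A null-safe sketch of the same idea (Spark will also bind an array column to a plain Seq[String] parameter):

import org.apache.spark.sql.functions.udf

// Null-safe variant: Option(...) turns a null array into None instead of throwing.
val filterPrivSafe = udf { (tags: Seq[String]) => Option(tags).exists(_.contains("private")) }
val reportSafe = df1.filter(filterPrivSafe(df1("tags")))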
  • Post a sample of your data and how you are creating the DataFrame. Commented Jan 17, 2016 at 0:19
  • One option is to build a UDF. Commented Jan 17, 2016 at 3:24
  • Well, after looking at the source code (since the scaladoc for Column.contains says only "Contains the other element", which is not very enlightening), I see that Column.contains constructs an instance of org.apache.spark.sql.catalyst.expressions.Contains, which says "A function that returns true if the string left contains the string right". So it seems that df1("tags").contains cannot do what we want it to do in this case. I don't know what alternative to suggest. There is an ArrayContains also in ...expressions, but Column doesn't seem to make use of it; see the sketch after these comments. Commented Jan 17, 2016 at 3:26
  • Indeed, after changing the data to just strings instead of an array of strings, I find that the query succeeds. Commented Jan 17, 2016 at 3:33
  • @DavidMaust, I got a UDF to work: val filterPriv = udf {(tags: mutable.WrappedArray[String]) => tags.contains("private")}; val report = df1.filter(filterPriv(df1("tags"))) still looking for something nicer but at least I'm not blocked. thx! Commented Jan 17, 2016 at 15:59
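
To make the observation in the comments concrete: even though Column does not expose ArrayContains, Spark 1.5 registers that expression under the name array_contains in its SQL function registry, so it can be reached through a SQL expression string. A sketch:

import org.apache.spark.sql.functions.expr

// array_contains is resolved by the SQL parser even though Column has no method for it.
val report = df1.where(expr("array_contains(tags, 'private')"))
// filter also accepts a raw SQL expression string directly:
val report2 = df1.filter("array_contains(tags, 'private')")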

2 Answers


I think if you use where(array_contains(...)) it will work. Here's my result:

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame

scala> import org.apache.spark.sql.functions.array_contains
import org.apache.spark.sql.functions.array_contains

scala> def testData (sc: SparkContext): DataFrame = {
     |     val stringRDD = sc.parallelize(Seq
     |      ("""{ "name": "ned", "tags": ["blue", "big", "private"] }""",
     |       """{ "name": "albert", "tags": ["private", "lumpy"] }""",
     |       """{ "name": "zed", "tags": ["big", "private", "square"] }""",
     |       """{ "name": "jed", "tags": ["green", "small", "round"] }""",
     |       """{ "name": "ed", "tags": ["red", "private"] }""",
     |       """{ "name": "fred", "tags": ["public", "blue"] }"""))
     |     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
     |     import sqlContext.implicits._
     |     sqlContext.read.json(stringRDD)
     |   }
testData: (sc: org.apache.spark.SparkContext)org.apache.spark.sql.DataFrame

scala> val df = testData (sc)
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> val report = df.select ("*").where (array_contains (df("tags"), "private"))
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> report.show
+------+--------------------+
|  name|                tags|
+------+--------------------+
|   ned|[blue, big, private]|
|albert|    [private, lumpy]|
|   zed|[big, private, sq...|
|    ed|      [red, private]|
+------+--------------------+

Note that it works if you write where(array_contains(df("tags"), "private")), but if you write where(df("tags").array_contains("private")) (more directly analogous to what you wrote originally) it fails with array_contains is not a member of org.apache.spark.sql.Column. Looking at the source code for Column, I see there's some handling for contains (constructing a Contains instance) but nothing for array_contains. Maybe that's an oversight.
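
If the fluent style of the original attempt matters, one can recover it with a small enrichment. A sketch (RichColumn and containsElement are names made up here, not part of Spark's API):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.array_contains

// Hypothetical enrichment so df("tags").containsElement("private") reads like
// the original attempt; it simply delegates to the array_contains function.
implicit class RichColumn(underlying: Column) {
  def containsElement(value: Any): Column = array_contains(underlying, value)
}

val report = df.where(df("tags").containsElement("private"))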


2 Comments

.select("*") is not needed => df.where(...) ...
Need to import org.apache.spark.sql.functions.array_contains before one can use this method.
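
Putting those two comments together, a minimal form of this approach would look like:

import org.apache.spark.sql.functions.array_contains

val report = df.where(array_contains(df("tags"), "private"))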

You can use an ordinal to refer to an element of the JSON array, e.g. in your case df("tags")(0). Here is a working sample:

scala> val stringRDD = sc.parallelize(Seq("""
     |       { "name": "ed",
     |         "tags": ["private"]
     |       }""",
     |       """{ "name": "fred",
     |         "tags": ["public"]
     |       }""")
     |     )
stringRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[87] at parallelize at <console>:22

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> sqlContext.read.json(stringRDD)
res28: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> val df=sqlContext.read.json(stringRDD)
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> df.columns
res29: Array[String] = Array(name, tags)

scala> df.dtypes
res30: Array[(String, String)] = Array((name,StringType), (tags,ArrayType(StringType,true)))

scala> val report = df.select("*").where(df("tags")(0).contains("private"))
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> report.show
+----+-------------+
|name|         tags|
+----+-------------+
|  ed|List(private)|
+----+-------------+

7 Comments

thanks. works if the position is fixed, but it isn't. I should have made the test data a little more complex: there can be any number of tags in the array, and the position is arbitrary.
@navicore then your code should work. Check my update.
interesting, I'm missing something; this looks like exactly what I was doing, yet I get the error. Double-checking Spark versions now...
@navicore this is on 1.5.4
thx. I must be crossing hands somewhere. I tried 1.5.1 and 1.6, and val report = df.select("*").where(df("tags").contains("private")) gives me the error in the original post. Digging...
