root
 |-- data: struct (nullable = true)
 |    |-- keyNote: struct (nullable = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- note: string (nullable = true)

With the example structure above, how would I select the note field nested under the data and keyNote structs?

I need to filter using two different data frames and cannot seem to select a nested field. I am using Spark 1.6.2, where left anti joins aren't available, so I used a join condition as a filter instead. Below are the ways I have tried, along with the error each one produced.

val dataFrame = esData.join(broadcastDataFrame, esData.select(esData.col("data.keyNote")).col("note") !== broadcastDataFrame("id")) 

Error: Cannot resolve column name "note" among (keyNote)


val dataFrame = esData.join(broadcastDataFrame, esData.select(esData.col("data.keyNote.*")).col("note") !== broadcastDataFrame("id")) 

Error: No such struct field * in key, note


val dataFrame = esData.join(broadcastDataFrame, esData("data.keyNote.note") !== broadcastDataFrame("id")) 

java.lang.IllegalArgumentException: Field "note" does not exist.(..)


val dataFrame = esData.join(broadcastDataFrame, esData.select($"data.keyNote.note").col("note") !== broadcastDataFrame("id")) 

Error: resolved attribute(s) note#9 missing from data#1,id#3 in operator !Join Inner, Some(NOT (note#9 = id#3))

The data frame used is created from Elasticsearch (artifact: elastic-spark-13_2.10, version 5.1.1):

val dataFrameES = context.read.format("org.elasticsearch.spark.sql")
   .options(Map("es.read.field.exclude" ->
    "<Excluding All the fields except those I need>"))
   .load("<Index>/<Type>") 

I also attempted to use es.read.field.include, but nothing I tried would retrieve the nested items; the only thing that worked was excluding everything else. I tried including data, data.keyNote, data.keyNote.key, and every permutation of those with a trailing * wildcard. I am not sure if this is a Spark thing or an Elasticsearch thing.
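For reference, this is roughly the include-based read I was attempting (the exact field list here is just a guess based on the schema above):

// include variant (field list is an assumption); this did not return the nested fields for me
val dataFrameES = context.read.format("org.elasticsearch.spark.sql")
   .options(Map("es.read.field.include" -> "data.keyNote.key,data.keyNote.note"))
   .load("<Index>/<Type>")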

I thought the schema was being read incorrectly until I excluded all the unwanted fields and successfully retrieved the ones I wanted.

I now think the problem is the join, because I am able to grab that field with no errors in a filter like this:

 esData.filter(esData("data.keyNote.key").equalTo("x")) 

I just continue to get errors when I try to complete the join above, which is required since I have two data sets. Also, when I run the filter above right after creating the Elasticsearch data frame, it takes far longer than running a curl.


1 Answer


The correct syntax is:

df1.join(df2, df1("x.y.z") !== df2("v"))

or

df1.join(df2).where(df1("x.y.z") !== df2("v"))

Full example

scala> :paste
// Entering paste mode (ctrl-D to finish)

val esData = sqlContext.read.json(sc.parallelize(Seq(
  """{"data": {"keyNote": {"key":   "foo", "note": "bar"}}}""")))

val broadcastDataFrame = Seq((1L, "foo"), (2L, "bar")).toDF("n", "id")

esData.join(
  broadcastDataFrame, esData("data.keyNote.note") !== broadcastDataFrame("id")
).show

// Exiting paste mode, now interpreting.

+-----------+---+---+
|       data|  n| id|
+-----------+---+---+
|[[foo,bar]]|  1|foo|
+-----------+---+---+

esData: org.apache.spark.sql.DataFrame = [data: struct<keyNote:struct<key:string,note:string>>]
broadcastDataFrame: org.apache.spark.sql.DataFrame = [n: bigint, id: string]

If you want an anti-join, it is better to use an outer join and filter out the nulls.
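A sketch of that approach, reusing the names from the example above (the equality condition and the final select are just assumptions about what should be kept):

// emulate a left anti join on Spark 1.6: keep only esData rows with no matching id
val anti = esData
  .join(broadcastDataFrame, esData("data.keyNote.note") === broadcastDataFrame("id"), "left_outer")
  .where(broadcastDataFrame("id").isNull)
  .select(esData("*"))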
