
I want to compare two columns in a Spark DataFrame: if the value of one column (attr_value) is found among the values of another (attr_valuelist), I want only the matching value to be kept. Otherwise, the column value should be null.

For example, given the following input

id1 id2   attrname  attr_value   attr_valuelist
1   2     test      Yes          Yes, No
2   1     test1     No           Yes, No
3   2     test2     value1       val1, Value1, value2

I would expect the following output

id1 id2   attrname  attr_value   attr_valuelist
1   2     test      Yes          Yes
2   1     test1     No           No
3   2     test2     value1       Value1
    And if the value of attr_value is not in attr_valuelist, should the row stay unchanged? Commented Jul 20, 2018 at 7:27
  • A Spark custom transformation may help Commented Jul 20, 2018 at 7:49
  • Please change the second column value to null; the first column value remains the same. Commented Jul 20, 2018 at 7:50

2 Answers


Given your sample input, I assume that the column with the search item contains a string, while the search target is a sequence of strings. I also assume you want a case-insensitive search.

This is going to be the input (I added a column that would have yielded a null to test the behavior of the UDF I wrote):

+---+---+--------+----------+----------------------+
|id1|id2|attrname|attr_value|attr_valuelist        |
+---+---+--------+----------+----------------------+
|1  |2  |test    |Yes       |[Yes, No]             |
|2  |1  |test1   |No        |[Yes, No]             |
|3  |2  |test2   |value1    |[val1, Value1, value2]|
|3  |2  |test2   |value1    |[val1, value2]        |
+---+---+--------+----------+----------------------+

You can solve your problem with a very simple UDF.

// Returns the first element of the list that matches the item
// case-insensitively; None (rendered as null) when there is no match.
val find = udf {
  (item: String, collection: Seq[String]) =>
    collection.find(_.toLowerCase == item.toLowerCase)
}
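To make the matching rule concrete, here is the UDF's lookup logic on plain Scala collections (a standalone sketch; `findMatch` is just an illustrative name for the UDF body):

```scala
// Same logic as the UDF body: first case-insensitive match, or None.
def findMatch(item: String, collection: Seq[String]): Option[String] =
  collection.find(_.toLowerCase == item.toLowerCase)

findMatch("value1", Seq("val1", "Value1", "value2"))  // Some("Value1")
findMatch("value1", Seq("val1", "value2"))            // None
```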

val df = spark.createDataFrame(Seq(
  (1, 2, "test", "Yes", Seq("Yes", "No")),
  (2, 1, "test1", "No", Seq("Yes", "No")),
  (3, 2, "test2", "value1", Seq("val1", "Value1", "value2")),
  (3, 2, "test2", "value1", Seq("val1", "value2"))
)).toDF("id1", "id2", "attrname", "attr_value", "attr_valuelist")

df.select(
  $"id1", $"id2", $"attrname", $"attr_value",
  find($"attr_value", $"attr_valuelist") as "attr_valuelist")

Calling show on the result of the last command yields the following output:

+---+---+--------+----------+--------------+
|id1|id2|attrname|attr_value|attr_valuelist|
+---+---+--------+----------+--------------+
|  1|  2|    test|       Yes|           Yes|
|  2|  1|   test1|        No|            No|
|  3|  2|   test2|    value1|        Value1|
|  3|  2|   test2|    value1|          null|
+---+---+--------+----------+--------------+

You can run this code in any spark-shell. If you use it in a job submitted to a cluster, remember to import spark.implicits._.



You can try this code; it uses a SQL CASE WHEN with a LIKE containment check (this assumes attr_valuelist is stored as a single string rather than an array). Note that, unlike the UDF above, it keeps attr_value itself rather than the matching list entry, and the match is case-sensitive.

your_dataframe.createOrReplaceTempView("tbl")

// Keep attr_value when it occurs (as a substring) in attr_valuelist,
// otherwise emit null. The original multi-line string literal was not
// valid Scala; a triple-quoted string fixes that.
val result = sqlContext.sql("""
  select id1, id2, attrname, attr_value,
         case when attr_valuelist like concat('%', attr_value, '%')
              then attr_value
              else null
         end as attr_valuelist
  from tbl""")

result.show
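One caveat worth testing with this approach: when the list is stored as a single comma-separated string, a LIKE check matches substrings, not whole entries. A small plain-Scala sketch of the difference (variable names are illustrative):

```scala
val attrValueList = "val1, Value1, value2"

// Substring check, as concat('%', attr_value, '%') with LIKE does:
// "val" matches because it occurs inside "val1" -- a false positive.
val substringHit = attrValueList.toLowerCase.contains("val")

// Whole-entry check: split the list and compare entries exactly.
val exactHit = attrValueList.split(",").map(_.trim.toLowerCase).contains("val")
```

If exact-entry semantics matter, split the string into entries first (or store the column as an array and use the UDF approach above).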
