
I have a PySpark dataframe (df1) whose first row is as below:

[Row(_c0='{"type":"Fi","values":[0.20100994408130646,1.172734797000885,0.06788740307092667,0.2314232587814331,0.2012220323085785]}', _c1='0')]

I want to compare the "values" list with the first column of the dataframe (df2) shown below:

0    0.57581    1.25461    0.68694    0.974580    1.54789    0.23646
1    0.98745    0.23655    2.58970    4.587580    0.89756    1.25678
2    0.45780    5.78940    0.65986    2.125400    0.98745    1.23658
3    2.56834    0.25698    4.26587    0.569872    0.36987    0.68975
4    0.25678    1.23654    5.68320    0.986230    0.87563    2.58975

Similarly, I have many rows in df1. I have to see which values in each df1 "values" list are greater than the corresponding column values in df2. I need to find the indices that satisfy this condition and store them as a list in another column of df1.

For instance, 1.172734… > 0.98745, so its index is 1. Hence df1 will have another column named indices which contains the value 1, and any further index that satisfies the condition is appended to the same list.

The comparison is between the respective rows and columns: the df1 row shown above is the 1st row, so it has to be compared with the first column of df2.
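To make the intended logic concrete, here is a plain-Python sketch (not PySpark) of the comparison for the first row; the hard-coded lists are taken from the example data above:

```python
# First row's "values" list from df1 and the first column of df2
values = [0.20100994408130646, 1.172734797000885, 0.06788740307092667,
          0.2314232587814331, 0.2012220323085785]
col1 = [0.57581, 0.98745, 0.45780, 2.56834, 0.25678]

# Collect the indices where the df1 value exceeds the df2 column value
indices = [i for i, (v, c) in enumerate(zip(values, col1)) if v > c]
print(indices)  # [1] -- only 1.1727... > 0.98745
```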

If I have under-explained something, please let me know in the comments.

  • I think your JSON is not loaded correctly in your dataframe df1, and it would be better to parse it so you have correct structures. Secondly, you cannot compare two dataframes if they don't have a column to join on, even if I presume it is df1._c1 and df2's first column. Finally, you have 5 values in df1 and 6 values in df2: how should they be compared? Commented Jul 29, 2019 at 13:31
  • As I said, the first row and first column are compared (not the first row of df2) Commented Jul 30, 2019 at 11:23

1 Answer


This code works with Python 2.7 and Spark 2.3.2:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Create test dataframes
df1 = spark.createDataFrame([
        ['{"type":"Fi","values":[0.20100994408130646,1.172734797000885,0.06788740307092667,0.2314232587814331,0.2012220323085785]}', '0'],
        ['{"type":"Fi","values":[0.6, 0.8, 0.5, 2.1, 0.4]}', '0']
    ],['_c0','_c1'])
df2 = spark.createDataFrame([
        [0, 0.57581, 1.25461, 0.68694, 0.974580, 1.54789, 0.23646],
        [1, 0.98745, 0.23655, 2.58970, 4.587580, 0.89756, 1.25678],
        [2, 0.45780, 5.78940, 0.65986, 2.125400, 0.98745, 1.23658],
        [3, 2.56834, 0.25698, 4.26587, 0.569872, 0.36987, 0.68975],
        [4, 0.25678, 1.23654, 5.68320, 0.986230, 0.87563, 2.58975]
    ],['id','v1', 'v2', 'v3', 'v4', 'v5', 'v6'])

# Get schema and load json correctly
json_schema = spark.read.json(df1.rdd.map(lambda row: row._c0)).schema
df1 = df1.withColumn('json', F.from_json('_c0', json_schema))

# Get column 1 values to compare
values = [row['v1'] for row in df2.select('v1').collect()]

# Define udf to compare values
def cmp_values(lst):
    list_cmp = map(lambda t: t[0] > t[1], zip(lst, values))  # Boolean list
    return [idx for idx, cond in enumerate(list_cmp) if cond]  # Indices of satisfying elements

udf_cmp_values = F.udf(cmp_values, ArrayType(IntegerType()))

# Apply udf on array
df1 = df1.withColumn('indices', udf_cmp_values(df1.json['values']))
df1.show()

+--------------------+---+--------------------+---------+
|                 _c0|_c1|                json|  indices|
+--------------------+---+--------------------+---------+
|{"type":"Fi","val...|  0|[Fi, [0.201009944...|      [1]|
|{"type":"Fi","val...|  0|[Fi, [0.6, 0.8, 0...|[0, 2, 4]|
+--------------------+---+--------------------+---------+

3 Comments

Hi, I am getting the error "zip#1 must support iteration"!
Have you tried copy/pasting this code, and did it work (it works for me)? If this sample works but it does not work on your full dataframe, it means your minimal example is incomplete. I guess you don't have the same number of values in all your rows, which could cause this error.
I have a follow-up question, dropping the link, thanks in advance stackoverflow.com/questions/61823544/… @Pierre Gourseaud
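Regarding the "zip#1 must support iteration" error mentioned in the comments above: it typically appears when some rows have a null or non-array "values" field after parsing. A defensive variant of the UDF body (a plain-Python sketch; the function name cmp_values_safe is hypothetical) could guard against that:

```python
def cmp_values_safe(lst, ref):
    """Return indices i where lst[i] > ref[i]; tolerate null or short lists."""
    if not lst:  # None or empty -> no indices to report
        return []
    # zip stops at the shorter list, so unequal lengths no longer raise
    return [i for i, (v, c) in enumerate(zip(lst, ref)) if v > c]

print(cmp_values_safe(None, [0.57581, 0.98745]))        # []
print(cmp_values_safe([0.6, 1.2], [0.57581, 0.98745]))  # [0, 1]
```

Registering it with F.udf(..., ArrayType(IntegerType())) as in the answer would then avoid the failure on malformed rows, though it silently ignores them rather than flagging them.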
