
I have a dictionary with information like this:

dict_segs = {'key1': {'a': {'col1': 'value1', 'col2': 'value2', 'col3': 'value3'},
                      'b': {'col2': 'value2', 'col3': 'value3'},
                      'c': {'col1': 'value1'}},
             'key2': {'d': {'col3': 'value3', 'col2': 'value2'},
                      'f': {'col1': 'value1', 'col4': 'value4'}}}

TO DO:

The top-level keys are 'segments', and the dictionaries nested under them, i.e. a, b, c under key1, are 'subsegments'. Each subsegment's dictionary holds its filter condition: the keys of that dictionary (col1, col2, ...) are also column names of a PySpark dataframe, and the values are the values those columns must match.
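
To make the structure concrete, this is how the filter condition for one subsegment reads out of the dictionary (plain Python, no Spark needed):

# subsegment 'a' of segment 'key1' should flag rows where
# col1 == 'value1' AND col2 == 'value2' AND col3 == 'value3'
for col, val in dict_segs['key1']['a'].items():
    print(col, '==', repr(val))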

I want to create the subsegment columns in the PySpark dataframe in one go, one column per subsegment of each segment; a subsegment column's value should be 1 for rows that meet its filter condition, else 0. Something like:

for item in dict_segs:
    for subsegment, condition in dict_segs[item].items():
        pyspark_dataframe = pyspark_dataframe.withColumn(
            subsegment, when(<row meets the filter condition>, 1).otherwise(0))

While researching I found something similar in Scala, but the column filter condition there is static, whereas the logic above needs it to be dynamic. Please see the Scala question below:

Spark/Scala repeated calls to withColumn() using the same function on multiple columns

I need help deriving the above logic for each segment, as per the pseudocode above.

Thanks.

1 Answer


You are looking for a select statement:

Let's create a sample dataframe:

# a 2-row sample dataframe with columns col1..col4
df = spark.createDataFrame(
    sc.parallelize([["value" + str(i) for i in range(1, 5)],
                    ["value" + str(i) for i in range(5, 9)]]),
    ["col" + str(i) for i in range(1, 5)]
)

+------+------+------+------+
|  col1|  col2|  col3|  col4|
+------+------+------+------+
|value1|value2|value3|value4|
|value5|value6|value7|value8|
+------+------+------+------+

Now, for every key in the dictionary, every subkey in dict_segs[key] and every column in dict_segs[key][subkey]:

df.select(
    ["*"] +
    [
        # build the string '(df["col1"] == "value1")&(df["col2"] == "value2")...',
        # evaluate it into a boolean Column, cast it to 0/1 and name it
        # after the subsegment
        eval('&'.join([
            '(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys()
        ])).cast("int").alias(sk)
        for k in dict_segs.keys() for sk in dict_segs[k].keys()
    ]
).show()

+------+------+------+------+---+---+---+---+---+
|  col1|  col2|  col3|  col4|  a|  b|  c|  d|  f|
+------+------+------+------+---+---+---+---+---+
|value1|value2|value3|value4|  1|  1|  1|  1|  1|
|value5|value6|value7|value8|  0|  0|  0|  0|  0|
+------+------+------+------+---+---+---+---+---+
  • "*" allows you to keep all the previously existing columns, it can be replaced by df.columns.
  • alias(sk) allows you to give name sk to the new column
  • cast("int") to change type boolean into type int

I don't really understand why you have a depth-3 dictionary though; it seems that key1 and key2 aren't really useful.


4 Comments

Thanks for the perfect solution. About the extra level in the dictionary: I was using that info in one of the filter conditions, but after analysing it I found your point is valid and it's not required. I dropped the level and kept two levels in the end.
Cool, I'm glad I could help. Don't forget to mark the question as solved.
Sure. Could you help with passing a list instead of a single value? I have modified it as follows. Previous: eval('&'.join([ '(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys() ])). Modified: eval('&'.join([ '(tbl["' + c + '"].isin("' + v + '"))' for c, v in self.dict_tstPlan[sk].iteritems() ])). I would like to pass a list for the values of 'v'...
You have to turn the list into a string using '","'.join(v), so inside the eval you end up with: eval('&'.join([ '(tbl["' + c + '"].isin(["' + '","'.join(v) + '"]))' for c, v in self.dict_tstPlan[sk].iteritems() ]))
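
For what it's worth, the list-valued filter can also be built without eval; a minimal sketch, assuming tbl is the dataframe and values_by_col is a hypothetical mapping from column name to the list of allowed values (both names are mine):

import operator
from functools import reduce

tbl = df  # assuming tbl is the dataframe built above

# hypothetical mapping: column name -> list of allowed values
values_by_col = {'col1': ['value1', 'value5'], 'col2': ['value2', 'value6']}

# AND together one isin() membership test per column, no eval needed
cond = reduce(operator.and_, [tbl[c].isin(v) for c, v in values_by_col.items()])
tbl.select(["*"] + [cond.cast("int").alias("subsegment_flag")]).show()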
