
I need to be able to compare two dataframes using multiple columns.

PySpark attempt:

# collect distinct PrimaryLookupAttributeValue values from the reference table
# into a list so they can be compared against df1
primaryAttributeValue_List = [p.PrimaryLookupAttributeValue for p in AttributeLookup.select('PrimaryLookupAttributeValue').distinct().collect()]
primaryAttributeValue_List  # list of distinct values; contents vary by filter

Out: ['Archive',
 'Pending Security Deposit',
 'Partially Abandoned',
 'Revision Contract Review',
 'Open',
 'Draft Accounting In Review',
 'Draft Returned']


# compare df1 to PrimaryLookupAttributeValue
from pyspark.sql import functions as f

output = dataset_standardFalse2.withColumn('ConformedLeaseStatusName', f.when(dataset_standardFalse2['LeaseStatus'].isin(primaryAttributeValue_List), "FOUND").otherwise("TBD"))

display(output)

  • are you looking to map df1.LeaseStatus and df1.LeaseRecoveryType based on reference_df.DomainName and from reference_df.PrimaryLookupAttributeValue to reference_df.OutputItemNameByValue? Commented May 15, 2020 at 21:00
  • yes! Except, looking to map df1.LeaseStatus and df1.LeaseRecoveryType based on reference_df.PrimaryLookupAttributeName, which is why I have the AttributeLookup dataframe @jxc Commented May 15, 2020 at 21:08

1 Answer


From my understanding, you can create a map based on columns from reference_df (I assumed this is not a very big dataframe):

map_key = concat_ws('\0', PrimaryLookupAttributeName, PrimaryLookupAttributeValue)
map_value = OutputItemNameByValue
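(For reference, a minimal standalone pair of dataframes with this layout could be built as below; the rows are hypothetical and only mirror the column shapes used in this answer.)

# hypothetical sample data, only for running the snippets below standalone
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

reference_df = spark.createDataFrame(
    [('LeaseStatus', 'Archive', 'Expired'),
     ('LeaseStatus', 'Draft', 'Pending'),
     ('LeaseRecoveryType', 'Gross', 'Gross')],
    ['PrimaryLookupAttributeName', 'PrimaryLookupAttributeValue', 'OutputItemNameByValue'])

df1 = spark.createDataFrame(
    [('ABC123', 'Archive', 'Gross')],
    ['SourceSystemName', 'LeaseStatus', 'LeaseRecoveryType'])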

and then use this mapping to get the corresponding values in df1:

from itertools import chain
from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map

# collect distinct [key, value] pairs, where key = PrimaryLookupAttributeName + '\0' + PrimaryLookupAttributeValue
d = reference_df.agg(collect_set(array(concat_ws('\0','PrimaryLookupAttributeName','PrimaryLookupAttributeValue'), 'OutputItemNameByValue')).alias('m')).first().m
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseStatus\x00Draft', 'Pending'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]

mappings = create_map([lit(i) for i in chain.from_iterable(d)])
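# For illustration: chain.from_iterable flattens the collected [key, value]
# pairs into the alternating key1, value1, key2, value2, ... sequence that
# create_map expects. Using the first two pairs shown above:
#   list(chain.from_iterable([['LeaseStatus\x00Abandoned', 'Active'],
#                             ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross']]))
#   returns ['LeaseStatus\x00Abandoned', 'Active',
#            'LeaseRecoveryType\x00Gross-modified', 'Modified Gross']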

primaryLookupAttributeName_List = ['LeaseType', 'LeaseRecoveryType', 'LeaseStatus']

df1.select("*", *[ mappings[concat_ws('\0', lit(c), col(c))].alias("Matched[{}]OutputItemNameByValue".format(c)) for c in primaryLookupAttributeName_List ]).show()
+----------------+...+---------------------------------------+-----------------------------------------------+-----------------------------------------+
|SourceSystemName|...|Matched[LeaseType]OutputItemNameByValue|Matched[LeaseRecoveryType]OutputItemNameByValue|Matched[LeaseStatus]OutputItemNameByValue|
+----------------+...+---------------------------------------+-----------------------------------------------+-----------------------------------------+
|          ABC123|...|                                   null|                                          Gross|                               Terminated|
|          ABC123|...|                                   null|                                 Modified Gross|                                  Expired|
|          ABC123|...|                                   null|                                 Modified Gross|                                  Pending|
+----------------+...+---------------------------------------+-----------------------------------------------+-----------------------------------------+
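Any key with no entry in the map comes back as null (here every LeaseType value, since reference_df carries no LeaseType rows in this sample). If an explicit placeholder is preferred, one option is to wrap the lookup in coalesce; a small sketch, untested, reusing the "TBD" default from the question's own attempt:

from pyspark.sql.functions import coalesce

# fall back to the literal 'TBD' when the key has no entry in `mappings`
df1.select("*", *[
    coalesce(mappings[concat_ws('\0', lit(c), col(c))], lit('TBD'))
        .alias("Matched[{}]OutputItemNameByValue".format(c))
    for c in primaryLookupAttributeName_List
]).show()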

UPDATE: to set the column names from information retrieved through the reference_df dataframe:

# a list of domains to retrieve
primaryLookupAttributeName_List = ['LeaseType', 'LeaseRecoveryType', 'LeaseStatus']

# mapping from domain names to column names: using `reference_df`.`TargetAttributeForName`
NEWprimaryLookupAttributeName_List = dict(reference_df.filter(reference_df['DomainName'].isin(primaryLookupAttributeName_List)).agg(collect_set(array('DomainName', 'TargetAttributeForName')).alias('m')).first().m)

test = dataset_standardFalse2.select("*",*[ mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c,c_name in NEWprimaryLookupAttributeName_List.items()]) 

Note-1: it is better to loop through primaryLookupAttributeName_List so the order of the columns is preserved, and in case any entry in primaryLookupAttributeName_List is missing from the dictionary, we can set a default column name, i.e. Unknown-<col>. In the old method, columns with missing entries are simply discarded.

test = dataset_standardFalse2.select("*",*[ mappings[concat_ws('\0', lit(c), col(c))].alias(NEWprimaryLookupAttributeName_List.get(c,"Unknown-{}".format(c))) for c in primaryLookupAttributeName_List])

Note-2: per the comments, to overwrite the existing column names (untested):

(1) use select:

test = dataset_standardFalse2.select([c for c in dataset_standardFalse2.columns if c not in NEWprimaryLookupAttributeName_List.values()] + [mappings[concat_ws('\0', lit(c), col(c))].alias(NEWprimaryLookupAttributeName_List.get(c, "Unknown-{}".format(c))) for c in primaryLookupAttributeName_List])
test.show()

(2) use reduce (not recommended if the list is very long):

from functools import reduce

# withColumn names the column itself, so pass the target name directly
# (an .alias() inside withColumn would be silently ignored):
df_new = reduce(lambda d, c: d.withColumn(NEWprimaryLookupAttributeName_List.get(c, "Unknown-{}".format(c)), mappings[concat_ws('\0', lit(c), col(c))]), primaryLookupAttributeName_List, dataset_standardFalse2)

reference: PySpark create mapping from a dict


Comments

this is great. It outputs what I was looking for, thank you! I'm looking into the concat_ws and chain methods. Quick question regarding using \0 as the delimiter: why? @jxc
@jessgtrz you can use any delimiter as long as it avoids collisions such as PrimaryLookupAttributeName="A" + PrimaryLookupAttributeValue="B C" (A\0B C) versus PrimaryLookupAttributeName="A B" + PrimaryLookupAttributeValue="C" (A B\0C). I prefer the NUL char \0 as the delimiter since it is uncommon in normal text. Also, if you write Linux command scripts often, the NUL char ('\0') is not allowed in Linux filenames, so it became my favorite delimiter whenever one is needed.
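To make the collision concrete, a plain-Python sketch of the two delimiter choices:

# with a space as the delimiter, two different (name, value) pairs collide:
' '.join(['A', 'B C'])    # 'A B C'
' '.join(['A B', 'C'])    # 'A B C'  <- same key, ambiguous
# with the NUL char they stay distinct:
'\0'.join(['A', 'B C'])   # 'A\x00B C'
'\0'.join(['A B', 'C'])   # 'A B\x00C'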
got it! so the logic in my question maps values from the reference table and dataset, hence OutputItemNameByValue. Can we map output column names with the reference table and dataset? For example, taking .alias("Matched[{}]OutputItemNameByValue".format(c)) to make it iterate and match corresponding output columns? Asking about this since it may not always be Matched....OutputItemNameByValue. I'm thinking I can map DomainName, PrimaryAttributeName (map_key) and say TargetOutputColumn(map_key). Can .alias("Matched[{}]OutputItemNameByValue".format(c)) be replaced with another map? @jxc
@jessgtrz, I think you can convert the Python list to a dictionary, for example: primaryLookupAttributeName_List = {'LeaseType':'Name1', 'LeaseRecoveryType':'Name2', 'LeaseStatus':'Name3'} and then use *[ mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c,c_name in primaryLookupAttributeName_List.items() ]
hmm, gotcha. I'm having difficulty finding the most efficient way to do this, since building a dictionary for primaryLookupAttributeName_List is embedded within another dataframe. Would appreciate another set of eyes/brains. Thanks so much! I've learned a lot from you! stackoverflow.com/questions/61875664/… @jxc
