
How can I pass a Python dictionary value into a DataFrame where clause in PySpark?

I have the following Python dictionary:

column_dict= { 'email': 'customer_email_addr' ,
               'addr_bill': 'crq_st_addr' ,
               'addr_ship': 'ship_to_addr' ,
               'zip_bill': 'crq_zip_cd' ,
               'zip_ship':  'ship_to_zip' ,
               'phone_bill': 'crq_cm_phone' ,
               'phone_ship' : 'ship_to_phone'}

I have a Spark DataFrame with around 3 billion records, created as follows:

source_sql = """select cust_id, customer_email_addr, crq_st_addr, ship_to_addr,
 crq_zip_cd, ship_to_zip, crq_cm_phone, ship_to_phone from odl.cust_master where
 trans_dt >= '{}' and trans_dt <= '{}' """.format('2017-11-01', '2018-10-31')

cust_id_m = hiveCtx.sql(source_sql)
cust_id_m.cache()

My intention is to find the distinct valid customers for Email, Addr, Zip and Phone by looping over the dictionary keys above. To test this, I ran the following in the Spark shell for a single key:

>>> cust_id_risk_m=cust_id_m.selectExpr("cust_id").where( 
("cust_id_m.'{}'").format(column_dict['email'])  != ''  ).distinct()

I'm getting the following error and would appreciate help resolving it:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-2.1.0/python/pyspark/sql/dataframe.py", line 1026, in filter
    raise TypeError("condition should be string or Column")
TypeError: condition should be string or Column

Answer


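Can you try using the get method on your dictionary and putting the whole comparison inside the condition string? In your code, != '' is evaluated by Python rather than by Spark, so where receives the boolean True instead of a string or Column, which is what raises the TypeError. I tested this with the following DataFrame: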

df = spark.sql("select emp_id, emp_name, emp_city, emp_salary from udb.emp_table where emp_joining_date >= '{}'".format('2018-12-05'))

>>> df.show(truncate=False)
+------+----------------------+--------+----------+
|emp_id|emp_name              |emp_city|emp_salary|
+------+----------------------+--------+----------+
|1     |VIKRANT SINGH RANA    |NOIDA   |10000     |
|3     |GOVIND NIMBHAL        |DWARKA  |92000     |
|2     |RAGHVENDRA KUMAR GUPTA|GURGAON |50000     |
+------+----------------------+--------+----------+

thedict={"CITY":"NOIDA"}

>>> newdf = df.selectExpr("emp_id").where("emp_city ='{}'".format(thedict.get('CITY'))).distinct()
>>> newdf.show()
+------+
|emp_id|
+------+
|     1|
+------+
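To see why the original attempt fails, the condition can be evaluated in plain Python with no Spark involved. This is a minimal sketch that reuses only the 'email' entry of column_dict from the question:

```python
column_dict = {'email': 'customer_email_addr'}

# Original attempt: Python evaluates `!= ''` before `where` is ever called,
# so `where` would receive a plain bool rather than a string or Column.
bad_condition = ("cust_id_m.'{}'").format(column_dict['email']) != ''
print(type(bad_condition), bad_condition)  # <class 'bool'> True

# Putting the whole comparison inside the string yields a SQL expression
# that `where` can parse (the same pattern as the emp_city example above).
good_condition = "{} != ''".format(column_dict.get('email'))
print(good_condition)  # customer_email_addr != ''
```

Passing good_condition to the question's DataFrame, for example cust_id_m.where(good_condition).select("cust_id").distinct(), should then avoid the TypeError. Filtering before the select is arguably the safer ordering, because the compared column is still part of the DataFrame at that point.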

If this doesn't work for you, could you share some sample data from your DataFrame?
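The same pattern can be extended to the loop over all dictionary keys that the question describes. The helper names build_conditions and distinct_valid_customers are hypothetical, and the sketch assumes "valid" simply means the column is not an empty string, matching the question's own test. Only build_conditions runs without Spark. distinct_valid_customers expects a PySpark DataFrame such as cust_id_m.

```python
def build_conditions(column_map):
    """Map each dictionary key to a SQL filter string for its column."""
    # NULL != '' evaluates to NULL in Spark SQL, and where() keeps only rows
    # whose condition is true, so NULL values are excluded along with ''.
    return {key: "{} != ''".format(col) for key, col in column_map.items()}


def distinct_valid_customers(df, column_map):
    """Return {key: DataFrame of distinct cust_id} for every dictionary key.

    `df` is assumed to be a PySpark DataFrame (e.g. cust_id_m) containing
    cust_id and every column named in column_map. Nothing is computed here;
    Spark evaluates each result lazily when an action such as count() runs.
    """
    results = {}
    for key, condition in build_conditions(column_map).items():
        # Filter first, then project, so the compared column is still present.
        results[key] = df.where(condition).select("cust_id").distinct()
    return results


# Dictionary from the question.
column_dict = {'email': 'customer_email_addr',
               'addr_bill': 'crq_st_addr',
               'addr_ship': 'ship_to_addr',
               'zip_bill': 'crq_zip_cd',
               'zip_ship': 'ship_to_zip',
               'phone_bill': 'crq_cm_phone',
               'phone_ship': 'ship_to_phone'}
```

With the question's DataFrame, results = distinct_valid_customers(cust_id_m, column_dict) followed by results['email'].count() runs one Spark job per key you count. Because each count is a separate action, caching cust_id_m, as the question already does, should help avoid rereading the source table for every key. The same filter can also be written with the Column API, for example F.col(column_dict['email']) != '' after from pyspark.sql import functions as F, which avoids building SQL strings altogether.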
