
Pardon my ignorance, I am new to pyspark. I'm trying to improve a udf that creates a new column count_adj based on values from another column a_type, using a dictionary. How do I account for None / NULL values in this process when creating my new column? This is super easy in pandas (df['adj_count'] = df.a_type.map(count_map)) but I'm struggling to do it in pyspark.

Sample data / imports:

# all imports used -- not just for this portion of the script
from pyspark.sql import SparkSession, HiveContext, SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import sql
import pyspark.sql.functions as F
import random
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from datetime import datetime
from datetime import date
from datetime import timedelta
from pyspark.sql import Window
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import rank, row_number, max as max_, col
from itertools import chain  # needed for the create_map expression below
import sys
import os

spark = SparkSession.builder.appName('a_type_tests').getOrCreate()

# note: sample data has results from the original udf for comparison
dataDictionary = [
(26551, 491, '2022-01-22', '740', -1, 16),
(24192, 338, '2022-01-22', '740', -1, 16),
(26555, 3013, '2022-01-22', '740', -1, 16),
(26571, 937, '2022-01-22', '740', -1, 16),
(24376, 371, '2022-01-22', '740', -1, 16),
(17716, 118, '2022-01-22', '740', -1, 16),
(26554, 3013, '2022-01-22', '740', -1, 16),
(26734, 105, '2022-01-22', '740', -1, 16),
(26051, 415, '2022-01-22', '600', -1, 8),
(26602, 501, '2022-01-22', '740', -1, 16),
(26125, 501, '2022-01-22', None, -1, 0)
        ]

sdf = spark.createDataFrame(data=dataDictionary, schema = ['id', 'loc_id', 'a_date', 'a_type', 'adj_val', 'udf_original'])
sdf.printSchema()
sdf.show(truncate=False)

The original udf is similar to:

def count_adj(a_type):
    if a_type is None:
        return 0
    elif a_type in ('703','704','705','708','900','910'):
        return 4
    elif a_type in ('701','702'):
        return 2
    elif a_type in ('711','712'):
        return 1
    elif a_type in ('600', '704'):  # '704' never reaches this branch; it already matched the 4-branch above
        return 8
    elif a_type in ('740',):  # ('740') is just the string '740'; a one-element tuple needs the trailing comma
        return 16
    elif a_type in ('305','306'):
        return 32
    elif a_type in ('601','612','615'):
        return 64
    else:
        return 128
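
For context, applying that function would look something like the sketch below; the registration itself isn't shown in the question, so the names here are assumptions.

# assumed usage -- the question doesn't show the udf registration itself
count_adj_udf = F.udf(count_adj, IntegerType())
sdf_orig = sdf.withColumn('count_adj', count_adj_udf(col('a_type')))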

I've created a dictionary to correspond to these values.

# remove the 0:None pairing because None is not iterable when inverting the dict
count_map = {1: ['711', '712'],
             2: ['701', '702'],
             4: ['703', '704', '705', '708', '900', '910'],
             8: ['600', '704'],
             16: ['740'],
             32: ['305', '306'],
             64: ['601', '612', '615'],
             128: ['1600', '1601', '1602']
             }

# invert dict
count_map = {c:key for key, vals in count_map.items() for c in vals}

# create None mapping manually
count_map[None] = 0
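
Worth flagging: '704' is listed under both 4 and 8 above, so the inversion keeps whichever value it sees last (8 on Python 3.7+, where dicts preserve insertion order), while the original udf returns 4 for '704' because that branch is tested first. A quick sanity check makes the resulting mapping explicit:

# sanity-check the inverted mapping (assumes Python 3.7+ insertion-ordered dicts)
assert count_map['740'] == 16
assert count_map['704'] == 8   # the udf would return 4 here -- a discrepancy to be aware of
assert count_map[None] == 0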

Searching SO I came across this approach, which resulted in the following error:

# Code Tried:

# create_map turns the dict into a Spark map expression, but the None key
# is the problem -- how do I account for these None/NULL values in my dict?
map_expr = F.create_map([lit(x) for x in chain(*count_map.items())])

# note: getItem('a_type') looks up the literal string 'a_type' as the map key;
# a column lookup needs col('a_type'), as in the variant below
sdf2 = sdf.withColumn('count_adj', map_expr.getItem('a_type'))

# or:

sdf2 = sdf.withColumn('count_adj',map_expr[col('a_type')]).show()
# Error

Py4JJavaError: An error occurred while calling o334.showString.
: java.lang.RuntimeException: Cannot use null as map key.

How do I account for None / NULL values when using a dictionary to create a new column based on values from another column? Is it possible to include a NULL check in my map expression, or do I need something else entirely?

Comments:
  • Hi @tlk27, which version of pyspark were you using? Commented Sep 30 at 6:17
  • Hey @ale, I believe I was using spark with a version of 2.X at the time Commented Oct 1 at 13:01
  • thanks! having a similar issue upgrading to 4.X from 3.X Commented Oct 1 at 13:35

2 Answers


For completeness: I removed the None entry from the dictionary and used a method similar to Karthik's answer, combined with the other SO posts mentioned in the question.

My final solution relies on the code below, using .when() and .isNull() to handle the None / NULL values.

# Original Mapping
# remove the 0:None pairing because None is not iterable when inverting the dict
count_map = {1: ['711', '712'],
             2: ['701', '702'],
             4: ['703', '704', '705', '708', '900', '910'],
             8: ['600', '704'],
             16: ['740'],
             32: ['305', '306'],
             64: ['601', '612', '615'],
             128: ['1600', '1601', '1602']
             }

# invert dict
count_map = {c:key for key, vals in count_map.items() for c in vals}

# New below:
map_expr = F.create_map([lit(x) for x in chain(*count_map.items())])

sdf2 = sdf.withColumn(
    'count_adj',
    F.when(col('a_type').isNull(), 0).otherwise(map_expr.getItem(col('a_type')))
)
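
Note that unlike the udf's else: return 128 fallback, getItem() returns NULL for any code that isn't in the map; if the 128 default matters, wrap the lookup in a coalesce. A sketch, assuming the same map_expr, plus a quick comparison against the udf_original column in the sample data:

# compare the mapped column against the original udf's results
sdf2.select('a_type', 'udf_original', 'count_adj').show(truncate=False)

# optional: restore the udf's else-branch default of 128 for unmapped codes
sdf3 = sdf.withColumn(
    'count_adj',
    F.when(col('a_type').isNull(), 0)
     .otherwise(F.coalesce(map_expr.getItem(col('a_type')), lit(128)))
)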



In create_map, all keys must have the same data type and can't be null -- the map key won't accept None/NULL values.

Instead of the above code, you can use the when function, which gives your desired output as shown below:

newDF = sdf.withColumn(
    "count_adj",
    F.when(F.col("a_type").isNull(), 0)
     .when(F.col("a_type").isin('711', '712'), 1)
     .when(F.col("a_type").isin('701', '702'), 2)
     .when(F.col("a_type").isin('703', '704', '705', '708', '900', '910'), 4)
     .when(F.col("a_type").isin('600', '704'), 8)
     .when(F.col("a_type").isin('740'), 16)
     .when(F.col("a_type").isin('305', '306'), 32)
     .when(F.col("a_type").isin('601', '612', '615'), 64)
     .otherwise(128)
)
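
If the mapping already lives in a dictionary, the same chained expression can be built programmatically instead of spelling out each when by hand. A minimal sketch, assuming the inverted count_map from the question:

from functools import reduce

# fold the dict entries into one chained when-expression;
# the initial when handles NULL, so skip any None key in the dict
expr = reduce(
    lambda acc, kv: acc.when(F.col('a_type') == kv[0], kv[1]),
    ((k, v) for k, v in count_map.items() if k is not None),
    F.when(F.col('a_type').isNull(), 0),
)
newDF = sdf.withColumn('count_adj', expr.otherwise(128))
newDF.show(truncate=False)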


