
I am trying to create a function that accepts a dict and a schema as input and returns a DataFrame, automatically filling unspecified fields with nulls. This is my code:

from pyspark.sql import Row, SparkSession


def get_element(name, row_dict):
    value = None
    if name in row_dict:
        value = row_dict[name]

    return value


def create_row(schema, row_dict):
    row_tuple = ()
    for fields in schema:
        element = get_element(fields.name, row_dict)
        row_tuple = (*row_tuple, element)

    return row_tuple


def fill(schema, values):
    spark = (
        SparkSession
            .builder
            .master("local[*]")
            .appName("pysparktest")
            .getOrCreate()
    )
    return \
        spark.createDataFrame(
            spark.sparkContext.parallelize(
                [(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
            ),
            schema
        )

This is how I'm calling the function:

import pyspark.sql.types as T

schema = T.StructType([
    T.StructField("base_currency", T.StringType()),
    T.StructField("target_currency", T.StringType()),
    T.StructField("valid_from", T.StringType()),
    T.StructField("valid_until", T.StringType()),
])

values = [
    {"base_currency": "USD", "target_currency": "EUR",
     "valid_from": "test", "valid_until": "test"},
    {"base_currency": "USD1", "target_currency": "EUR2"},
]

fill(schema, values).show()

Error message:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
test_utilities/create_df_from_schema.py:37: in fill
    [(Row(create_row(schema.fields, row_dict)) for row_dict in values)]
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:566: in parallelize
    jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/context.py:603: in _serialize_to_jvm
    serializer.dump_stream(data, tempFile)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:211: in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:133: in dump_stream
    self._write_with_length(obj, stream)
../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:143: in _write_with_length
    serialized = self.dumps(obj)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = PickleSerializer()
obj = [<generator object fill.<locals>.<genexpr> at 0x1091b9350>]

    def dumps(self, obj):
>       return pickle.dumps(obj, pickle_protocol)
E       TypeError: can't pickle generator objects

../../../.virtualenv/etl-orderlines-generic-pivot/lib/python3.7/site-packages/pyspark/serializers.py:427: TypeError

Somehow the syntax to construct the data frame is not right.
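The root cause can be reproduced outside Spark: Python's pickle module cannot serialize generator objects, and the inner `(... for row_dict in values)` expression creates exactly such a generator before it ever reaches the JVM. A minimal sketch:

```python
import pickle

# A generator expression, analogous to the one passed to parallelize above.
gen = (x * 2 for x in range(3))

try:
    pickle.dumps(gen)
except TypeError as exc:
    # On Python 3.7 the message is "can't pickle generator objects";
    # the exact wording varies across Python versions.
    print(exc)
```

Materializing the rows into a list before handing them to Spark avoids the problem entirely, which is what the answer below does.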

  • The complaint seems to be TypeError: can't pickle generator objects. What happens if you evaluate that to a list of rows beforehand, or use a list comprehension instead? Commented Jan 24, 2022 at 13:35

1 Answer

You are already returning tuples from the create_row function, so you don't need to create a Row object at all; simply pass the list of tuples to spark.createDataFrame like this:

def fill(schema, values):
    return spark.createDataFrame(
        [create_row(schema.fields, row_dict) for row_dict in values],
        schema
    )

Now you can call:

fill(schema, values).show()

#+-------------+---------------+----------+-----------+
#|base_currency|target_currency|valid_from|valid_until|
#+-------------+---------------+----------+-----------+
#|          USD|            EUR|      test|       test|
#|         USD1|           EUR2|      null|       null|
#+-------------+---------------+----------+-----------+
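As an aside, the null-filling logic itself can be sanity-checked without Spark at all; this is just the question's tuple-building code run against plain Python data, with the field names standing in for schema.fields:

```python
def get_element(name, row_dict):
    # Same behavior as the question's get_element: None when the key is missing.
    return row_dict.get(name)

# Field names in the schema's declared order.
field_names = ["base_currency", "target_currency", "valid_from", "valid_until"]

row = {"base_currency": "USD1", "target_currency": "EUR2"}
row_tuple = tuple(get_element(name, row) for name in field_names)
print(row_tuple)  # ('USD1', 'EUR2', None, None)
```

The two trailing Nones become the null valid_from/valid_until values in the DataFrame above.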

Moreover, you can simplify your code to a one-line list comprehension without having to define those functions:

spark.createDataFrame(
    [[row.get(f.name) for f in schema.fields] for row in values],
    schema
).show()

Calling .get(key) on a dict object returns None if the key does not exist.
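For reference, the difference between indexing and .get on a plain dict:

```python
row = {"base_currency": "USD1", "target_currency": "EUR2"}

print(row.get("base_currency"))   # USD1
print(row.get("valid_from"))      # None: missing key, no KeyError raised
print(row.get("valid_from", "")) # an explicit default, if you ever need one
```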


1 Comment

Wow, that's awesome simplification!
