df.withColumn("event", ..)

How can we add a new column event to a DataFrame, where the value is the result of generate_header? How can we add a Row as the column value?

Maybe we need to convert the function to a UDF.

def generate_header(df_row):
    
    header = {
        "id": 1,
        ...
    }

    return EntityEvent(header, df_row)


class EntityEvent:
    def __init__(self, _header, _payload):
        self.header = _header
        self.payload = _payload
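
For context, the class just pairs a header dict with a payload; a quick self-contained sketch with made-up values:

```python
class EntityEvent:
    def __init__(self, _header, _payload):
        self.header = _header
        self.payload = _payload

# Hypothetical values, only to show the pairing
event = EntityEvent({"id": "865731"}, {"name": "A"})
```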

Let's suppose we have something like this

+---------------+--------------------+
|book_id        |Author              |
+---------------+--------------------+
|865731         |{name: 'A',  }      |
+---------------+--------------------+

and we want to get this

+---------------+--------------------+------------------------------------------------+
|book_id        |Author              |event                                           |
+---------------+--------------------+------------------------------------------------+
|865731         |{name: 'A',  }      |{header: {id: '865731'}, payload: {name: 'A'}}  |
+---------------+--------------------+------------------------------------------------+
  • I am not sure how you want to use EntityEvent later, but is it possible to just have a dictionary with header and payload in the event column, and convert it after you take the value out of the DataFrame? Commented May 16, 2022 at 14:59
  • Yeah! How can we create a dict (or struct) in the new column? Commented May 16, 2022 at 15:00
  • Depends. If it is static, you can just use create_map; otherwise, you might need to combine it with other functions. If you can show me exactly what data should be in the event column, I can share code (the exact data schema; values can be fake). Commented May 16, 2022 at 15:05
  • @Emma you can check the updates Commented May 16, 2022 at 15:16

1 Answer


You can use create_map to generate a MapType column.

from pyspark.sql import functions as F

df = df.withColumn('event', F.create_map(
    F.lit('header'), F.create_map(F.lit('id'), F.col('book_id')),
    F.lit('payload'), F.col('Author')
))

FYI: you probably cannot have a Python object in a Spark column. See: Is it possible to store custom class object in Spark Data Frame as a column value?

Update:

If you need to derive a part that involves some Python library functions, you can use a UDF.

import base64
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

# The udf decorator takes the return type schema.
@F.udf(MapType(StringType(), MapType(StringType(), StringType())))
def generate_header(book_id, author):
    # b64encode returns bytes; decode it so the value fits StringType
    b64str = base64.b64encode('some text'.encode('utf-8')).decode('utf-8')
    return {
        'header': {'id': book_id, 'key': b64str},
        'payload': author
    }

df.withColumn('event', generate_header(F.col('book_id'), F.col('Author')))
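
To see what the UDF produces without a Spark session, here is a plain-Python version of its body (the 'some text' placeholder is kept from the answer; values passed in are made up):

```python
import base64

def build_event(book_id, author):
    # Same logic as the UDF body; b64encode returns bytes,
    # so decode to keep the map values as strings
    b64str = base64.b64encode("some text".encode("utf-8")).decode("utf-8")
    return {
        "header": {"id": book_id, "key": b64str},
        "payload": author,
    }

event = build_event("865731", {"name": "A"})
# event["header"] carries the id and the base64 key;
# event["payload"] is the author map untouched
```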

10 Comments

Need to use the generate_header function because there are some complex calculations.
You can use a UDF in that case, if the complex calculations require low-level computation. Otherwise, it is more efficient to use Spark functions. I would be able to suggest something if I knew what type of calculation you are doing.
I updated my answer to show how you can use UDF, however, please consider checking my previous comment.
Your first code (the create_map one) doesn't really work; I got the following error due to data type mismatch: The given values of function map should all be the same type, but they are [map<string,string>, string]
