
Good morning! I have a DataFrame whose data source is MongoDB JSON:

+---------------------------------------------------+--------------------+
|                                                 Id|Data                |
+---------------------------------------------------+--------------------+
|{"𝑏𝑖𝑛𝑎𝑟𝑦":"7𝐾𝑔𝑅𝑄𝐾𝑎𝑏𝑞𝑘𝑢𝑥𝐸+1𝑝𝑆𝑤9𝑏7𝑄==","type":"03"} |       1651374000000|
|{"𝑏𝑖𝑛𝑎𝑟𝑦":"𝐻𝑡𝐼𝑂6𝑄/𝐺𝐿𝐸𝐺𝐷𝐵𝑑𝑡𝑊𝑑𝑑𝑝6𝑋𝑔==","type":"03"} |       1622419200000|
|{"𝑏𝑖𝑛𝑎𝑟𝑦":"𝑣𝑝𝑈𝑇𝑒𝑢𝑣𝑒𝐷0𝐺𝐿𝑚𝑙𝑟𝑧𝑗𝑏ℎ𝑖𝐵𝑔==","type":"03"}   |       1622419200000|
|{"𝑏𝑖𝑛𝑎𝑟𝑦":"𝑆6𝑗𝑧𝐷𝐸𝑖𝐺𝑥𝑈22𝑂𝑏𝑅𝑉1/𝑁𝑔2𝑄==","type":"03"}  |       1622419200000|
+---------------------------------------------------+--------------------+

I need to convert the data to:

+--------------------------------------+--------------------+
|                                   Id |Data                |
+--------------------------------------+--------------------+
| 401148EE-9BA6-4BAA-B113-ED694B0F5BED | 2022-05-01 03:00:00|
| E90ED21E-C60F-412C-8305-DB5675DA7A5E | 2021-05-31 00:00:00|
| 7A1395BE-DEEB-410F-8B9A-5AF38DB86206 | 2021-05-31 00:00:00|
| 0CF3A84B-8648-4DC5-B639-B455D7F360D9 | 2021-05-31 00:00:00|
+--------------------------------------+--------------------+

In plain Python, it is possible to convert the values with the code below:

import base64
import datetime
import uuid

id_bin = 'S6jzDEiGxU22ObRV1/Ng2Q=='
message_bytes = base64.b64decode(id_bin)

# The binary value stores the UUID fields in little-endian byte order
id = uuid.UUID(bytes_le=message_bytes)

# The epoch value is in milliseconds, so divide by 1000 first
data = 1622419200000 // 1000
date_conv = datetime.datetime.utcfromtimestamp(data).strftime('%Y-%m-%d %H:%M:%S')

Running this conversion in PySpark over a DataFrame column does not work. Can you help me?


1 Answer


The UUID can be computed using a UDF, and the epoch milliseconds can be converted to a timestamp with from_unixtime.

Your example contains non-ASCII characters, but your Python code uses a valid ASCII string, so I have converted the input to valid ASCII.


from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import base64
import uuid

# Explicitly set the session time zone to UTC so the timestamps are rendered in UTC
spark.conf.set("spark.sql.session.timeZone", "UTC")

@F.udf(StringType())
def uuid_converter(data: str):
    message_bytes = base64.b64decode(data)
    res_uuid = uuid.UUID(bytes_le=message_bytes)
    return str(res_uuid).upper()


data = [
    ({"binary": "7KgRQKabqkuxE+1pSw9b7Q==", "type": "03"}, 1651374000000),
    ({"binary": "HtIO6Q/GLEGDBdtWddp6Xg==", "type": "03"}, 1622419200000),
    ({"binary": "vpUTeuveD0GLmlrzjbhiBg==", "type": "03"}, 1622419200000),
    ({"binary": "S6jzDEiGxU22ObRV1/Ng2Q==", "type": "03"}, 1622419200000),
]

df = spark.createDataFrame(data, ("Id", "Data",))

df.withColumn("Id", uuid_converter(F.col("Id")["binary"]))\
  .withColumn("Data", F.from_unixtime(F.col("Data") / 1000))\
  .show(200, False)

Output

+------------------------------------+-------------------+
|Id                                  |Data               |
+------------------------------------+-------------------+
|4011A8EC-9BA6-4BAA-B113-ED694B0F5BED|2022-05-01 03:00:00|
|E90ED21E-C60F-412C-8305-DB5675DA7A5E|2021-05-31 00:00:00|
|7A1395BE-DEEB-410F-8B9A-5AF38DB86206|2021-05-31 00:00:00|
|0CF3A84B-8648-4DC5-B639-B455D7F360D9|2021-05-31 00:00:00|
+------------------------------------+-------------------+
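A side note on why bytes_le is the right choice here: the "type":"03" in the source column suggests this is the legacy binary UUID subtype (0x03), which some MongoDB drivers wrote with the first three UUID fields in little-endian byte order; I am assuming that is the case for your data. Decoding the same 16 bytes big-endian (plain bytes=) would yield a different UUID, as this plain-Python sketch shows:

```python
import base64
import uuid

raw = base64.b64decode("S6jzDEiGxU22ObRV1/Ng2Q==")

# Little-endian interpretation: the first three UUID fields are
# byte-swapped, matching the legacy GUID layout
le = uuid.UUID(bytes_le=raw)

# Big-endian (RFC 4122) interpretation of the same 16 bytes
be = uuid.UUID(bytes=raw)

print(str(le).upper())  # 0CF3A84B-8648-4DC5-B639-B455D7F360D9
print(str(be).upper())  # 4BA8F30C-4886-C54D-B639-B455D7F360D9
```

If your IDs ever come back scrambled in the first three groups, the byte order is the first thing to check.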

1 Comment

Yes, I know some of the text isn't valid ASCII, because it's a problem I face too. Thank you very much for the solution, it worked perfectly! I had managed to do it using a lambda...
