
I have a pyspark dataframe with multiple columns (around 30) of nested structs that I want to write to CSV.

In order to do it, I want to stringify all of the struct columns.

I've checked several answers here:

Pyspark converting an array of struct into string

PySpark: DataFrame - Convert Struct to Array

PySpark convert struct field inside array to string

This is the structure of my dataframe (with around 30 complex keys):

root  
 |-- 1_simple_key: string (nullable = true)  
 |-- 2_simple_key: string (nullable = true)  
 |-- 3_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 4_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 5_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  

The proposed solutions are for a single column, and I can't adapt them to multiple columns.

I want to do something of this type:

for each struct_column:
    col = stringify(struct_column)

I don't mind creating an additional dataframe for it. I just need to make it ready for CSV writing; a rough sketch of what I have in mind follows below.
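Here is the pseudocode made slightly more concrete (s_df stands for my dataframe as in the example further down; I'm assuming to_json is an acceptable stringifier, and detecting the struct columns via the schema is my own guess at how this could work):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType

# collect the names of all struct-typed columns from the schema
struct_cols = [f.name for f in s_df.schema.fields
               if isinstance(f.dataType, StructType)]

# replace every struct column with its JSON string representation,
# leaving the simple columns untouched
stringified = s_df.select(
    [F.to_json(c).alias(c) if c in struct_cols else F.col(c)
     for c in s_df.columns]
)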

Minimal reproducible example:

import pandas as pd
from pyspark.sql import Row

d = {
    '1_complex_key': {0: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=954, y=238),
                      1: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=956, y=250),
                      2: Row(type='1_complex_key', s=Row(n1=True, n2=False, n3=False), x=886, y=269)},
    '2_complex_key': {0: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=901, y=235),
                      1: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=905, y=249),
                      2: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=868, y=270)},
    '3_complex_key': {0: Row(type='3_complex_key', s=Row(n1=True, n2=False, n3=False), x=925, y=197),
                      1: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=928, y=206),
                      2: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=883, y=236)},
}
df = pd.DataFrame.from_dict(d)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
s_df = spark.createDataFrame(df)
s_df.printSchema()
s_df.write.csv('it_doesnt_write.csv')


So, to summarize: I have a spark dataframe that I want to write to CSV. I can't write it to CSV because:

'CSV data source does not support struct<s:struct<n1:boolean,n2:boolean,n3:boolean>,type:string,x:bigint,y:bigint> data type.;'

So I want to perform some reversible transformation on this dataframe so that I can write it to CSV, later read it back from the CSV, and turn it into a spark dataframe with the same schema.

How can I do it? Thanks

  • Use a list comprehension over the struct columns as shown in Apply a transformation to multiple columns pyspark dataframe. For a more detailed answer, please provide a minimal reproducible example. Commented Oct 28, 2019 at 17:08
  • Hey. I've added the exact structure. I've seen your answer, but how can I stringify this struct object in order to write it to csv later? And be able to reconstruct it? I want to be able to write it to CSV, then read it from the CSV and transform it back into a pyspark dataframe. Commented Oct 28, 2019 at 20:11
  • Please add a minimal reproducible example and someone will answer your question. Commented Oct 28, 2019 at 20:40
  • @cronoik it was hard, but here is a minimal reproducible example :) Commented Oct 29, 2019 at 20:07

1 Answer


As pault has already mentioned in the comments, you need a list comprehension. Such a list comprehension requires a list of columns and a function that converts these columns to strings. I will use df.columns and to_json, but you can also provide your own Python list of column names and a custom function to stringify your complex columns.

import pyspark.sql.functions as F

#this converts all columns to json strings
#and writes them to disk as csv
s_df.select([F.to_json(x) for x in s_df.columns]).coalesce(1).write.csv('/tmp/testcsv')

In case you don't want to apply to_json to all columns, you can simply modify it like this:

list4tojson = ['2_complex_key', '3_complex_key']
s_df.select('1_complex_key', *[F.to_json(x) for x in list4tojson]).coalesce(1).write.csv('/tmp/testcsv')

You can restore the dataframe with from_json:

df = spark.read.csv('/tmp/testcsv')
df.printSchema()
#root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
# |-- _c2: string (nullable = true)

#inferring the schema from the json strings of the first column
json_schema = spark.read.json(df.rdd.map(lambda row: row._c0)).schema

df.select([F.from_json(x, json_schema) for x in df.columns]).printSchema()
#root
# |-- jsontostructs(_c0): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c1): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c2): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
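Note that from_json names the resulting columns jsontostructs(_c0) and so on. If you also want the original column names back, you can alias each column; a sketch, where the names are taken from the example above (in general you would keep a copy of s_df.columns around, and this assumes the csv columns are still in the order they were written):

# restore the original column names via alias
original_names = ['1_complex_key', '2_complex_key', '3_complex_key']
restored = df.select(
    [F.from_json(c, json_schema).alias(n)
     for c, n in zip(df.columns, original_names)]
)
restored.printSchema()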

In case you just want to store your data in a readable format, you can avoid all of the above by writing it to json directly:

s_df.coalesce(1).write.json('/tmp/testjson')

df = spark.read.json('/tmp/testjson')
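If the round trip must reproduce the exact original schema rather than rely on schema inference, one option (a sketch, assuming you persist the schema string yourself somewhere) is to save the schema as JSON and pass it back in on read:

import json
from pyspark.sql.types import StructType

# persist the schema as a json string alongside the data
schema_json = s_df.schema.json()

# later: rebuild the schema and use it instead of letting spark infer one
restored_schema = StructType.fromJson(json.loads(schema_json))
df = spark.read.json('/tmp/testjson', schema=restored_schema)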