
How can we change the datatype of a nested column in PySpark? For example, how can I change the data type of value from string to int?

Reference: how to change a Dataframe column from String type to Double type in pyspark

{
    "x": "12",
    "y": {
        "p": {
            "name": "abc",
            "value": "10"
        },
        "q": {
            "name": "pqr",
            "value": "20"
        }
    }
}
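The referenced question casts a top-level column with withColumn and cast; a minimal sketch (assuming the JSON above is saved as data.json) shows why that alone does not reach the nested value fields:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("data.json", multiLine=True)

# Top-level columns can be cast directly, as in the referenced question:
df = df.withColumn("x", col("x").cast("int"))

# But this does not modify nested fields: the line below would just add a new
# top-level column literally named "y.p.value" instead of changing y.p.value.
# df = df.withColumn("y.p.value", col("y.p.value").cast("int"))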
  • 1. Does this change need to be persistent, with changes saved to the json file? Or do you need the precision while you are performing an operation? Commented Aug 24, 2017 at 0:27
  • @diek Need it while writing to the json file Commented Aug 24, 2017 at 0:47

2 Answers


You can read the JSON data using

from pyspark.sql import SQLContext

# sc is an existing SparkContext (e.g. the one created by the pyspark shell)
sqlContext = SQLContext(sc)
data_df = sqlContext.read.json("data.json", multiLine=True)

data_df.printSchema()

output

root
 |-- x: long (nullable = true)
 |-- y: struct (nullable = true)
 |    |-- p: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)
 |    |-- q: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)

Now you can access the fields nested under the y column using dot notation (select returns a DataFrame, so call .show() to see the values):

data_df.select("y.p.name", "y.p.value").show()

output

+----+-----+
|name|value|
+----+-----+
| abc|   10|
+----+-----+
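If a flat integer result is enough, a cast inside the select works; this is a small sketch, not part of the original answer, and the p_value alias is illustrative. Note that it produces a new flat column rather than changing the type inside the nested struct, which is why the rest of the answer rebuilds the column:

from pyspark.sql.functions import col

# Cast the nested field on the way out; this does not alter the schema of y itself.
data_df.select(col("y.p.value").cast("int").alias("p_value")).show()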

OK, the solution is to add a new nested column with the correct schema and then drop the column with the wrong schema.

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType

# spark is an existing SparkSession (e.g. the one created by the pyspark shell)
df3 = spark.read.json("data.json", multiLine=True)

# create the correct schema from the old one
c = df3.schema['y'].jsonValue()
c['name'] = 'z'
c['type']['fields'][0]['type']['fields'][1]['type'] = 'long'
c['type']['fields'][1]['type']['fields'][1]['type'] = 'long'

y_schema = StructType.fromJson(c['type'])

# define a udf to populate the new column. Rows are immutable, so you
# have to build the new value from scratch.

def foo(row):
    d = row.asDict()
    y = {}
    y["p"] = {}
    y["p"]["name"] = d["p"]["name"]
    y["p"]["value"] = int(d["p"]["value"])
    y["q"] = {}
    y["q"]["name"] = d["q"]["name"]
    y["q"]["value"] = int(d["q"]["value"])

    return y

map_foo = udf(foo, y_schema)

# add the column
df3_new = df3.withColumn("z", map_foo("y"))

# delete the column
df4 = df3_new.drop("y")


df4.printSchema()

output

root
 |-- x: long (nullable = true)
 |-- z: struct (nullable = true)
 |    |-- p: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)
 |    |-- q: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)


df4.show()

output

+---+-------------------+
|  x|                  z|
+---+-------------------+
| 12|[[abc,10],[pqr,20]]|
+---+-------------------+
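For completeness, a UDF is not strictly required; a cast-only sketch (not part of the original answer, reusing the same df3) that rebuilds the nested column with struct and cast would look roughly like this:

from pyspark.sql.functions import struct, col

# Rebuild y with the value fields cast to long, replacing the old column in place.
df_cast = df3.withColumn(
    "y",
    struct(
        struct(
            col("y.p.name").alias("name"),
            col("y.p.value").cast("long").alias("value"),
        ).alias("p"),
        struct(
            col("y.q.name").alias("name"),
            col("y.q.value").cast("long").alias("value"),
        ).alias("q"),
    ),
)
df_cast.printSchema()

This keeps the original column name y and avoids serializing every row through a Python UDF.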

10 Comments

@aswinids I have edited the question. Any thoughts on this one?
@aswinids: Thanks for helping. Do we have decimal/timestamp data types in the JSON schema?
@aswinids: If I change the value of 10 to "10" and use type: 'long', I get null
@zero323 Do you have any idea ?
@J.D It's working completely fine with the above json_schema. Can you check it again? And yes, I'm reading the json file after converting the values to strings.
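On the decimal/timestamp question in the comments above: the JSON reader will not infer those types, but they can be declared in an explicit schema passed to the reader. A hedged sketch, with purely illustrative field names and file name:

from pyspark.sql.types import StructType, StructField, DecimalType, TimestampType, StringType

# Hypothetical schema: "amount", "created_at" and "other.json" are illustrative only.
explicit_schema = StructType([
    StructField("amount", DecimalType(10, 2), True),
    StructField("created_at", TimestampType(), True),
    StructField("name", StringType(), True),
])

df = spark.read.schema(explicit_schema).json("other.json", multiLine=True)
df.printSchema()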

It may seem simple to use arbitrary variable names, but this is problematic and contrary to PEP 8. When dealing with numbers, I also suggest avoiding the common names used when iterating over such structures, i.e. value.

import json

with open('random.json') as json_file:
    data = json.load(json_file)

for k, v in data.items():
    if k == 'y':
        for key, item in v.items():
            item['value'] = float(item['value'])


print(type(data['y']['p']['value']))
print(type(data['y']['q']['value']))
# mac → python3 make_float.py
# <class 'float'>
# <class 'float'>
json_data = json.dumps(data, indent=4, sort_keys=True)
with open('random.json', 'w') as json_file:
    json_file.write(json_data)

The converted values are then written back out to random.json.

2 Comments

The crucial part of this problem is that we have around 60 GB of data produced every day and we need to ensure scalability; that's why Spark was the way to go.
Of course this would not be able to handle such a massive amount of data. Why did the question you referenced not work? From the documentation they give an example of dealing with this: ghostbin.com/paste/wt5y6
