
I'm new to PySpark. I have a list of JSON objects coming from an API, and each object has the same schema (key-value pairs), like this:

[ {'count': 308,
  'next': 'some_url',
  'previous': None,
  'results': [{'assigned_to': 43,
    'category': 'Unused',
    'comments': None,
    'completed_ts': None,
    'created': '2019-05-27T05:14:22.306843Z',
    'description': 'Pollution',
    'display_name': {'admin': False,
     'business_name': 'Test Business',
     'contact_number': 'some_number',
     'dob': None,
     'email': 'some_mail',
     'emp_id': None,
     'first_name': 'Alisha'}}]},
  {'count': 309,
  'next': 'some_url',
  'previous': None,
  'results': [{'assigned_to': 44,
    'category': 'Unused',
    'comments': None,
    'completed_ts': None,
    'created': '2019-05-27T05:14:22.306843Z',
    'description': 'Pollution',
    'display_name': {'admin': False,
     'business_name': 'Test Business',
     'contact_number': 'some_number',
     'dob': None,
     'email': 'some_mail',
     'emp_id': None,
     'first_name': 'Ali'}}]},......}]

If these had been separate JSON files, I would have created a DataFrame from each one with

df = spark.read.json('myfile.json')

and then merged all the DataFrames into one. The issue I'm facing is creating the DataFrame directly from the list itself. I have used this:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Basics").getOrCreate()
sc = spark.sparkContext
df = pyspark.sql.SQLContext(sc.parallelize(data_list))

It gives me AttributeError: 'RDD' object has no attribute '_jsc'
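Note: SQLContext is constructed from a SparkContext, not from an RDD, which is presumably why the _jsc attribute lookup fails. A minimal sketch of the usage it expects (this is not the solution to the list problem itself):

from pyspark.sql import SQLContext

# SQLContext wraps the SparkContext itself; it does not accept an RDD
sql_context = SQLContext(sc)  # sc is the SparkContext created above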

  • How do you call that API? Is there a loop or some interval-based daemon running? Also, do all messages share the same schema? Commented May 28, 2019 at 13:42
  • A function has a loop in it; if there's a URL in the next key (check the JSON), it keeps fetching data as long as next is not null. Commented May 29, 2019 at 6:06
  • @Rohan Kumar I have a similar problem where I have to read incoming JSON data in batches and dump it to a file. The output file thus has a list of JSON objects. Can you share how you looped over them? Commented Apr 27, 2021 at 1:48
  • @Neha0908 I'm not sure how I did it back then, but you can use Apache Kafka to capture streaming data and then load the specific variables from the data in PySpark. spark.apache.org/docs/2.1.0/… Commented Apr 28, 2021 at 2:17

1 Answer


I couldn't find a straightforward answer to your problem, but this solution works:

import json
import ast

# read each file as (path, content), parse the Python-literal content,
# then re-serialize every record as a JSON string
rdd = sc.wholeTextFiles(path).map(lambda x: ast.literal_eval(x[1]))\
                             .map(lambda x: json.dumps(x))

df = spark.read.json(rdd)
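The ast.literal_eval step is there because the dumped text is a Python-literal representation of the data (single quotes, None, False) rather than valid JSON, so it is parsed with literal_eval first and then re-serialized with json.dumps into strings that spark.read.json can parse.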

This will give you output like this:

+-----+--------+--------+--------------------+
|count|    next|previous|             results|
+-----+--------+--------+--------------------+
|  308|some_url|    null|[[43,Unused,null,...|
|  309|some_url|    null|[[44,Unused,null,...|
+-----+--------+--------+--------------------+
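
If you then need the nested fields rather than the whole results array, one option (just a sketch, assuming the schema shown in the question) is to explode results and select from the resulting struct:

from pyspark.sql.functions import explode

# one row per element of the results array
flat = df.select("count", explode("results").alias("r")) \
         .select("count", "r.assigned_to", "r.category", "r.display_name.first_name")
flat.show()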

EDIT: If the data is in a variable, all you have to do is:

import json

# data is the Python list of dicts; serialize each element to a JSON string first
rdd = sc.parallelize(data).map(lambda x: json.dumps(x))
df = spark.read.json(rdd)
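
Here data is assumed to be the Python list of dicts from the question (the variable the API responses were collected into). If instead you have a single JSON string in a variable, a small variation along these lines should work (sketch; json_str is a hypothetical name):

# json_str already holds serialized JSON, so no json.dumps is needed
df = spark.read.json(sc.parallelize([json_str]))
df.show()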

5 Comments

path?? I don't have any files, just a list of JSON strings/objects. Did you assume we have these files?
Yes! Do you have this in a variable?
It's in a variable, not a file.
I worked on it and realised there were some other problems too, like the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON environment variables not being the same, and I was running multiple instances of SparkContext.
For the variable example @mayankagrawal, what is data in .parallelize()? I have a JSON string held in a variable.
