
I'm using a DataFrame in Spark to split and store data in a tabular format. My data in the file looks like this -

{"click_id": 123, "created_at": "2016-10-03T10:50:33", "product_id": 98373, "product_price": 220.50, "user_id": 1, "ip": "10.10.10.10"}
{"click_id": 124, "created_at": "2017-02-03T10:51:33", "product_id": 97373, "product_price": 320.50, "user_id": 1, "ip": "10.13.10.10"}
{"click_id": 125, "created_at": "2017-10-03T10:52:33", "product_id": 96373, "product_price": 20.50, "user_id": 1, "ip": "192.168.2.1"}

and I've written this code to split the data -

from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as psf

spark = SparkSession \
    .builder \
    .appName("Hello") \
    .config("World") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

ratings = spark.createDataFrame(
    sc.textFile("transactions.json").map(lambda l: l.split(',')),
    ["Col1","Col2","Col3","Col4","Col5","Col6"]
)

ratings.registerTempTable("ratings")

final_df = sqlContext.sql("select * from ratings");
final_df.show(20,False)

The above code works fine and gives the below output:

[Screenshot of the DataFrame output: each of the six columns still contains the raw JSON key-value text, e.g. Col1 holds {"click_id": 123 and Col2 holds "created_at": "2016-10-03T10:50:33".]

As you can see from the output, each column shows the key together with its value - "click_id" along with the number, and likewise "created_at" along with the timestamp.

I want the table to contain only the values - for click_id, created_at, product_id and so on.

How do I get only those values into my table?

  • You mean, removing the keys (click_id, created_at etc.) and keeping only the values for all 6 columns?
  • @desertnaut Yes

2 Answers


In your map function, parse the JSON object instead of splitting it:

map(lambda l: l.split(','))

should become

map(lambda l: json.loads(l))

(after you have imported json)

import json

Also, if you remove the column-name list

["Col1","Col2","Col3","Col4","Col5","Col6"]

the column names will be taken from the JSON keys.
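
Putting it together, a minimal sketch of the corrected code (reusing the file name from the question; the schema is inferred from the parsed dicts):

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Hello").getOrCreate()
sc = spark.sparkContext

# Parse each JSON line into a dict; column names and types are inferred
# from the dict keys and values (newer Spark versions emit a deprecation
# warning here and suggest wrapping each dict in a pyspark.sql.Row)
ratings = spark.createDataFrame(
    sc.textFile("transactions.json").map(lambda l: json.loads(l))
)

ratings.createOrReplaceTempView("ratings")
spark.sql("select * from ratings").show(20, False)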



Assuming you want to use only the DataFrame API, you could use the following code:

ratings = spark.read.json("transactions.json")

This will load the JSON into a DataFrame, mapping the JSON keys to column names. Then you can select and rename the columns with the code below.

from pyspark.sql.functions import col

ratings = ratings.select(col('click_id').alias('Col1'),
                         col('created_at').alias('Col2'),
                         col('product_id').alias('Col3'),
                         col('product_price').alias('Col4'),
                         col('user_id').alias('Col5'),
                         col('ip').alias('Col6'))

This way you can also cast columns to the relevant data types, e.g. col('product_price').cast('double').alias('Col4'), and save them properly to a database.
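
For illustration, a sketch of the same select with explicit casts (the target types are assumptions based on the sample records; spark.read.json will already have inferred numeric types for the numeric fields):

from pyspark.sql.functions import col

# Casts assumed from the sample data: integer ids, an ISO timestamp, a double price
ratings = ratings.select(col('click_id').cast('int').alias('Col1'),
                         col('created_at').cast('timestamp').alias('Col2'),
                         col('product_id').cast('int').alias('Col3'),
                         col('product_price').cast('double').alias('Col4'),
                         col('user_id').cast('int').alias('Col5'),
                         col('ip').alias('Col6'))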

Comments

  • Is there a way to store the individual column names in a list and pass each of those values as parameters?
  • If I understood correctly what you mean, you could have an array, e.g. arr = ["Col1","Col2","Col3","Col4","Col5","Col6"], and do df = df.select(arr).
  • For example - I want to store the click_id values in a list/array and pass each of those values arr[0], arr[1] to a function get_value(click_id) which takes a click_id as an input parameter. @geopet
  • You can try df.select('click_id').rdd.flatMap(lambda x: x).collect(), which will produce an array of all click_id values that can be handled however you wish (see the sketch below).
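
A minimal sketch combining the last two comments (get_value is a hypothetical stand-in for the asker's own function):

# ratings is the DataFrame loaded above with spark.read.json
def get_value(click_id):
    # hypothetical placeholder for the asker's own processing
    print(click_id)

# Collect all click_id values into a Python list, then pass each one along
click_ids = ratings.select('click_id').rdd.flatMap(lambda x: x).collect()
for cid in click_ids:
    get_value(cid)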
