I loaded a Parquet file into a Spark DataFrame as follows:

val message = spark.read.parquet("gs://defenault-zdtt-devde/pubsub/part-00001-e9f8c58f-7de0-4537-a7be-a9a8556sede04a-c000.snappy.parquet")

When I call collect on the DataFrame, I get the following result:

message.collect()

Array[org.apache.spark.sql.Row] = Array([118738748835150,2018-08-20T17:44:38.742Z,{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,"group":"box-2478-2555","profile_id":3,"profile":"eolane-movee","type":"uplink","timestamp":"2018-08-20T17:44:37.048Z","count":3130,"payload":[{"timestamp":"2018-08-20T17:44:37.048Z","data":{"battery":3.5975599999999996,"temperature":27}}],"payload_encrypted":"9da25e36","payload_cleartext":"fe1b01aa","device_properties":{"appeui":"7ca97df000001190","deveui":"7ca97d0000001bb0","external_id":"Product: 3.7 / HW: 3.1 / SW: 1.8.8","no_de_serie_eolane":"4904","no_emballage":"S02066","product_version":"1.3.1"},"protocol_data":{"AppNonce":"e820ef","DevAddr":"0e6c5fda","DevNonce":"85bc","NetID":"000007","best_gateway_id":"M40246","gateway.

The schema of this DataFrame is:

message.printSchema()
root
 |-- Id: string (nullable = true)
 |-- publishTime: string (nullable = true)
 |-- data: string (nullable = true)

My aim is to flatten the data column, which holds JSON data. I wrote the following code:

val schemaTotal = new StructType (
Array (StructField("id",StringType,false),StructField("device_id",StringType),StructField("group_id",LongType), StructField("group",StringType),StructField("profile_id",IntegerType),StructField("profile",StringType),StructField("type",StringType),StructField("timestamp",StringType),
StructField("count",StringType),
StructField("payload",new StructType ()
.add("timestamp",StringType)
.add("data",new ArrayType (new StructType().add("battery",LongType).add("temperature",LongType),false))),
StructField("payload_encrypted",StringType),
StructField("payload_cleartext",StringType),
StructField("device_properties", new ArrayType (new StructType().add("appeui",StringType).add("deveui",StringType).add("external_id",StringType).add("no_de_serie_eolane",LongType).add("no_emballage",StringType).add("product_version",StringType),false)),
StructField("protocol_data", new ArrayType (new StructType().add("AppNonce",StringType).add("DevAddr",StringType).add("DevNonce",StringType).add("NetID",LongType).add("best_gateway_id",StringType).add("gateways",IntegerType).add("lora_version",IntegerType).add("noise",LongType).add("port",IntegerType).add("rssi",DoubleType).add("sf",IntegerType).add("signal",DoubleType).add("snr",DoubleType),false)),
StructField("lat",StringType),
StructField("lng",StringType),
StructField("geolocation_type",StringType),
StructField("geolocation_precision",StringType),
StructField("delivered_at",StringType)))


val dataframe_extract=message.select($"Id",
$"publishTime",
from_json($"data",schemaTotal).as("content"))

val table = dataframe_extract.select(
$"Id",
$"publishTime",
$"content.id" as "id",
$"content.device_id" as "device_id",
$"content.group_id" as "group_id",
$"content.group" as "group",
$"content.profile_id" as "profile_id",
$"content.profile" as "profile",
$"content.type" as "type",
$"content.timestamp" as "timestamp",
$"content.count" as "count",
$"content.payload.timestamp" as "timestamp2",
$"content.payload.data.battery" as "battery",
$"content.payload.data.temperature" as "temperature",
$"content.payload_encrypted" as "payload_encrypted",
$"content.payload_cleartext" as "payload_cleartext",
$"content.device_properties.appeui" as "appeui"
)

table.show() gives me null values for every column taken from the JSON:

+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
|             Id|         publishTime|  id|device_id|group_id|group|profile_id|profile|type|timestamp|count|timestamp2|battery|temperature|payload_encrypted|payload_cleartext|appeui|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
|118738748835150|2018-08-20T17:44:...|null|     null|    null| null|      null|   null|null|     null| null|      null|   null|       null|             null|             null|  null|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+

table.printSchema(), on the other hand, gives me the expected result. Any idea how to solve this? I am working in Zeppelin as a first prototyping step. Thanks a lot in advance for your help. Best regards

1 Answer

The from_json() SQL function imposes the following constraint when it converts a JSON string column into structured columns:

  1. Every data type declared in the schema must match the corresponding value in the JSON. If even one field does not match, all of the parsed columns come back as null.

e.g.:

'{"name": "raj", "age": 12}' for this column value

StructType(List(StructField(name,IntegerType,true),StructField(age,IntegerType,true)))

The above schema returns null for both columns, because "raj" cannot be read as an integer.

StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true)))

The above schema returns the expected dataframe.
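The effect can be reproduced on the one-row example above. What follows is a minimal sketch, assuming Spark SQL is on the classpath and a local session is acceptable. The object name `FromJsonMismatchDemo` and the schema variant that declares `name` as `IntegerType` are illustrative choices, picked because a string such as "raj" can never be read as an integer. Depending on the Spark version, a mismatch may null the whole parsed row or only the offending field, so the sketch prints the result instead of assuming one or the other.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object FromJsonMismatchDemo {
  // Local session for the demonstration only; Zeppelin and spark-shell already provide `spark`.
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("from-json-mismatch-demo")
    .getOrCreate()
  import spark.implicits._

  // One-row DataFrame holding the JSON string from the example.
  val df = Seq("""{"name": "raj", "age": 12}""").toDF("value")

  // Schema whose declared types match the JSON values.
  val matching = StructType(List(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)))

  // Illustrative mismatch: the string "raj" cannot be parsed as an integer.
  val mismatching = StructType(List(
    StructField("name", IntegerType, true),
    StructField("age", IntegerType, true)))

  // Parse with each schema and pull the struct fields out as top-level columns.
  val good = df.select(from_json($"value", matching).as("parsed"))
    .select($"parsed.name", $"parsed.age")
  val bad = df.select(from_json($"value", mismatching).as("parsed"))
    .select($"parsed.name", $"parsed.age")

  def main(args: Array[String]): Unit = {
    good.show() // both fields populated
    bad.show()  // `name` is null; on older Spark versions `age` is null as well
    spark.stop()
  }
}
```

Comparing the two `show()` outputs side by side is a quick way to tell whether a null result comes from the schema rather than from the data.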

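That is the likely cause in this thread: if any declared type does not match the JSON, from_json returns null for every column. In the collected output, for instance, payload is a JSON array and device_properties is a JSON object, whereas schemaTotal declares payload as a struct and device_properties as an ArrayType.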
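For the JSON shown in the question, a schema along the following lines is one possible fix. This is a sketch under stated assumptions, not a drop-in replacement: the sample string is a trimmed copy of fields visible in the collect() output, the truncated part of the record is left out rather than guessed, and the object name `FlattenPayloadSketch` and the column aliases are illustrative. It declares `payload` as an array of structs, `device_properties` as a struct, and `battery` as a double, then uses `explode` to get one row per payload entry.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, StringType, StructType}

object FlattenPayloadSketch {
  // Local session for the sketch; Zeppelin and spark-shell already provide `spark`.
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("flatten-payload-sketch")
    .getOrCreate()
  import spark.implicits._

  // Trimmed copy of the record from the question (only fields visible in the output).
  val sample: String =
    """{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,""" +
    """"payload":[{"timestamp":"2018-08-20T17:44:37.048Z",""" +
    """"data":{"battery":3.5975599999999996,"temperature":27}}],""" +
    """"device_properties":{"appeui":"7ca97df000001190","no_de_serie_eolane":"4904"}}"""

  // Each payload entry is an object with a timestamp and a nested data object.
  val payloadEntry = new StructType()
    .add("timestamp", StringType)
    .add("data", new StructType()
      .add("battery", DoubleType)      // 3.59... is a fractional number
      .add("temperature", LongType))

  val schema = new StructType()
    .add("id", StringType)
    .add("device_id", LongType)
    .add("group_id", LongType)
    .add("payload", ArrayType(payloadEntry))         // JSON array of objects
    .add("device_properties", new StructType()       // JSON object, not an array
      .add("appeui", StringType)
      .add("no_de_serie_eolane", StringType))        // quoted in the JSON, so a string

  val df = Seq(sample).toDF("data")
  val parsed = df.select(from_json($"data", schema).as("content"))

  // explode yields one row per payload entry, so nested fields become plain columns.
  val flat = parsed
    .select(
      $"content.id".as("id"),
      $"content.device_id".as("device_id"),
      $"content.device_properties.appeui".as("appeui"),
      explode($"content.payload").as("entry"))
    .select(
      $"id", $"device_id", $"appeui",
      $"entry.timestamp".as("payload_timestamp"),
      $"entry.data.battery".as("battery"),
      $"entry.data.temperature".as("temperature"))

  def main(args: Array[String]): Unit = {
    // Optional check: let Spark infer the types and compare them with the schema above.
    spark.read.json(Seq(sample).toDS()).printSchema()
    flat.show(false)
    spark.stop()
  }
}
```

Printing the inferred schema first is a cheap way to spot where a hand-written schema disagrees with the data. Fields omitted from the schema are simply ignored by from_json, so the remaining fields can be added back one at a time.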
