Using Spark 2.11, I have the following Dataset (read from a Cassandra table):

+-----------+------------------------------------------------------+
|id         |attributes                                            |
+-----------+------------------------------------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|
+-----------+------------------------------------------------------+

This is the output of printSchema():

root
 |-- id: string (nullable = true)
 |-- attributes: string (nullable = true)

The attributes column is an array of JSON objects. I'm trying to parse it and explode it into a Dataset, but I keep failing. I tried to define the schema as follows:

StructType type = new StructType()
                .add("id", new IntegerType(), false)
                .add("name", new StringType(), false)
                .add("score", new FloatType(), false)
                .add("snippets", new IntegerType(), false );
        
ArrayType schema = new ArrayType(type, false);

And provided it to from_json as follows:

df = df.withColumn("val", functions.from_json(df.col("attributes"), schema));

This fails with MatchError:

Exception in thread "main" scala.MatchError: org.apache.spark.sql.types.IntegerType@43756cb (of class org.apache.spark.sql.types.IntegerType)

What's the correct way to do that?

2 Answers


You can specify the schema this way:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val schema = ArrayType(
  StructType(Array(
    StructField("id", IntegerType, false),
    StructField("name", StringType, false),
    StructField("score", FloatType, false),
    StructField("snippets", IntegerType, false)
  )),
  false
)

val df1 = df.withColumn("val", from_json(col("attributes"), schema))

df1.show(false)

//+-----------+------------------------------------------------------+------------------------+
//|id         |attributes                                            |val                     |
//+-----------+------------------------------------------------------+------------------------+
//|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
//+-----------+------------------------------------------------------+------------------------+

Or for Java:

import static org.apache.spark.sql.types.DataTypes.*;

import java.util.Arrays;
import org.apache.spark.sql.types.ArrayType;

ArrayType schema = createArrayType(createStructType(Arrays.asList(
    createStructField("id", IntegerType, false),
    createStructField("name", StringType, false),
    createStructField("score", FloatType, false),
    createStructField("snippets", IntegerType, false)
)), false);
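
Since the original goal was to explode the parsed array, here is a minimal Java sketch of the full pipeline, assuming a Spark version whose from_json accepts an ArrayType (2.2+), the schema above, and the df from the question; the column names val, attr, and attr_id are illustrative only:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Parse the JSON string, then flatten: explode emits one row per array element
Dataset<Row> exploded = df
    .withColumn("val", from_json(col("attributes"), schema))
    .withColumn("attr", explode(col("val")))
    .select(
        col("id"),
        col("attr.id").alias("attr_id"),
        col("attr.name"),
        col("attr.score"),
        col("attr.snippets"));

exploded.show(false);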

You can define the schema as a literal string instead:

import org.apache.spark.sql.functions.{from_json, lit}

val df2 = df.withColumn(
    "val",
    from_json(
        df.col("attributes"),
        lit("array<struct<id: int, name: string, score: float, snippets: int>>")
    )
)

df2.show(false)
+-----------+------------------------------------------------------+------------------------+
|id         |attributes                                            |val                     |
+-----------+------------------------------------------------------+------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
+-----------+------------------------------------------------------+------------------------+
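
A note on versions: the overload of from_json that takes the schema as a Column was only added in Spark 2.4, which likely explains the compile error reported in the comments below. Since the asker is working in Java, here is a minimal sketch of the same idea using the String overload (which accepts DDL-formatted strings since 2.3) with an empty options map:

import static org.apache.spark.sql.functions.*;

import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// The String overload avoids building a DataType by hand;
// the empty map holds optional JSON parser settings
Dataset<Row> df2 = df.withColumn(
    "val",
    from_json(
        col("attributes"),
        "array<struct<id: int, name: string, score: float, snippets: int>>",
        Collections.<String, String>emptyMap()));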

If you prefer to use a schema:

import org.apache.spark.sql.types._

val spark_struct = new StructType()
  .add("id", IntegerType, false)
  .add("name", StringType, false)
  .add("score", FloatType, false)
  .add("snippets", IntegerType, false)

val schema = new ArrayType(spark_struct, false)

val df2 = df.withColumn(
    "val",
    from_json(
        df.col("attributes"),
        schema
    )
)

Two problems with your original code: (1) type is a reserved keyword in Scala, so it can't be used as a variable name there, and (2) you don't need new for the type singletons passed to add (IntegerType, not new IntegerType()).
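
For the Java side specifically (see the comments below), the scala.MatchError in the question most likely comes from instantiating the type classes with new: Spark matches schema types against its case-object singletons, so a fresh new IntegerType() instance is not recognized. A minimal Java sketch using the DataTypes singletons instead:

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Use the singletons on DataTypes rather than new IntegerType() etc.
StructType elementType = new StructType()
    .add("id", DataTypes.IntegerType, false)
    .add("name", DataTypes.StringType, false)
    .add("score", DataTypes.FloatType, false)
    .add("snippets", DataTypes.IntegerType, false);

ArrayType schema = DataTypes.createArrayType(elementType, false);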

5 Comments

This doesn't compile for me, as from_json expects the 2nd argument to be a DataType rather than a Column. I'm using 2.11.
@Seffy then try the second method?
Tried, same error. What do you mean, no need to use 'new'? My code is Java, not Scala.
Oh sorry, I misunderstood... this answer might be helpful for Java.
Now there's no exception, but from_json hangs forever. The Dataset contains just 1 row; any clue how to check what's going on? Running with local.
