
I have a Dataset<String> ds that consists of JSON rows.

Sample JSON row (this is just an example of one row in the dataset):

[
    {"name": "foo", "address": {"state": "CA", "country": "USA"}, "docs": [{"subject": "english", "year": 2016}]},
    {"name": "bar", "address": {"state": "OH", "country": "USA"}, "docs": [{"subject": "math", "year": 2017}]}
]

ds.printSchema()

root
 |-- value: string (nullable = true)

Now I want to convert it into the following dataset, using Spark 2.2.0:

name  |             address               |  docs 
----------------------------------------------------------------------------------
"foo" | {"state": "CA", "country": "USA"} | [{"subject": "english", "year": 2016}]
"bar" | {"state": "OH", "country": "USA"} | [{"subject": "math", "year": 2017}]

Preferably in Java, but Scala is also fine as long as the functions are available in the Java API.

Here is what I have tried so far:

val df = Seq("""[{"name": "foo", "address": {"state": "CA", "country": "USA"}, "docs": [{"subject": "english", "year": 2016}]}, {"name": "bar", "address": {"state": "OH", "country": "USA"}, "docs": [{"subject": "math", "year": 2017}]}]""").toDF

df.show(false)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                   |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"name": "foo", "address": {"state": "CA", "country": "USA"}, "docs": [{"subject": "english", "year": 2016}]}, {"name": "bar", "address": {"state": "OH", "country": "USA"}, "docs": [{"subject": "math", "year": 2017}]}]|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

2 Answers


I have found a workaround in Java. I hope this helps.

Create a bean class (TempBean in my case):

import java.util.List;
import java.util.Map;

public class TempBean {
    String name;
    Map<String, String> address;
    List<Map<String, String>> docs;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Map<String, String> getAddress() {
        return address;
    }

    public void setAddress(Map<String, String> address) {
        this.address = address;
    }

    public List<Map<String, String>> getDocs() {
        return docs;
    }

    public void setDocs(List<Map<String, String>> docs) {
        this.docs = docs;
    }
}

Use the following code with the imports below:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
ObjectMapper mapper = new ObjectMapper();
List<String> dfList = ds.collectAsList(); // using your Dataset<String>; note this collects every row to the driver
List<TempBean> tempList = new ArrayList<>();
try {
    for (String json : dfList) {
        List<Map<String, Object>> mapList =
            mapper.readValue(json, new TypeReference<List<Map<String, Object>>>() {});
        for (Map<String, Object> map : mapList) {
            TempBean temp = new TempBean();
            temp.setName(map.get("name").toString());
            temp.setAddress((Map<String, String>) map.get("address"));
            temp.setDocs((List<Map<String, String>>) map.get("docs"));
            tempList.add(temp);
        }
    }
} catch (JsonParseException e) {
    e.printStackTrace();
} catch (JsonMappingException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

Create the DataFrame:

Dataset<Row> dff = spark.createDataFrame(tempList, TempBean.class);
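Since collectAsList() pulls the whole dataset onto the driver, a distributed variant of the same idea is to parse on the executors with flatMap and a bean encoder. This is only a sketch under assumptions not in the original answer: the beans variable name is hypothetical, and it assumes Spark's bean encoder handles the Map and List fields of TempBean (supported since Spark 2.1):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch: parse each JSON array string on the executors instead of
// collecting to the driver. Each input row expands to several beans.
Dataset<TempBean> beans = ds.flatMap(
    (FlatMapFunction<String, TempBean>) json -> {
        ObjectMapper mapper = new ObjectMapper(); // created per call; could be cached per partition
        List<Map<String, Object>> mapList =
            mapper.readValue(json, new TypeReference<List<Map<String, Object>>>() {});
        List<TempBean> out = new ArrayList<>();
        for (Map<String, Object> map : mapList) {
            TempBean temp = new TempBean();
            temp.setName(map.get("name").toString());
            temp.setAddress((Map<String, String>) map.get("address"));
            temp.setDocs((List<Map<String, String>>) map.get("docs"));
            out.add(temp);
        }
        return out.iterator();
    },
    Encoders.bean(TempBean.class));
```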

Show the DataFrame:

dff.show(false);
+--------------------------------+---------------------------------------+----+
|address                         |docs                                   |name|
+--------------------------------+---------------------------------------+----+
|Map(state -> CA, country -> USA)|[Map(subject -> english, year -> 2016)]|foo |
|Map(state -> OH, country -> USA)|[Map(subject -> math, year -> 2017)]   |bar |
+--------------------------------+---------------------------------------+----+

Print the schema:

dff.printSchema();
root
 |-- address: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- docs: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- name: string (nullable = true)

You can use the function from_json combined with explode to separate each row of your dataset.

from_json exists since Spark 2.1 and transforms a column containing a JSON string into Spark's structure. It takes two main arguments: the column containing the JSON and the JSON schema. The schema can be defined using a JSON string, a DDL string, or Spark's DataType.

Then, as your JSON represents an array, you use explode (existing since Spark 1.3) to transform a column containing an array into several rows, each containing one element of the array.

So the complete code is:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.HashMap;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;

public class Functions {

  public static Dataset<Row> convertJson(Dataset<Row> jsonRows) {
    return jsonRows.withColumn("json", from_json(
        col("value"),
        "array<struct<name : string, address : struct<state : string, country : string>, docs : array<struct<subject: string, year: int>>>>",
        new HashMap<>()
      ))
      .withColumn("json", explode(col("json")))
      .select(col("json.*"));
  }
}

Note: I chose the DDL string representation for the schema.
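For reference, the same schema could also be built programmatically with Spark's DataType API instead of the DDL string. This is a sketch of the equivalent schema; the variable names personType and schema are mine, not from the answer:

```java
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// The element type of the array: one record with name, address, and docs.
StructType personType = new StructType()
    .add("name", DataTypes.StringType)
    .add("address", new StructType()
        .add("state", DataTypes.StringType)
        .add("country", DataTypes.StringType))
    .add("docs", DataTypes.createArrayType(new StructType()
        .add("subject", DataTypes.StringType)
        .add("year", DataTypes.IntegerType)));

// The full schema: an array of such records.
ArrayType schema = DataTypes.createArrayType(personType);
```

from_json also has an overload that accepts a DataType directly, so this schema object could be passed in place of the DDL string.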

So, if you apply the previous convertJson function to your input:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"name": "foo", "address": {"state": "CA", "country": "USA"}, "docs":[{"subject": "english", "year": 2016}]}, {"name": "bar", "address": {"state": "OH", "country": "USA"}, "docs":[{"subject": "math", "year": 2017}]}]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

You get the following result dataframe:

+----+---------+-----------------+
|name|address  |docs             |
+----+---------+-----------------+
|foo |{CA, USA}|[{english, 2016}]|
|bar |{OH, USA}|[{math, 2017}]   |
+----+---------+-----------------+

having the following schema:

root
 |-- name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- country: string (nullable = true)
 |-- docs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- subject: string (nullable = true)
 |    |    |-- year: integer (nullable = true)
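Once flattened like this, the nested fields can be queried directly with dot notation. A hypothetical follow-up (the input and perDoc names are mine), which expands the docs array so each document gets its own row:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

// Expand the docs array into one row per document, then pick out
// scalar fields from the nested structs with dot notation.
Dataset<Row> perDoc = Functions.convertJson(input)
    .withColumn("doc", explode(col("docs")))
    .select(col("name"), col("address.state"), col("doc.subject"), col("doc.year"));
```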
