
I've been googling this all day and couldn't find a straight answer, so I ended up posting a question here.

I have a file containing line-delimited JSON objects:

{"device_id": "103b", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
{"device_id": "103b", "timestamp": 1435677490, "rooms": ["Office", "Lab"]}
{"device_id": "103b", "timestamp": 1436673850, "rooms": ["Office", "Foyer"]}

My goal is to parse this file with Apache Spark in Java. I referenced How to Parsing CSV or JSON File with Apache Spark, and so far I have been able to parse each line of JSON into a JavaRDD<JsonObject> using Gson.

JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> data = sc.textFile("fileName");
JavaRDD<JsonObject> records = data.map(new Function<String, JsonObject>() {
    public JsonObject call(String line) throws Exception {
        Gson gson = new Gson();
        JsonObject json = gson.fromJson(line, JsonObject.class);
        return json;
    }
});

Where I'm really stuck is deserializing the "rooms" array so that each element fits into my class Event.

public class Event implements Serializable {
    public static final long serialVersionUID = 42L;
    private String deviceId;
    private int timestamp;
    private String room;
    // constructor, getters and setters
}

In other words, from this line:

{"device_id": "103b", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}

I want to create two Event objects in Spark:

obj1: deviceId = "103b", timestamp = 1436941050, room = "Office"
obj2: deviceId = "103b", timestamp = 1436941050, room = "Foyer"

I did a little searching and tried flatMapValue, but no luck; it threw me an error...

JavaRDD<Event> events = records.flatMapValue(new Function<JsonObject, Iterable<Event>>() {
    public Iterable<Event> call(JsonObject json) throws Exception {
        JsonArray rooms = json.get("rooms").getAsJsonArray();
        List<Event> data = new LinkedList<Event>();
        for (JsonElement room : rooms) {
            data.add(new Event(json.get("device_id").getAsString(), json.get("timestamp").getAsInt(), room.toString()));
        }
        return data;
    }
});
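
I suspect the error is because flatMapValue / flatMapValues only exists on JavaPairRDD, not on JavaRDD, so maybe what I actually need is plain flatMap. A rough, untested sketch of that (using the Spark 1.x FlatMapFunction signature, which returns an Iterable; in Spark 2.x it returns an Iterator):

JavaRDD<Event> events = records.flatMap(new FlatMapFunction<JsonObject, Event>() {
    public Iterable<Event> call(JsonObject json) throws Exception {
        // one Event per entry in the "rooms" array
        List<Event> result = new LinkedList<Event>();
        for (JsonElement room : json.get("rooms").getAsJsonArray()) {
            // getAsString() gives the bare value; toString() would keep the quotes
            result.add(new Event(json.get("device_id").getAsString(),
                                 json.get("timestamp").getAsInt(),
                                 room.getAsString()));
        }
        return result;
    }
});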

I'm very new to Spark and Map/Reduce. I would be grateful if you can help me out. Thanks in advance!

  • Please post your error. Edit your post and add the stack trace. Commented Jul 13, 2016 at 7:53

2 Answers


If you load the JSON data into a DataFrame:

DataFrame df = sqlContext.read().json("/path/to/json");

then you can easily do this with explode:

df.select(
    df.col("device_id"),
    df.col("timestamp"),
    org.apache.spark.sql.functions.explode(df.col("rooms")).as("room")
);

For input:

{"device_id": "1", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
{"device_id": "2", "timestamp": 1435677490, "rooms": ["Office", "Lab"]}
{"device_id": "3", "timestamp": 1436673850, "rooms": ["Office", "Foyer"]}

You will get:

+---------+------+----------+
|device_id|  room| timestamp|
+---------+------+----------+
|        1|Office|1436941050|
|        1| Foyer|1436941050|
|        2|Office|1435677490|
|        2|   Lab|1435677490|
|        3|Office|1436673850|
|        3| Foyer|1436673850|
+---------+------+----------+
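
If you ultimately need the Event objects from the question rather than a DataFrame, a minimal sketch (Spark 1.x API, assuming the Event class and constructor from the question) is to map the exploded rows back into an RDD:

DataFrame exploded = df.select(
    df.col("device_id"),
    df.col("timestamp"),
    org.apache.spark.sql.functions.explode(df.col("rooms")).as("room"));

JavaRDD<Event> events = exploded.javaRDD().map(new Function<Row, Event>() {
    public Event call(Row row) throws Exception {
        // the JSON source infers whole numbers as long, so cast down to int
        return new Event(row.getString(0), (int) row.getLong(1), row.getString(2));
    }
});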

2 Comments

Thanks for letting me know about this useful feature. I didn't know Spark supported Hive-like UDFs. It's very helpful!
Spark is fully compatible with Hive (*≧▽≦)
val formatrecord = records.map(fromJson[mapClass](_))

mapClass should be a case class that maps the structure of the JSON objects in records.
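
In Java (the asker's language) the analogous approach is a small mapping class plus Gson; a sketch only, where the class name Reading is illustrative and its fields simply mirror the JSON keys:

// Illustrative mapping class (top-level or static nested so Gson can instantiate it);
// its fields match the JSON keys, like the case class described above.
public class Reading implements Serializable {
    public String device_id;
    public long timestamp;
    public List<String> rooms;
}

JavaRDD<Reading> formatted = data.map(new Function<String, Reading>() {
    public Reading call(String line) throws Exception {
        return new Gson().fromJson(line, Reading.class);
    }
});

From a JavaRDD<Reading> you can then flatten the rooms list into individual Event objects with a flatMap, as sketched in the question above.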

