
I'm trying to aggregate a CSV file via Spark SQL and then show the result as JSON:

val people = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", ",").load("/tmp/people.csv")
people.registerTempTable("people")  
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")

That's where I'm stuck. I can do a result.schema.prettyJson, which works flawlessly, but I can't find a way to return the result itself as JSON.

I was assuming that result.toJSON.collect() would do what I want, but it fails with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 101.0 failed 1 times, most recent failure: Lost task 1.0 in stage 101.0 (TID 159, localhost): java.lang.NegativeArraySizeException
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:171)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:162)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Can somebody guide me?

3 Answers


The error you're getting is odd; it sounds like result is probably empty?

You might want to try this command on the dataframe to get each line printed out instead:

result.toJSON.foreach(println)

See the DataFrame API for a little more information.


4 Comments

Thanks for your answer. I'm searching for a way to receive the complete DataFrame as a single JSON (see the sketch after these comments). What you're suggesting would print each Row as JSON, if I understand correctly (Spark/Scala beginner here...)
It would, yes. Does it output anything? As I say, I think your result is probably empty at the moment based on the error you're getting. result.toJSON.collect() should return something like this based on the Spark people.json test file: Array[String] = Array({"name":"Andy","cnt":1}, {"name":"Michael","cnt":1}, {"name":"Justin","cnt":1})
I can do a people.show() but not a result.show()
The joy of Spark: every error message is strange :) Glad you've sorted it.
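
For the "complete DataFrame as a single JSON" case raised in the comments above, a minimal sketch (assuming Spark 1.x and a result small enough to collect to the driver) is to join the per-row JSON strings yourself:

// toJSON yields an RDD[String] with one JSON object per row;
// collect() pulls everything to the driver, so only do this for small results
val singleJson = result.toJSON.collect().mkString("[", ",", "]")
println(singleJson)

With the query above, this would print something like [{"country":"US","cnt":2},{"country":"DE","cnt":1}] (hypothetical values).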

Turns out this error was because of a "malformed" CSV file. It contained some rows which had more columns than others (with no header field name)... Strange error message though.
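
For illustration, a hypothetical /tmp/people.csv with this problem might look like the following, where the last row carries an extra value that has no corresponding header field:

name,country
Andy,US
Michael,US
Justin,US,oops

With header and inferSchema, spark-csv derives a two-column schema from the header, and rows carrying a third value are what ended up triggering the NegativeArraySizeException shown in the question.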

4 Comments

Indeed, but one can only accept one's own answer after two days.
It turns out this will happen even if the file is well formed but the schema you specify doesn't have enough fields. I just encountered it with a file that had an extra comma at the end of each line, and solved it by adding an extra string field to the schema (see the sketch after these comments).
Additionally, it seems this has nothing to do with JSON; anything you do that actually forces the query to run will hit this error.
Well, I used the inferSchema option, so I didn't specify a schema myself.
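
A minimal sketch of the workaround mentioned in the comments above (spelling out the schema yourself and padding it with an extra string column so the spare trailing comma has somewhere to go), assuming Spark 1.x, the spark-csv package, and hypothetical column names:

import org.apache.spark.sql.types.{StructField, StructType, StringType}

// hypothetical columns; the extra "trailing" field absorbs the spare comma at the end of each line
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("country", StringType, nullable = true),
  StructField("trailing", StringType, nullable = true)
))

val people = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", ",")
  .schema(schema)
  .load("/tmp/people.csv")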

Try

val people = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .option("delimiter", ",")
  .load("/tmp/people.csv")  
people.registerTempTable("people")  
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")
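
With the malformed rows dropped, the aggregation should complete, so the call from the question should then work as well (assuming the remaining rows are well formed):

result.toJSON.collect()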
