
I am new to Spark. I am trying to develop an application that saves JSON data to a Hive table using Spark 1.6. Here is my code:

 // arr is the JSON array; parallelize it and let Spark infer the schema
 val rdd = sc.parallelize(Seq(arr.toString))
 val dataframe = hiveContext.read.json(rdd)

 // Expose the dataframe as a temp table, then materialize it in Hive via CTAS
 dataframe.registerTempTable("RiskRecon_tmp")
 hiveContext.sql("DROP TABLE IF EXISTS RiskRecon_TOES")
 hiveContext.sql("CREATE TABLE RiskRecon_TOES AS SELECT * FROM RiskRecon_tmp")

When I run this, I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: file:/tmp/spark-2c2e53f5-6b5f-462a-afa2-53b8cf5e53f1/scratch_hive_2017-07-12_07-41-07_146_1120449530614050587-1, expected: hdfs://nameservice1
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:660)
        at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:480)
        at org.apache.hadoop.hive.ql.Context.getStagingDir(Context.java:229)
        at org.apache.hadoop.hive.ql.Context.getExternalScratchDir(Context.java:359)
        at org.apache.hadoop.hive.ql.Context.getExternalTmpPath(Context.java:437)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:132)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:127)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:276)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.hive.execution.CreateTableAsSelect.run(CreateTableAsSelect.scala:89)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:145)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
        at test$.main(test.scala:25)
        at test.main(test.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The error is thrown on the CREATE TABLE statement.

What does this error mean? Am I doing this the right way, or is there a better way to save a dataframe to a table? Also, if this code works, would the created table be an internal (managed) table? Ideally, I need an external table for my data.

Any help would be appreciated. Thank you.

  • What is rdd? How did you create it? Commented Jul 12, 2017 at 13:56
  • val rdd = sc.parallelize(Seq(arr.toString)). arr is the JSON array. @philantrovert Commented Jul 12, 2017 at 13:58
  • You have methods like saveAsTable and insertInto that are built into Spark. You can try those and see if they work. The table you refer to in those calls should already exist in Hive, so you can create your own EXTERNAL table for it (see the sketch below). Commented Jul 12, 2017 at 14:01
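
For illustration, a minimal sketch of the insertInto route from the comment above (Spark 1.6; the database and table names are hypothetical, and the target table must already exist in Hive):

 // Hypothetical sketch: insertInto appends the dataframe's rows to an
 // existing Hive table; column order must match the table's schema.
 dataframe.write.mode("append").insertInto("mydb.riskrecon_toes")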

1 Answer


Suppose df contains the data of your JSON file, stored as a dataframe:

val df = hiveContext.read.json(rdd) // use a HiveContext so saveAsTable can write to Hive

Then you can use saveAsTable to load it into your Hive table. Note that the target Hive table should already exist (you can create it as an EXTERNAL table at your desired location if you like), and that your Spark user must have write access to the corresponding folder.
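
For example, a minimal sketch of creating such an external table up front through the HiveContext (the database name, columns, storage format, and HDFS location below are all assumptions to adapt to your schema):

 // Hypothetical sketch: create the external table once, before any writes.
 // Database, columns, format, and location are assumptions, not your schema.
 hiveContext.sql(
   """CREATE EXTERNAL TABLE IF NOT EXISTS database.table_name (
     |  id BIGINT,
     |  name STRING
     |)
     |STORED AS PARQUET
     |LOCATION 'hdfs://nameservice1/data/table_name'""".stripMargin)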

df.write.mode("append").saveAsTable("database.table_name")

Depending on your requirements, you can use the other save modes that are available: overwrite, ignore, and errorIfExists (the default).
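
For reference, a short sketch of selecting a mode explicitly through the SaveMode enum (the table name is hypothetical):

 import org.apache.spark.sql.SaveMode

 // Append keeps existing rows; Overwrite replaces the table's contents;
 // ErrorIfExists (the default) fails if the table exists; Ignore does nothing then.
 df.write.mode(SaveMode.Append).saveAsTable("database.table_name")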


4 Comments

Thanks for the answer. However, I read here cloudera.com/documentation/enterprise/release-notes/topics/… that using saveAsTable will create tables that are not compatible with Hive. I'll try the solution anyway. @philantrovert
@HemanthAnnavarapu Note that you are not creating the table through Spark. Your table already exists in Hive. You're just loading the dataframe into it.
Yes, you're right. It works. Thanks a lot! Please add your comment to the answer and I'll accept it. @philantrovert
Hello. I've used the same solution to write to a partitioned table and I get this error: java.lang.IllegalArgumentException: Wrong FS: file:/tmp/spark-44151075-0d85-41df-8d96-68649ddd5ca5/scratch_hive, expected: hdfs://nameservice1. I am using this statement: DF.write.partitionBy("column").mode("append").saveAsTable("tablename"). Am I missing something? @philantrovert
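
A hedged guess on that last Wrong FS error: Hive's staging directory is resolving to the local filesystem while the target table lives on HDFS, so pointing the staging directory at an HDFS path before writing may help. The path below is an assumption for this cluster:

 // Hypothetical workaround sketch: keep Hive's staging files on the same
 // filesystem (HDFS) as the target table. The staging path is an assumption.
 hiveContext.setConf("hive.exec.stagingdir", "hdfs://nameservice1/tmp/hive-staging")
 DF.write.partitionBy("column").mode("append").saveAsTable("tablename")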
