Problem Statement:

Hi, I am a newbie to the Spark world. I want to query a MySQL database and load one table into Spark, then apply a filter on the table using a SQL query. Once the result is filtered, I want to return it as JSON. All of this has to be done from a standalone Scala-based application.

I am struggling to initialize the SparkContext and am getting an error. I know I am missing some piece of information.

Can somebody have a look at the code and tell me what I need to do?

Code:

import application.ApplicationConstants
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SparkSession, Dataset, Row, Column, SQLContext}

var sc: SparkContext = null

val sparkSession = SparkSession.builder().master("spark://10.62.10.71:7077")
  .config("format", "jdbc")
  .config("url", "jdbc:mysql://localhost:3306/test")
  .config("user", "root")
  .config("password", "")
  .appName("MySQLSparkConnector")
  .getOrCreate()

var conf = new SparkConf()
conf.setAppName("MongoSparkConnectorIntro")
  .setMaster("local")
  .set("format", "jdbc")
  .set("url", "jdbc:mysql://localhost:3306/test")
  .set("user", "root")
  .set("password", "")

sc = new SparkContext(conf)

val connectionProperties = new java.util.Properties
connectionProperties.put("user", username)
connectionProperties.put("password", password)

val customDF2 = sparkSession.read.jdbc(url, "employee", connectionProperties)

println("program ended")

Error:

Following is the error that I am getting:

64564 [main] ERROR org.apache.spark.SparkContext - Error initializing SparkContext.
java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:560)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$5(SparkSession.scala:935)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
    at manager.SparkSQLMySQLDBConnector$.main(SparkSQLMySQLDBConnector.scala:21)
    at manager.SparkSQLMySQLDBConnector.main(SparkSQLMySQLDBConnector.scala)
64566 [main] INFO org.apache.spark.SparkContext - SparkContext already stopped.
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:560)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$5(SparkSession.scala:935)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
    at manager.SparkSQLMySQLDBConnector$.main(SparkSQLMySQLDBConnector.scala:21)
    at manager.SparkSQLMySQLDBConnector.main(SparkSQLMySQLDBConnector.scala)

P.S.: If anybody can share a link or tutorial that shows a similar scenario with Scala, that would be great.

Versions:

Spark: 2.4.0
Scala: 2.12.8
MySQL Connector Jar: 8.0.13

1 Answer

I think you are mixing up the creation of the Spark context and the configs used to connect to MySQL.

If you are using Spark 2.0+, use only SparkSession as the entry point:

val spark = SparkSession.builder().master("local[*]").appName("Test").getOrCreate

// Add the JDBC connection properties as below
val prop = new java.util.Properties()
prop.put("user", "user")
prop.put("password", "password")
val url = "jdbc:mysql://host:port/dbName"

Now read the table as a DataFrame:

val df = spark.read.jdbc(url, "tableName", prop)
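
Since the original question also wants to filter the table and return the result as JSON, here is a minimal sketch of that step using the DataFrame API (the "salary" column and the predicate are assumed examples, not part of the original schema):

// Filter the DataFrame; "salary > 50000" is an illustrative predicate only.
val filtered = df.filter("salary > 50000")

// Dataset.toJSON converts each row to a JSON string.
val json: Array[String] = filtered.toJSON.collect()
json.foreach(println)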

You can access the sparkContext and sqlContext from the SparkSession as follows:

val sc = spark.sparkContext

val sqlContext = spark.sqlContext

Make sure you have the mysql-connector-java jar on the classpath; add the dependency to your pom.xml or build.sbt.
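
For example, a minimal build.sbt matching the versions listed in the question (a sketch; adjust the versions to your environment):

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  // Spark SQL, which provides DataFrames and the JDBC data source
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  // MySQL JDBC driver
  "mysql" % "mysql-connector-java" % "8.0.13"
)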

Hope this helps!


Comments

Thanks for sharing the code. I have updated the question with the exact versions. When I try your stated logic, I now get the following error on my Spark console:
Error while invoking RpcHandler#receive() for one-way message.java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 5355123861728677005, local class serialVersionUID = 6543101073799644159
Can you use Scala 2.11? I think support for Scala 2.12 is still experimental.
Can you guide me on another question? If I need to execute a query on the table that I read, should I do it on the DataFrame or in the SQLContext?
You can do it both ways: you can do it on the DataFrame directly, or you can register a temp view with df.createOrReplaceTempView("tableName") and then use spark.sql("SELECT * FROM tableName") or df.sqlContext.sql("SELECT * FROM tableName"). A sketch of the temp-view route follows.
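
Putting that comment together as a runnable sketch (the table and column names here are assumptions for illustration, not from the original schema):

// Register the DataFrame as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView("employee")

// Filter with a SQL query; "salary" is an assumed example column.
val result = spark.sql("SELECT * FROM employee WHERE salary > 50000")

// As in the original question, the filtered rows can be returned as JSON strings.
result.toJSON.collect().foreach(println)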