
I am new to Scala. The following code does not print the values from the DataFrame, and Spark does not stop: it keeps running even after half an hour.

    import java.sql.DriverManager
    import java.sql.Connection
    import org.apache.spark._
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._
    import org.apache.spark.sql.SQLContext._
    import org.apache.spark.sql._
    import java.util.concurrent.TimeUnit

    object MysqlTest {

     def main(args: Array[String]) {
       val prop = new java.util.Properties()
       val conf = new SparkConf().setAppName("MysqlDataLoad").setMaster("local")
       val sc = new SparkContext(conf)
       val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

       prop.put("user", "***")

       prop.put("password", "*****")

       val url = "jdbc:mysql://acb-cluster.cluster-cfdz.us-wt-2.rds.amazonaws.com:3306/gsl"

       val df: DataFrame = sqlcontext.read.jdbc(url, "test_20160930_result_prop_alpha", prop)

       df.createOrReplaceTempView("gsl")

// Create dataframe of required columns from GSL table

       println("********* Data For GSL **********")

       val dataFrame2 = sqlcontext.sql("select * from gsl limit 10")

       dataFrame2.show()

       sc.stop()
      }

    }

Logs:

17/05/31 12:30:51 INFO Executor: Starting executor ID driver on host localhost
17/05/31 12:30:51 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41593.
17/05/31 12:30:51 INFO NettyBlockTransferService: Server created on 192.168.0.132:41593
17/05/31 12:30:51 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/05/31 12:30:51 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.132, 41593, None)
17/05/31 12:30:51 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.132:41593 with 1407.3 MB RAM, BlockManagerId(driver, 192.168.0.132, 41593, None)
17/05/31 12:30:51 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.132, 41593, None)
17/05/31 12:30:51 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.132, 41593, None)
17/05/31 12:30:52 INFO SharedState: Warehouse path is 'file:/home/vna/spark_workspace/sz-dw-etl/spark-warehouse/'.
17/05/31 12:30:57 INFO SparkSqlParser: Parsing command: gsl
********* Data For GSL **********
17/05/31 12:30:57 INFO SparkSqlParser: Parsing command: select * from gsl limit 10

17/05/31 12:30:57 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
17/05/31 12:30:58 INFO CodeGenerator: Code generated in 320.985934 ms
17/05/31 12:30:58 INFO SparkContext: Starting job: collect at MysqlTest.scala:34
17/05/31 12:30:58 INFO DAGScheduler: Got job 0 (collect at MysqlTest.scala:34) with 1 output partitions
17/05/31 12:30:58 INFO DAGScheduler: Final stage: ResultStage 0 (collect at MysqlTest.scala:34)
17/05/31 12:30:58 INFO DAGScheduler: Parents of final stage: List()
17/05/31 12:30:58 INFO DAGScheduler: Missing parents: List()
17/05/31 12:30:58 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at collect at MysqlTest.scala:34), which has no missing parents
17/05/31 12:30:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 14.8 KB, free 1407.3 MB)
17/05/31 12:30:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 6.2 KB, free 1407.3 MB)
17/05/31 12:30:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.132:41593 (size: 6.2 KB, free: 1407.3 MB)
17/05/31 12:30:58 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/05/31 12:30:58 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at collect at MysqlTest.scala:34)
17/05/31 12:30:58 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/05/31 12:30:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5723 bytes)
17/05/31 12:30:58 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

EDIT: Fetching another, smaller table from the same database does return results.

I am not sure why this fails even though I limit my query to 10 records.

Since I am running a Spark cluster locally (on a 12 GB machine), does it need more memory to operate? All I am trying to run is a single 10-record query (from the Scala IDE).

More details on the table I am trying to fetch: it is 44 GB and has 100,000,000 records. But my query clearly limits it to fetch 10 records, without any kind of sort.
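(A likely explanation, based on how Spark's JDBC source works: with the plain read.jdbc(url, table, prop) call, Spark opens a single connection and issues a SELECT over the whole table; the limit 10 is applied by Spark afterwards and, in Spark 2.x, is not pushed down to MySQL, so the driver tries to pull the full 44 GB result set through one connection. A sketch of the usual workaround is to pass a parenthesized subquery as the table name so MySQL itself applies the limit; the partitioned variant below assumes a numeric id column, which is hypothetical here:)

    // Sketch: push the LIMIT into MySQL by wrapping it in a subquery.
    // MySQL requires an alias ("t") for the derived table.
    val limited = "(SELECT * FROM test_20160930_result_prop_alpha LIMIT 10) AS t"
    val df10 = sqlcontext.read.jdbc(url, limited, prop)
    df10.show()

    // If the full table is actually needed, a partitioned read spreads the
    // work over several connections ("id" is a hypothetical numeric column):
    val dfAll = sqlcontext.read.jdbc(
      url,
      "test_20160930_result_prop_alpha",
      columnName = "id",
      lowerBound = 1L,
      upperBound = 100000000L,
      numPartitions = 100,
      connectionProperties = prop)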

  • Can you change setMaster("local") to setMaster("local[*]")? local gives you 1 core for execution, while local[*] will take as many as are available (see the sketch after these comments). Commented May 31, 2017 at 7:43
  • Can you also make sure you can access jdbc:mysql://acb-cluster.cluster-cfdz.us-wt-2.rds.amazonaws.com:3306/gsl using MySQL-specific tools? Commented May 31, 2017 at 7:45
  • Possible duplicate of "More than one hour to execute pyspark.sql.DataFrame.take(4)". Commented May 31, 2017 at 7:48
  • I can connect to that DB through MySQL Workbench from my system. Commented May 31, 2017 at 8:54
  • If it was a connection issue, shouldn't it time out? Commented May 31, 2017 at 8:58
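A minimal sketch of the change suggested in the first comment (the rest of the configuration is unchanged):

    import org.apache.spark.SparkConf

    // local[*] uses all available cores on the machine instead of a single one
    val conf = new SparkConf()
      .setAppName("MysqlDataLoad")
      .setMaster("local[*]")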

1 Answer


Try doing something like this:

  import java.util.Properties
  import org.apache.spark.sql.SparkSession

  // `spark` is a SparkSession, the Spark 2.x entry point
  val spark = SparkSession.builder().appName("MysqlTest").master("local[*]").getOrCreate()

  val properties = new Properties()
  properties.put("user", "root")
  properties.put("password", "123456")

  val url = "jdbc:mysql://localhost:3306/sakila"

  val df = spark.read.jdbc(url, "actor", properties)
  df.show()

Make sure the MySQL connector is on the classpath; with sbt:

libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.49"
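
If the driver is on the classpath but still not picked up, it can also be named explicitly in the connection properties (a sketch; com.mysql.jdbc.Driver is the driver class shipped with mysql-connector-java 5.x):

  // Tell Spark which JDBC driver class to load for this URL
  properties.put("driver", "com.mysql.jdbc.Driver")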
