3

I am trying to run a Scala example with SBT to read data from MongoDB. I am getting this error whenever I try to access the data read from Mongo into the RDD.

Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1431)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

I have imported the Dataframe explicitly, even though it is not used in my code. Can anyone help with this issue?

My code:

package stream

import org.apache.spark._
import org.apache.spark.SparkContext._
import com.mongodb.spark._
import com.mongodb.spark.config._
import com.mongodb.spark.rdd.MongoRDD
import org.bson.Document
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame

object SpaceWalk {

def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("SpaceWalk")
    .setMaster("local[*]")
    .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/nasa.eva")
    .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/nasa.astronautTotals")

    val sc = new SparkContext(sparkConf)
    val rdd = sc.loadFromMongoDB()


    def  breakoutCrew (  document: Document  ): List[(String,Int)]  = {
    println("INPUT"+document.get( "Duration").asInstanceOf[String])
      var minutes = 0;
      val timeString = document.get( "Duration").asInstanceOf[String]
      if( timeString != null && !timeString.isEmpty ) {
        val time =  document.get( "Duration").asInstanceOf[String].split( ":" )
        minutes = time(0).toInt * 60 + time(1).toInt
      }

      import scala.util.matching.Regex
      val pattern = new Regex("(\\w+\\s\\w+)")
      val names =  pattern findAllIn document.get( "Crew" ).asInstanceOf[String]
      var tuples : List[(String,Int)] = List()
      for ( name <- names ) { tuples = tuples :+ (( name, minutes ) ) }

      return tuples
    }

    val logs = rdd.flatMap( breakoutCrew ).reduceByKey( (m1: Int, m2: Int) => ( m1 + m2 ) )

    //logs.foreach(println)

    def mapToDocument( tuple: (String, Int )  ): Document = {
      val doc = new Document();
      doc.put( "name", tuple._1 )
      doc.put( "minutes", tuple._2 )

      return doc
    }

    val writeConf = WriteConfig(sc)
    val writeConfig = WriteConfig(Map("collection" -> "astronautTotals", "writeConcern.w" -> "majority", "db" -> "nasa"), Some(writeConf))

    logs.map( mapToDocument ).saveToMongoDB( writeConfig )

    import org.apache.spark.sql.SQLContext
    import com.mongodb.spark.sql._
    import org.apache.spark.sql.DataFrame

    // load the first dataframe "EVAs"
    val sqlContext = new SQLContext(sc);
    import sqlContext.implicits._
    val evadf = sqlContext.read.mongo()
    evadf.printSchema()
    evadf.registerTempTable("evas")

    // load the 2nd dataframe "astronautTotals"

    val astronautDF = sqlContext.read.option("collection", "astronautTotals").mongo[astronautTotal]()
    astronautDF.printSchema()
    astronautDF.registerTempTable("astronautTotals")

    sqlContext.sql("SELECT astronautTotals.name, astronautTotals.minutes FROM astronautTotals"  ).show()


    sqlContext.sql("SELECT astronautTotals.name, astronautTotals.minutes, evas.Vehicle, evas.Duration FROM " +
      "astronautTotals JOIN evas ON astronautTotals.name LIKE evas.Crew"  ).show()
}
}
 case class astronautTotal ( name: String, minutes: Integer )

This is my sbt file -

name := "Project"
version := "1.0"    
scalaVersion := "2.11.7"    
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
//libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.2.1"
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "0.1"  

addCommandAlias("c1", "run-main stream.SaveTweets")
addCommandAlias("c2", "run-main stream.SpaceWalk")

outputStrategy := Some(StdoutOutput)
//outputStrategy := Some(LoggedOutput(log: Logger))    
fork in run := true

1 Answer 1

5

This error message is because you are using an incompatible library that only supports Spark 1.x. You should use mongo-spark-connector 2.0.0+ instead. See: https://docs.mongodb.com/spark-connector/v2.0/

Sign up to request clarification or add additional context in comments.

2 Comments

Thx. I faced the same problem when trying to use the Spark Cassandra connector and your answer helped fix it.
Even , i think i am also facing the same issue. I have Spark 1.6.3 which has Scala 2.10.5 I am using the Mongo DB Connector Version 1.1 and package 2.10 <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.10</artifactId> <version>1.1.0</version> </dependency> Getting error like "java.lang.NoClassDefFoundError: com/mongodb/spark/rdd/api/java/JavaMongoRDD " But was able to find the class existing in the reference libraries.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.