My fnl2 dataset is of the form:

scala> fnl2.first()
res4: org.apache.spark.mllib.regression.LabeledPoint = (0.0,(612515,[28693,86703,94568,162663,267733,292870,327313,347868,362660,396595,415817,436773,443713,470149,485282,486556,489594,496185,541453,570126,571088],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))

scala> fnl2.count()
res5: Long = 775946

Then I try to build an SVMWithSGD model:

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val splits = fnl2.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

But I get the following Java heap space error, and then the SparkContext shuts down unexpectedly:

15/08/10 09:15:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/08/10 09:15:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/08/10 09:23:50 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-30] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
    at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/08/10 09:23:56 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(eastspark1,57211) not found
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


scala> 15/08/10 09:23:56 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(10.2.0.14,37151) not found
15/08/10 09:23:56 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(10.2.0.16,54187)
java.nio.channels.ClosedChannelException
    at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:257)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:300)
    at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
    at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The SparkContext runs on 12 cores, with 8G of memory on each node.

Any ideas?

EDIT

This is the error I get after increasing the driver's memory to 5G with export SPARK_DRIVER_MEMORY="5000M":

scala> val model = SVMWithSGD.train(training, numIterations)
15/08/10 11:33:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/08/10 11:33:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
Exception in thread "qtp950243028-158" java.lang.OutOfMemoryError: GC overhead limit exceeded
15/08/10 11:46:26 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-32] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
    at com.google.protobuf_spark.ByteString.copyFrom(ByteString.java:90)
    at com.google.protobuf_spark.CodedInputStream.readBytes(CodedInputStream.java:289)
    at akka.remote.WireFormats$SerializedMessage$Builder.mergeFrom(WireFormats.java:2700)
    at akka.remote.WireFormats$SerializedMessage$Builder.mergeFrom(WireFormats.java:2546)
    at com.google.protobuf_spark.CodedInputStream.readMessage(CodedInputStream.java:275)
    at akka.remote.WireFormats$RemoteEnvelope$Builder.mergeFrom(WireFormats.java:1165)
    at akka.remote.WireFormats$RemoteEnvelope$Builder.mergeFrom(WireFormats.java:949)
    at com.google.protobuf_spark.CodedInputStream.readMessage(CodedInputStream.java:275)
    at akka.remote.WireFormats$AckAndEnvelopeContainer$Builder.mergeFrom(WireFormats.java:479)
    at akka.remote.WireFormats$AckAndEnvelopeContainer$Builder.mergeFrom(WireFormats.java:300)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:300)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
    at com.google.protobuf_spark.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:162)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:716)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
    at com.google.protobuf_spark.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:153)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:709)
    at akka.remote.WireFormats$AckAndEnvelopeContainer.parseFrom(WireFormats.java:234)
    at akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(AkkaPduCodec.scala:181)
    at akka.remote.EndpointReader.akka$remote$EndpointReader$$tryDecodeMessageAndAck(Endpoint.scala:821)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:755)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/08/10 11:46:45 WARN AbstractNioWorker: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
    at org.jboss.netty.buffer.HeapChannelBuffer.<init>(HeapChannelBuffer.java:42)
    at org.jboss.netty.buffer.BigEndianHeapChannelBuffer.<init>(BigEndianHeapChannelBuffer.java:34)
    at org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
    at org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:69)
    at org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:75)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:472)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:333)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  • How much memory did you allocate for the JVM? I suggest reading this: stackoverflow.com/questions/8331135/… . The default 512M might not be enough for you. Commented Aug 10, 2015 at 9:59
  • I re-ran it with 5G of memory (export SPARK_DRIVER_MEMORY="5000M"). I am still facing a problem, though it is slightly different now. Please have a look at my edit. Commented Aug 10, 2015 at 12:06
  • The problem might be that your data is simply too big to be kept in memory. You can try to calculate the SVM model on a smaller sample size. Commented Aug 10, 2015 at 12:38
  • Weird, the data set does not look that big, yet it is throwing "GC overhead limit exceeded". Try reducing the size of fnl2 to see if it works. It could also be due to the way you are calling SVMWithSGD. Commented Aug 10, 2015 at 13:07
  • Finally it works. Instead of using export SPARK_DRIVER_MEMORY="5000M", I increased the driver's memory at the Spark config level. Commented Aug 14, 2015 at 10:29
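
The last comment does not say which setting was changed, so the following is only a sketch of one likely reading: setting the spark.driver.memory property in the Spark defaults file. The file location and the 5g value (mirroring the 5000M tried in the question) are assumptions, not details from the thread. Driver memory has to be set before the driver JVM starts, which is why it belongs in a config file or on the command line rather than in code run inside an already started shell.

```
# conf/spark-defaults.conf (assumed location; read when spark-shell or spark-submit starts)
# 5g mirrors the 5000M tried in the question; it is not a tuned value.
spark.driver.memory  5g
```

The same setting can also be passed when launching the shell, as in spark-shell --driver-memory 5g.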
