I'm using the following runtime Spark configuration values:

spark-submit --executor-memory 8G --spark.yarn.executor.memoryOverhead 2G

but it still raises the following out-of-memory error:

I have a pairRDD with 8362269460 lines and a partition size of 128. It raises this error on pairRDD.groupByKey.saveAsTextFile. Any clue?
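Roughly, the pipeline looks like this (a simplified sketch; the input/output paths and the parsing step are illustrative, and the records are assumed to have a (Long, (String, Double)) shape):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("groupByKey-oom"))

// Parsing is illustrative; the result is a pair RDD of (Long, (String, Double)).
val pairRDD = sc.textFile("hdfs:///path/to/input")
  .map { line =>
    val Array(k, id, score) = line.split(",")
    (k.toLong, (id, score.toDouble))
  }

// groupByKey shuffles every value of a key to a single reducer task,
// which is where the fetch hits the direct-buffer OOM.
pairRDD.groupByKey(128).saveAsTextFile("hdfs:///path/to/output")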

Update: I added a filter, and the data is now 2300000000 lines. Running in the Spark shell, there is no error. My cluster: 19 datanodes, 1 namenode.

             Min Resources: <memory:150000, vCores:150>
             Max Resources: <memory:300000, vCores:300>

Thanks for your help.

org.apache.spark.shuffle.FetchFailedException: java.lang.OutOfMemoryError: Direct buffer memory
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:132)
  at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
  at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:89)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException:  Direct buffer memory
  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  ... 1 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
  at java.nio.Bits.reserveMemory(Bits.java:658)
  at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
  at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
  at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
  at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
  at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
  at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
  at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:121)
  at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
  at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
  at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
  at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
  at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
  ... 10 more

I'd like to know how to correctly configure the direct memory size. Best regards.

  • Please format your question properly and give some context to it. Commented Jan 21, 2016 at 11:55
  • @ssyue -XX:MaxDirectMemorySize Commented Jan 21, 2016 at 11:57
  • @manRo Sorry, English is my weakness. Commented Jan 21, 2016 at 13:42
  • @Marek-A- Thanks, but how do I set it on a Spark application? Commented Jan 21, 2016 at 13:52
  • Post the spark-defaults.conf file; it will provide context to the question. Also use garbage collection correctly: G1GC should be used. Commented Jan 22, 2016 at 4:22

1 Answer


I don't know the details of your Spark app, but I found the memory configuration here: you need to set -XX:MaxDirectMemorySize like any other JVM memory setting (via -XX:). Try using spark.executor.extraJavaOptions.

If you are using spark-submit, you can use:

./bin/spark-submit --name "My app" ...
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=512m" myApp.jar
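The same key can also be placed in spark-defaults.conf. Alternatively, a sketch under the assumption that you build the SparkContext in your own driver code (the 512m value is purely illustrative and must be sized to your actual shuffle traffic):

import org.apache.spark.{SparkConf, SparkContext}

// Executor JVM options must be set before the context is created.
val conf = new SparkConf()
  .setAppName("My app")
  .set("spark.executor.extraJavaOptions",
    "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=512m")

val sc = new SparkContext(conf)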

4 Comments

But this memory error rather suggests that your application has a memory issue, e.g. you read the whole stream content into a memory buffer.
@ravindra I have a pairRDD with 8362269460 lines (e.g. (867196025682574,(A10000456C2DA1,0.0010017530678687703))) and a partition size of 128. It raises this error on pairRDD.groupByKey.saveAsTextFile.
As suggested before, and also by different folks here, please post the code and the spark-defaults.conf file. Your comments do not provide enough context to the problem.
@ssyue The value you use for the parameter needs to correspond to the amount of data you are processing in direct memory; it may still raise the same issue if the data is too huge. Please post your code; the solution will be a change to the processing algorithm.
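For illustration (this sketch is not part of the original thread): the algorithm change the last comment points at is replacing groupByKey with a map-side combine such as reduceByKey, so values are merged per partition before the shuffle instead of every value of a key being buffered on one reducer. The aggregation below is hypothetical and assumes the (Long, (String, Double)) pairRDD shape from the question:

// Hypothetical aggregation: sum the numeric part per key instead of
// collecting all values of a key with groupByKey.
val summed = pairRDD
  .mapValues { case (_, score) => score } // keep only the Double
  .reduceByKey(_ + _, 128)                // combines per partition, then shuffles

summed.saveAsTextFile("hdfs:///path/to/output") // output path is illustrative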
