
I'm running Spark via PyCharm and via the pyspark shell, respectively. I'm stuck on this error:

: java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:416)
    at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:748)

My code is:

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
import time

if __name__ == '__main__':

    print("Started at " + time.strftime("%H:%M:%S"))

    conf = (SparkConf()
            .setAppName("TestRdd")
            .set('spark.driver.cores', '1')
            .set('spark.executor.cores', '1')
            .set('spark.driver.memory', '16G')
            .set('spark.executor.memory', '9G'))
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(range(1000000000),100)

    print(rdd.take(10))

    print("Finished at " + time.strftime("%H:%M:%S"))

These are the maximum memory settings I can set on the cluster. I tried to allocate all the memory to 1 core for creating the RDD, but it seems to me that the application fails before distributing the dataset; I assume it fails at the creation step. I also tried various numbers of partitions, from 100 to 10000. I've calculated how much memory it should take: 1 billion ints is approximately 4.5-4.7 GB in memory, less than I have, but no luck.
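
A rough way to sanity-check that estimate on the driver is a sketch like the one below (assuming Python 2.7, where range returns a real list; the sample size is arbitrary and just gets extrapolated):

import sys

# Measure a small list of distinct ints and extrapolate to 10**9 elements.
sample = range(1000000)
per_element = (sys.getsizeof(sample)
               + sum(sys.getsizeof(x) for x in sample)) / float(len(sample))
print("Estimated size of 10**9 ints: %.1f GB" % (per_element * 10**9 / 1024**3))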

How can I optimize my code and get it to run?


1 Answer


TL;DR Don't use parallelize outside tests and simple experiments. Because you use Python 2.7, range is not lazy, so you'll materialize the full range of values multiple times:

  • A Python list after the call.
  • A serialized version, which is later written to disk.
  • A serialized copy loaded on the JVM.

Using xrange would help, but you shouldn't use parallelize in the first place (or Python 2 in 2018).

If you want to create a series of values, just use SparkContext.range:

range(start, end=None, step=1, numSlices=None)

Create a new RDD of int containing elements from start to end (exclusive), increased by step every element. Can be called the same way as python’s built-in range() function. If called with a single argument, the argument is interpreted as end, and start is set to 0.

so in your case:

rdd = sc.range(1000000000, numSlices=100)
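
Dropped into the original script, that looks roughly like this (a sketch; only the RDD creation changes, and the oversized memory settings are omitted since the driver no longer has to hold the data):

from pyspark import SparkContext, SparkConf
import time

if __name__ == '__main__':

    print("Started at " + time.strftime("%H:%M:%S"))

    conf = SparkConf().setAppName("TestRdd")
    sc = SparkContext(conf=conf)

    # The values are generated on the executors; nothing large leaves the driver.
    rdd = sc.range(1000000000, numSlices=100)

    print(rdd.take(10))

    print("Finished at " + time.strftime("%H:%M:%S"))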

With DataFrame:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(1000000000, numPartitions=100)
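
If you need a plain RDD of Python ints rather than a DataFrame of Rows, you can map out the single id column that spark.range produces (a short sketch):

rdd = df.rdd.map(lambda row: row.id)
print(rdd.take(10))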

1 Comment

sc.range needs to store only the beginning and end of each partition.
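
A rough hand-rolled equivalent of that idea (a sketch, assuming Python 2.7 and an existing SparkContext named sc): ship only the partition indices and let each executor expand its own chunk lazily with xrange.

n, parts = 1000000000, 100
chunk = n // parts

# Only 100 small ints leave the driver; each partition generates its values lazily.
rdd = (sc.parallelize(range(parts), parts)
         .flatMap(lambda i: xrange(i * chunk, (i + 1) * chunk)))

print(rdd.take(10))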
