
I am trying to convert a Spark DataFrame to pandas and I am encountering the following error:

    databricks/spark/python/pyspark/sql/pandas/conversion.py:145: UserWarning: toPandas attempted
    Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has 
    reached the error below and can not continue. Note that 
    'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in 
     the middle of computation.
     An error occurred while calling o466.getResult.
    : org.apache.spark.SparkException: Exception thrown in awaitResult: 
     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:428)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:107)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:103)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 43.0 failed 1 times, most recent failure: Lost task 0.0 in stage 43.0 (TID 97) (ip-10-172-188-62.us-west-2.compute.internal executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$19(Executor.scala:859)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5401/964020024.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:859)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5281/859288619.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2828)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2775)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2769)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2769)
    at org.apache.spark.scheduler.DAGScheduler
    at org.apache.spark.scheduler.DAGScheduler
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1305)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2977)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2965)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1067)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2476)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2459)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2571)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3761)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3765)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3731)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3825)
    at org.apache.spark.sql.execution.SQLExecution$:130)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:273)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:223)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3823)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3731)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3730)
    at org.apache.spark.security.SocketAuthServer$
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
    at org.apache.spark.security.SocketAuthServer$
    at org.apache.spark.security.SocketAuthServer$.
    at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:117)
    at org.apache.spark.security.SocketAuthServer$$a
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:70)
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$19(Executor.scala:859)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5401/964020024.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:859)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5281/859288619.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    warnings.warn(msg)

Below is how the data frame/code looks (the dataset is formed by joining several different datasets): [screenshot of the DataFrame and code in the original post]

  • Why do you need to convert it at all? If you are more familiar with pandas, it is better to use the new pandas API on Spark; plain pandas code will not be executed on the workers (a sketch follows these comments). Commented Nov 10, 2021 at 12:41
  • I am learning Spark, that's why. Commented Nov 11, 2021 at 13:02
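
A minimal sketch of that suggestion (mine, not the commenter's code), assuming Spark 3.2+, where the pandas API on Spark ships as pyspark.pandas, and using sdf as a stand-in for the joined Spark DataFrame:

    # Sketch only: wrap the Spark DataFrame instead of collecting it to the driver.
    import pyspark.pandas as ps

    psdf = ps.DataFrame(sdf)                # or sdf.pandas_api(); nothing is collected here
    print(psdf.shape)                       # pandas-style calls are executed by the Spark workers
    print(psdf.groupby("some_col").size())  # "some_col" is a hypothetical column name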

3 Answers


In the traceback it says:

    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 43.0 failed 1 times, most recent failure: Lost task 0.0 in stage 43.0 (TID 97) (ip-10-172-188-62.us-west-2.compute.internal executor driver): java.lang.OutOfMemoryError: Java heap space

Note: java.lang.OutOfMemoryError: Java heap space

You ran out of memory. That's causing the issue.

This is expected, since toPandas() gathers all the data onto a single machine (the driver), as noted in the documentation:

Note that converting pandas-on-Spark DataFrame to pandas requires to collect all the data into the client machine; therefore, if possible, it is recommended to use pandas API on Spark or PySpark APIs instead.

To learn more about pandas-on-Spark DataFrames, pandas DataFrames, and conversion between them and PySpark DataFrames, please see: From/to pandas and PySpark DataFrames.
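
If a pandas DataFrame on the driver is genuinely needed (for example, to plot or inspect a sample), a common mitigation is to shrink the data before calling toPandas(). A minimal sketch of that idea, with df standing in for the joined Spark DataFrame and hypothetical column names:

    # Sketch only: collect a bounded subset rather than the full join result.
    small_pdf = (
        df.select("id", "value")   # keep only the columns needed on the driver
          .limit(100_000)          # cap the number of rows that get collected
          .toPandas()
    )
    small_pdf.info(memory_usage="deep")  # check how much driver memory it actually uses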



There are some workarounds, but they won't help if the dataset is simply too big to fit on your driver node.

You could use

    import pandas as pd
    df.coalesce(1).write.save(..., format='parquet')  # write a single parquet file
    pdf = pd.read_parquet(...)                        # read it back into pandas

Or

pdf = df.coalesce(1).toPandas() 

coalesce(1) collapses the DataFrame into a single partition, so there is less shuffling while the pandas DataFrame is created and less memory is needed.
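
To make the first variant concrete, a hedged sketch with a hypothetical location (on Databricks, /dbfs/... is the FUSE mount of dbfs:/..., so pandas can read the output as if it were local storage):

    import pandas as pd

    out = "dbfs:/tmp/joined_df.parquet"                    # hypothetical path, adjust to your storage

    df.coalesce(1).write.mode("overwrite").parquet(out)    # one partition -> one part file
    pdf = pd.read_parquet("/dbfs/tmp/joined_df.parquet")   # same directory via the DBFS FUSE mount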



One trick that works much better for moving data from a PySpark DataFrame to a pandas DataFrame is to avoid the collect through the JVM altogether: write the data to disk or cloud storage and read it back with pandas.read_parquet. This is much faster, minimizes memory consumption, and avoids the driver crash even when the data has hundreds of millions of rows.
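
A hedged sketch of that approach (paths and column names are mine, not the answerer's): unlike the coalesce(1) variant above, Spark writes its partitions in parallel, and pandas reads back only the columns that are actually needed:

    import pandas as pd

    target = "dbfs:/tmp/joined_df_full"          # hypothetical path, adjust to your storage

    df.write.mode("overwrite").parquet(target)   # parallel write, no coalesce
    pdf = pd.read_parquet(
        "/dbfs/tmp/joined_df_full",              # DBFS FUSE path to the same directory
        columns=["id", "value"],                 # hypothetical columns; pruning saves driver memory
    )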

