
I'm having trouble replicating the Spark code from the PySpark documentation available here.

For example, when I try the following code pertaining to Grouped Map:

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession

spark.stop()

spark = SparkSession.builder.appName("New_App_grouped_map").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))


@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

I get the following error log.

main errors:

ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available

I'm using the following versions of the relevant packages; could there be some compatibility problem?

pyarrow==0.17.1
pandas==1.0.4
numpy==1.18.4

I've downloaded Spark into a separate C:\spark\ folder, so I'm not sure whether I have to move the pyarrow package I installed globally into the Spark folder. Is that the issue?

Full Error Log:

>>> df.groupby("id").apply(subtract_mean).show()
[Stage 16:======================================================>(99 + 1) / 100]
20/05/30 16:57:17 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 577, in main
  File "C:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 837, in read_int
    raise EOFError
EOFError

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:484)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:321)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
        at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
        at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:473)
        at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
        at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
        at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
        at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:373)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:213)
20/05/30 16:57:17 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
        at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:473)
        at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
        at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
        at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
        at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:373)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:213)
20/05/30 16:57:17 ERROR Executor: Exception in task 44.0 in stage 16.0 (TID 159)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
        at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:473)
        at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
        at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
        at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
        at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:373)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:213)
20/05/30 16:57:17 ERROR TaskSetManager: Task 44 in stage 16.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\spark\python\pyspark\sql\dataframe.py", line 407, in show
    print(self._jdf.showString(n, 20, vertical))
  File "C:\spark\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1286, in __call__
  File "C:\spark\python\pyspark\sql\utils.py", line 98, in deco
    return f(*a, **kw)
  File "C:\spark\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o170.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 16.0 failed 1 times, most recent failure: Lost task 44.0 in stage 16.0 (TID 159, DESKTOP-ASG768U, executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
        at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:473)
        at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
        at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
        at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
        at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:373)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:213)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:431)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2581)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2581)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:334)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:564)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
        at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:473)
        at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
        at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
        at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
        at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
        at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:373)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:213)

5 Answers


Spark 3.0 supports Java 11, and there is a known issue with the Arrow integration that PySpark uses for pandas UDFs when running on Java 11. If you do not want to downgrade to Java 8, you can follow the instructions below.

Since you are using PySpark on your local machine, you need to go to

$SPARK_HOME/conf/spark-defaults.conf.template

In your case it would be C:\Spark\conf\spark-defaults.conf.template.

Make a copy of the file (rename it spark-defaults.conf) and add the following at the bottom of the file:

spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"

When starting PySpark, go to the Spark UI (usually localhost:4040) and look for the "Environment" tab. Under "Spark Properties", you should see the two options listed there.
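If you prefer to verify this from code instead of the UI, a small check like the following should also work (my own sketch, assuming an active SparkSession named spark, e.g. the one the pyspark shell creates):

# Read the JVM options back from the running session's SparkConf.
# Both values should contain -Dio.netty.tryReflectionSetAccessible=true
# if spark-defaults.conf was picked up.
print(spark.sparkContext.getConf().get("spark.driver.extraJavaOptions"))
print(spark.sparkContext.getConf().get("spark.executor.extraJavaOptions"))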

The pull request in question that fixes the problem is here: https://github.com/apache/spark/pull/26552

Recently, the Spark team added a (very) short sentence about this on the documentation page (https://spark.apache.org/docs/latest/, at the end of the "Downloading" sub-section). You can pass the options mentioned above as --conf parameters when launching PySpark, but I found it easier to have them as default options.


7 Comments

Thank you! This is the answer I was hoping for when I had to figure it out myself :)
It still doesn't work after I've done this, and I can't access the Spark UI.
Hi @xiaodai! Can you give more details about your environment? How are you running Spark? The Spark UI should be launched when you start Spark.
I edited spark-defaults.conf as you recommended and verified the settings in the Spark UI. But when I launch pyspark or spark-shell, I still get the warnings: "WARNING: An illegal reflective access operation has occurred. WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (spark-3.0.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int). WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform." Is this normal?
Hi @aGuegu! I have the same: since it's a WARNING I am not too worried. I think it's tracked on the Spark JIRA but it's low priority since it's not breaking anything.

This just happened to me as well.

I was able to fix it by setting JAVA_HOME to the Java 8 JDK that I have installed. For me, on a GCE VM, it was:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/

I am not sure if this worked because it added the JDK or because it moved to Java 8. To start out, the VM had a Java 11 JRE but no JDK.
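If you start Spark from a Python script rather than a login shell, the same idea can be applied before the driver JVM is launched. A rough sketch (the JDK path below is just an example; point it at whatever Java 8 installation you actually have):

import os
from pyspark.sql import SparkSession

# Example path only; adjust to your Java 8 JDK location.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-1.8.0-openjdk-amd64"
os.environ["PATH"] = os.path.join(os.environ["JAVA_HOME"], "bin") + os.pathsep + os.environ["PATH"]

# This has to run before the first SparkSession is created,
# because that is when PySpark launches the JVM.
spark = SparkSession.builder.appName("java8_check").getOrCreate()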

2 Comments

I am on a Mac with local Spark; exporting JAVA_HOME works for me.
I uninstalled the Java 11 JDK and installed the Java 8 JDK (it wasn't interpreting the environment variable correctly, unfortunately) and it worked. So I can confirm it's a Java 11 problem.

EDIT: Spark 3.1.1 no longer has this bug.

ORIGINAL ANSWER:

The solution from @Chogg did NOT work for me:

from pyspark import SparkConf
from pyspark.sql import SparkSession


def _build_spark_session(app_name: str) -> SparkSession:
    conf = SparkConf()
    conf.set("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
    conf.set("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")

    return SparkSession \
        .builder \
        .config(conf=conf) \
        .appName(app_name) \
        .getOrCreate()

PySpark cannot set these options at runtime.

You need to add the setup before running any PySpark code.

If spark-defaults.conf does not exist yet:

SPARK_HOME=/usr/local/lib/python3.8/site-packages/pyspark/
mkdir -p $SPARK_HOME/conf
touch $SPARK_HOME/conf/spark-defaults.conf
echo spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" >> $SPARK_HOME/conf/spark-defaults.conf
echo spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" >> $SPARK_HOME/conf/spark-defaults.conf
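If you are unsure where a pip-installed PySpark lives (the site-packages path above is only an example), you can print the directory from Python and use it as SPARK_HOME; a small sketch:

import os
import pyspark

# For pip installs, this directory is the SPARK_HOME used above
# (the conf/ folder may still need to be created, as in the snippet above).
print(os.path.dirname(pyspark.__file__))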



Adding to jonesberg's answer, these parameters can be set in the PySpark call itself rather than in an external configuration file, as follows:

conf = {"spark.driver.extraJavaOptions":
"-Dio.netty.tryReflectionSetAccessible=true",
        "spark.executor.extraJavaOptions":
"-Dio.netty.tryReflectionSetAccessible=true"
}

SparkSession.builder.config(conf=conf).getOrCreate()

2 Comments

It does not work; PySpark cannot edit these configs.
Define conf as a list of tuples rather than a dict and use pyspark.SparkConf().setAll() (see the sketch below).
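For reference, here is a rough sketch of what that last comment suggests, using pyspark.SparkConf().setAll() with a list of tuples. Note the caveat from the earlier answer: extraJavaOptions set this way may be ignored if the driver JVM has already started, so setting them in spark-defaults.conf or at launch time is safer.

from pyspark import SparkConf
from pyspark.sql import SparkSession

# setAll() takes a list of (key, value) pairs, as the comment above suggests.
conf = SparkConf().setAll([
    ("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true"),
    ("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true"),
])

spark = SparkSession.builder.config(conf=conf).getOrCreate()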

Set the following if you are kicking off spark-submit from the shell to fix this issue:

export SPARK_SUBMIT_OPTS="--illegal-access=permit -Dio.netty.tryReflectionSetAccessible=true"

Refer to similar issues for more context.
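If you launch PySpark from a plain Python script rather than the shell, the same variable can be exported from Python first; a sketch, assuming the driver JVM is started through spark-submit (which is what reads SPARK_SUBMIT_OPTS):

import os
from pyspark.sql import SparkSession

# Must be set before the first SparkSession is created in this process,
# since the JVM only reads it at launch time.
os.environ["SPARK_SUBMIT_OPTS"] = "--illegal-access=permit -Dio.netty.tryReflectionSetAccessible=true"

spark = SparkSession.builder.appName("arrow_check").getOrCreate()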

