
Does anyone have experience using pandas UDFs on a local PySpark session running on Windows? I've used them on Linux with good results, but I've been unsuccessful on my Windows machine.

Environment:

python==3.7
pyarrow==0.15
pyspark==2.3.4
pandas==0.24

java version "1.8.0_74"

Sample script:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))


@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())


out_df = df.groupby("id").apply(subtract_mean).toPandas()
print(out_df.head())

# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

After running for a long time (the toPandas stage is split into 200 tasks, each taking over a second), it returns an error like this:

Traceback (most recent call last):
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\dataframe.py", line 1953, in toPandas
    tables = self._collectAsArrow()
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\dataframe.py", line 2004, in _collectAsArrow
    sock_info = self._jdf.collectAsArrowToPython()
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o62.collectAsArrowToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 69 in stage 3.0 failed 1 times, most recent failure: Lost task 69.0 in stage 3.0 (TID 201, localhost, executor driver): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(Unknown Source)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNextMessage(MessageChannelReader.java:64)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeSchema(MessageSerializer.java:104)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:128)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:161)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:290)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.hasNext(ArrowConverters.scala:96)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.foreach(ArrowConverters.scala:94)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.to(ArrowConverters.scala:94)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.toBuffer(ArrowConverters.scala:94)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.toArray(ArrowConverters.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

2 Answers


Your java.lang.IllegalArgumentException in pandas_udf has to do with the pyarrow version, not with the OS environment: pyarrow 0.15 changed the Arrow IPC stream format in a way that the Arrow Java reader bundled with Spark 2.x cannot parse. See this issue for details.

You have two possible routes of action:

  1. Downgrade pyarrow to v0.14, or
  2. Set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1 in SPARK_HOME/conf/spark-env.sh (a sketch for a local Windows session follows this list).
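
For the asker's local Windows setup, where conf\spark-env.cmd plays the role of spark-env.sh, a minimal sketch is to set the variable from the driver process before building the session. This rests on the assumption (which holds in local mode, since the executor and its Python workers live in the driver's process tree) that the workers inherit the driver's environment:

import os

# Assumption: local mode only -- the executor JVM and the Python workers
# it spawns inherit this environment variable from the driver process,
# so pyarrow falls back to the pre-0.15 IPC format Spark 2.x can read.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()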

6 Comments

Thanks @Sergey! I downgraded to pyarrow v0.14 and it ran the example, producing the expected output; however, it took 5 minutes after initializing Spark. Any thoughts on why it could be executing so slowly, or where I could look to debug the performance?
You may have a look at localhost:4040 and see which stage is taking so long.
On my Linux machine it took 7 secs for 2 stages with 202 tasks. Not surprisingly, it took longest to move data between executors.
Running on Windows, I get 2 stages and 201 tasks, with the second stage taking a median duration of 2 s per task.
I still don't understand why it takes 5 minutes on Windows vs 7 seconds on Linux. All additional metrics show very low time (1 ms) and the event timeline shows virtually all "Executor Computing Time". Seems like it's just taking a long time to map that function across the default of 200 partitions(?)
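A sketch of one possible remedy for the slow local run (an assumption, not a confirmed fix): lower spark.sql.shuffle.partitions so the grouped-map stage does not fan out into ~200 near-empty tasks, each paying scheduling and Python-worker overhead:

# Assumption: with only 5 rows on a local master, most of the default
# 200 shuffle partitions are empty; fewer partitions means fewer tasks.
spark.conf.set("spark.sql.shuffle.partitions", "8")
out_df = df.groupby("id").apply(subtract_mean).toPandas()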

Addendum to Sergey's answer: if you prefer to build your own SparkSession in Python rather than change your config files, you need to set both spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT (for the YARN application master) and spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT (for the local environment of each executor):

from pyspark.sql import SparkSession

# Tell pyarrow on both the application master and the executors to
# write the pre-0.15 Arrow IPC format that Spark 2.x expects.
spark_session = SparkSession.builder \
    .master("yarn") \
    .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1") \
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")

spark = spark_session.getOrCreate()
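
Note that spark.yarn.appMasterEnv.* only takes effect when running on YARN; for a plain local session, only the executor and driver environment (for example, the os.environ approach sketched under the accepted answer) should matter.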

Hope this helps!

1 Comment

I'm running my PySpark jobs on a cluster on Google Dataproc and this is REALLY useful for me. Thank you so much!
