
I have objects I want to do calculations on in parallel, so I thought I could resort to PySpark.

Consider this example: a class whose objects hold a number i that can be squared with square():

class MyMathObject():
    def __init__(self, i):
        self.i = i
    def square(self):
        return self.i ** 2


print(MyMathObject(3).square()) # Test one instance with regular python - works

Additionally, I set up PySpark (in a Jupyter notebook), and now I want to calculate the squares of 0 to 4 in parallel on my objects:

import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext("local[2]")

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect() # This fails

This does not work; it results in a very long and, to me, mostly unhelpful error message. The only line I find somewhat interesting is:

AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

So it seems to be something related to how the attribute square() is called. I have copied the full message at the end.

PySpark itself seems to work; for example, executing the following on a plain Python list returns the squared numbers as expected.

rdd = sc.parallelize([i for i in range(5)])
rdd.map(lambda i: i**2).collect()

Thus, there seems to be something flawed with the way I create or operate on my objects, but I cannot track down the mistake.

The complete error message:

Py4JJavaError                             Traceback (most recent call last)
in
      1 rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
----> 2 rdd.map(lambda obj: obj.square()).collect()

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/pyspark/rdd.py in collect(self)
    887         """
    888         with SCCallSiteSync(self.context) as css:
--> 889             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    890         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    891

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, 192.168.2.108, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 147, in load_stream yield self._read_with_length(stream) File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length return self.loads(obj) File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads return pickle.loads(obj, encoding=encoding) AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004) at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:388) at org.apache.spark.rdd.RDD.collect(RDD.scala:1003) at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168) at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process() File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process serializer.dump_stream(out_iter, outfile) File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 147, in load_stream yield 
self._read_with_length(stream) File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length return self.loads(obj) File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads return pickle.loads(obj, encoding=encoding) AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004) at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more

1 Answer


It doesn't look like a mistake on your side, but a way around this is to put the class in a separate Python file and import it:

For example, create a file mymodule.py containing:

class MyMathObject():
    def __init__(self, i):
        self.i = i
    def square(self):
        return self.i ** 2

And in your main script, you can do

from mymodule import MyMathObject

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect()

which should give [0, 1, 4, 9, 16].
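
The likely reason this helps: Spark pickles your objects to send them to the worker processes, and pickle records each class by module and name. A class defined directly in the notebook lives in __main__, which on the worker side is the pyspark.daemon module, hence the AttributeError you saw; a class defined in mymodule.py can simply be re-imported on the workers. As a rough sketch of how the notebook cell could look with the class moved out (the addPyFile call is only needed if you later run against a real cluster rather than local mode, and the relative path is an assumption about where mymodule.py sits next to the notebook):

import findspark
findspark.init()
from pyspark import SparkContext

from mymodule import MyMathObject  # defined in mymodule.py, not in the notebook

sc = SparkContext("local[2]")
sc.addPyFile("mymodule.py")  # ships the file to executors; harmless in local mode

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
print(rdd.map(lambda obj: obj.square()).collect())  # expected: [0, 1, 4, 9, 16]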


1 Comment

Okay, outsourcing the class to its own file/module works. If it's not a mistake on my end, maybe it's worth submitting a GitHub issue...
