
I am very new to Spark programming. I am trying to apply a map and reduceByKey to the following data set, which has 15 fields per row.

rdd = sc.parallelize([
    ("West", "Apple", 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 2.0, 10),
    ("West", "Apple", 3.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0)])

This is my map function, where I am trying to create a pair of (composite key, value tuple):

rdd1 = rdd.map(lambda x: ((x[0],x[1]),(x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14])))
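For reference, the value side of each pair holds fields x[2] through x[14], re-indexed from 0. A plain-Python sketch of one row (no Spark needed):

```python
# One input row from the data set above.
row = ("West", "Apple", 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 2.0, 10)

# The map step builds ((region, fruit), value_tuple).
pair = ((row[0], row[1]), tuple(row[2:15]))

print(pair[0])       # ('West', 'Apple')
print(len(pair[1]))  # 13 -> value indices run 0..12, not 2..14
```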

Next, I am trying to reduceByKey (to implement a SQL-like aggregate on the values in the above tuple):

rdd2 = rdd1.reduceByKey(lambda x,y: (x[1]+','+y[1]))

This reduce function works as expected for tuple index values 0-4, but when I try tuple index values 5-14, I get an IndexError:

rdd2 = rdd1.reduceByKey(lambda x,y: (x[10]+','+y[10]))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark/python/pyspark/rdd.py", line 1277, in take
res = self.context.runJob(self, takeUpToNumLeft, p, True)
File "/opt/spark/python/pyspark/context.py", line 897, in runJob
allowLocal)
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",   line 538, in __call__
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError15/08/28 01:23:22 WARN TaskSetManager: Lost task 1.0 in stage 78.0 (TID 91, localhost): TaskKilled (killed  intentionally)
: An error occurred while calling  z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:   Task 0 in stage 78.0 failed 1 times, most recent failure: Lost task 0.0 in   stage 78.0 (TID 90, localhost):   org.apache.spark.api.python.PythonException: Traceback (most recent call   last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/spark/python/pyspark/rdd.py", line 2330, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/spark/python/pyspark/rdd.py", line 2330, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/spark/python/pyspark/rdd.py", line 316, in func
return f(iterator)
File "/opt/spark/python/pyspark/rdd.py", line 1758, in combineLocally
merger.mergeValues(iterator)
File "/opt/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 268, in mergeValues
d[k] = comb(d[k], v) if k in d else creator(v)
File "<stdin>", line 1, in <lambda>
IndexError: string index out of range

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at      java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

This looks like a non-trivial error. I am not sure whether it is caused by my machine's hardware, my implementation of the reduce function, or something in Spark itself.

Any sort of help is appreciated.

1 Answer

File "<stdin>", line 1, in <lambda>
IndexError: string index out of range

The error occurs inside your lambda function: the sequence you are indexing (a string, per the error message) has fewer elements than your function expects. The likely root cause is that the function you pass to reduceByKey must take two values and return a result of the *same type*, because that result is fed back in as an argument to later merges. Your lambda takes two value tuples but returns a string, so on any merge after the first, x (or y) is a short string like '2.0,3.0', and x[10] then indexes its individual characters. Small indices happen to fall inside that string, which is why 0-4 appear to work, while larger indices run past its end and raise IndexError: string index out of range.
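A minimal sketch of the failure mode and one possible fix, using plain Python to imitate how reduceByKey chains its merge function (str() around the fields is added here for illustration, since the sample fields are numeric; your real data may already be strings):

```python
from functools import reduce

# Three value tuples for the same key, as produced by the question's map step
# (fields x[2:] of each input row; 13 elements, indices 0..12).
v1 = (2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 2.0, 10)
v2 = (3.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0)
v3 = (4.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0)

# The question's reduce function returns a string, not a tuple.
bad = lambda x, y: str(x[10]) + ',' + str(y[10])

# The first merge works: both arguments are 13-element tuples.
first = bad(v1, v2)
print(first)  # 2.0,2.0

# A later merge fails the way the traceback shows: x is now the
# 7-character string '2.0,2.0', so x[10] runs past its end.
try:
    bad(first, v3)
except IndexError as exc:
    print(exc)  # string index out of range

# Fix: convert every value to the final type *before* reducing, so the
# merge function's inputs and output always have the same type.
values = [str(v[10]) for v in (v1, v2, v3)]
combined = reduce(lambda x, y: x + ',' + y, values)
print(combined)  # 2.0,2.0,2.0
```

In PySpark terms, one fix along these lines is to extract the field you want as a string in the map step, e.g. `rdd.map(lambda x: ((x[0], x[1]), str(x[12]))).reduceByKey(lambda a, b: a + ',' + b)` (x[12] here is the raw-row index corresponding to value index 10), so the reduce function's inputs and output are both strings.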
