
I have a text file which contains some data. Loading it and taking the first three lines gives the following:

join2_train = sc.textFile('join2_train.csv',4)
join2_train.take(3)   

[u'21.9059,TA-00002,S-0066,7/7/2013,0,0,Yes,1,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0',
 u'12.3412,TA-00002,S-0066,7/7/2013,0,0,Yes,2,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0',
 u'6.60183,TA-00002,S-0066,7/7/2013,0,0,Yes,5,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0']

Now I am trying to pass each line to a function that splits the text on commas and converts it into a LabeledPoint. I have also included a line that is meant to convert the string elements to floats.

The function is as follows:

from pyspark.mllib.regression import LabeledPoint
import numpy as np

def parsePoint(line):
    """Converts a comma separated unicode string into a `LabeledPoint`.

    Args:
        line (unicode): Comma separated unicode string where the first element is the label and the
            remaining elements are features.

    Returns:
        LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and
            features.
    """
    values = line.split(',')
    value1 = [map(float, i) for i in values]
    return LabeledPoint(value1[0], value1[1:])

Now, when I try to run an action on the parsed RDD, I get a ValueError. The action I try is as follows:

parse_train = join2_train.map(parsePoint)

parse_train.take(5)

The error message I get is as follows:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-63-f53b10964381> in <module>()
      1 parse_train = join2_train.map(parsePoint)
      2 
----> 3 parse_train.take(5)

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in take(self, num)
   1222 
   1223             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1224             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1225 
   1226             items += res

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    840         mappedRDD = rdd.mapPartitions(partitionFunc)
    841         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions,
--> 842                                           allowLocal)
    843         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    844 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 31, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py", line 101, in main
    process()
  File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1220, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-62-0243c4dd1876>", line 18, in parsePoint
ValueError: could not convert string to float: .

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  • The thing after the colon in the error (.) tells you that it is trying to convert something (.) that it does not recognize as a number. Clean up your data, or don't pass bad data into that function. Commented Aug 19, 2015 at 14:45
  • @chicks It's not recognizing the decimal point. If you remove all the data and just leave the very first number in the above sample: '21.9059', there is still an error. Any idea why? Commented Aug 19, 2015 at 15:35
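
The symptom in the comments above has a plain-Python explanation: in Python 2, iterating over a string yields its characters, so map(float, i) applied to a single field such as '21.9059' calls float() on every character and fails as soon as it hits the decimal point. A minimal reproduction outside Spark (illustrative only, not from the original thread):

line = u'21.9059'
print list(line)      # [u'2', u'1', u'.', u'9', u'0', u'5', u'9']
try:
    map(float, line)  # float(u'2'), float(u'1'), float(u'.'), ...
except ValueError as e:
    print e           # could not convert string to float: .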

1 Answer


Add this function to check whether a string can be converted to a float:

def isfloat(string):
    try:
        float(string)
        return True
    except ValueError:
        return False

and then in parsePoint:

value1 = [map(float, i) for i in values if isfloat(i)]

By modifying the float line as follows:

value1 = [float(i) for i in values]

and then parsing a string containing only numeric values, we get correct LabeledPoints back. However, the real problem is trying to build LabeledPoint objects from strings that cannot be converted to float, such as TA-00002 in the join2_train data.
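
Putting the answer's isfloat filter together with this fix, a minimal sketch of a corrected parsePoint could look like the following. Two assumptions are baked in: the first field is always the numeric label, and the categorical fields are simply dropped from the features.

from pyspark.mllib.regression import LabeledPoint

def parsePoint(line):
    """Converts a comma-separated unicode string into a LabeledPoint,
    keeping only the fields that parse as floats."""
    values = line.split(',')
    # float(i), not map(float, i): map would iterate the characters
    numeric = [float(i) for i in values if isfloat(i)]
    return LabeledPoint(numeric[0], numeric[1:])

parse_train = join2_train.map(parsePoint)
parse_train.take(5)

Dropping the non-numeric fields silently discards information, of course; encoding them instead is sketched after the comments below.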


6 Comments

Interestingly, this doesn't work on the value '21.9059'. For some reason, it doesn't recognize the decimal point. isfloat will return True, but the map will still cause an error.
do you mean when values = '21.9059.' or i = '21.9059' ?
I mean when the csv file join2_train.csv (as in the OP's post) contains only one line with 21.9059 on it.
i.e. when values = [u'21.9059']
I modified the second line of the function parsePoint as follows: value1 = [float(i) for i in values]. After this is done, and a string such as templp1 = '21.9059,0,0,6.35,0.71,137,8,19.05' is passed through the function, the correct float objects are computed. However, the real problem is when trying to convert strings like TA-00002, which are part of join2_train, to float; see the sketch after these comments.
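
If categorical fields such as TA-00002 are actually wanted as features, they need to be encoded numerically rather than dropped. Here is a rough, hypothetical sketch (the encoding scheme is an assumption, not something from the original post) that builds a category-to-index map on the driver, reusing the isfloat helper from the answer:

# Collect every distinct non-numeric field value. This assumes the
# categorical vocabulary is small enough to fit on the driver; dates
# like 7/7/2013 also land in this bucket and would deserve their own
# treatment in a real pipeline.
categories = (join2_train
              .flatMap(lambda line: line.split(','))
              .filter(lambda v: not isfloat(v))
              .distinct()
              .collect())
category_index = dict((c, float(i)) for i, c in enumerate(sorted(categories)))

def parsePointEncoded(line):
    """Like parsePoint above, but maps categorical strings to numeric
    codes instead of dropping them."""
    values = line.split(',')
    features = [float(v) if isfloat(v) else category_index[v]
                for v in values[1:]]
    return LabeledPoint(float(values[0]), features)

parse_train = join2_train.map(parsePointEncoded)

Integer codes impose an arbitrary ordering on the categories, which tree-based models tolerate but linear models generally do not; for those, a one-hot encoding would be the safer choice.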
