
I have 20TB of data. I tried to convert a Spark dataframe to a Spark matrix as follows (the solution I used was found here). My dataframe looks like this:

+-------+---------------+---------------------+
|goodsID| customer_group|customer_phone_number|
+-------+---------------+---------------------+
|    123|          XXXXX|             XXXXXXXX|
|    432|          YYYYY|             XXXXXXXX|
+-------+---------------+---------------------+

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

mat = IndexedRowMatrix(mydataframe.map(lambda row: IndexedRow(*row)))
mat.numRows()
mat.numCols()

but it gives me the following error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/test/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/test/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/test/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/test/spark-1.6.0-bin-hadoop2.6/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
    yield next(iterator)
  File "<stdin>", line 1, in <lambda>
TypeError: __init__() takes exactly 3 arguments (4 given)

So my questions are:

  1. How can I achieve this in Spark?
  2. How can I convert my dataframe to a NumPy array?
  3. Is using Pandas with Spark really bad?

1 Answer

  • The types of the input data are probably wrong. Vector values have to be Double (Python float).

  • You don't use IndexedRow the right way. It takes two arguments: an index and a vector. Assuming the data is of the correct type (a fuller sketch follows this list):

    from pyspark.mllib.linalg import Vectors

    mat = IndexedRowMatrix(mydataframe.map(
        lambda row: IndexedRow(row[0], Vectors.dense(row[1:]))))

  • Is Pandas bad? For 20TB of data? It's not the best choice, but there are distributed Python libraries with a similar API.
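
Putting the pieces together, here is a minimal sketch, assuming the non-index columns actually hold numeric values (the explicit casts below are illustrative assumptions, not part of the original code):

    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

    # Cast each value explicitly so the vector entries are Doubles
    # (Python floats); this assumes the non-index columns are numeric.
    rows = mydataframe.map(
        lambda row: IndexedRow(int(row[0]),
                               Vectors.dense([float(x) for x in row[1:]])))

    mat = IndexedRowMatrix(rows)
    print(mat.numRows(), mat.numCols())

As for question 2, converting to a NumPy array only makes sense for data that fits in driver memory, which 20TB clearly does not, but for a small dataframe something like this works:

    import numpy as np

    arr = np.array(mydataframe.collect())  # list of Rows -> 2D NumPy array
    pdf = mydataframe.toPandas()           # or a Pandas DataFrame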


1 Comment

I am still working on the solution you gave. Also, there are a lot of nested for loops (5 levels deep) inside which a lot of matrix computation is happening. How much of a performance impact will these nested loops have, and do I need to re-write the code using Spark's foreach, map, or flatMap to improve performance? Currently it takes 20 days to process the whole dataset.
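
Without seeing the actual loops it is hard to say, but the usual pattern is to push the pairwise work into Spark transformations instead of iterating on the driver. A hypothetical sketch (rdd_a, rdd_b, and compute are placeholders, not from the original code; sc is the shell's SparkContext):

    # Driver-side nested loops keep all the work on one machine:
    #     for a in items_a:
    #         for b in items_b:
    #             compute(a, b)
    # The same pairing, distributed across the cluster:

    def compute(a, b):
        return a * b  # placeholder for the per-pair matrix computation

    rdd_a = sc.parallelize(range(3))  # stand-ins for the real collections
    rdd_b = sc.parallelize(range(4))

    results = rdd_a.cartesian(rdd_b).map(lambda ab: compute(ab[0], ab[1]))
    print(results.count())

Note that cartesian is itself expensive; the point is only that each level of looping generally maps to a transformation (map, flatMap, join) running on the executors rather than a Python loop on the driver.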
