
I have a python dict as:

fileClass = {'a1' : ['a','b','c','d'], 'b1':['a','e','d'], 'c1': ['a','c','d','f','g']}

and a list of tuples as:

C = [('a','b'), ('c','d'),('e')]

I want to finally create a spark dataframe as:

Name (a,b) (c,d) (e)
a1     2     2    0
b1     1     1    1
c1     1     2    0

which simply contains, for each key in fileClass, the count of elements from that key's list that fall into each tuple. To do this I create a dict mapping each element to a column index

classLoc = {'a':0,'b':0,'c':1,'d':1,'e':2}
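
(As an aside, this mapping can also be derived from C itself with a comprehension; note that ('e') is really just the string 'e', which iterates to the same single element:)

```python
C = [('a','b'), ('c','d'), ('e')]

# map each element to the index of the tuple it belongs to;
# ('e') is the bare string 'e', and iterating over it still yields 'e'
classLoc = {elem: i for i, tup in enumerate(C) for elem in tup}
# classLoc == {'a': 0, 'b': 0, 'c': 1, 'd': 1, 'e': 2}
```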

then I use udf to define

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def convertDictToDF(v, classLoc, length):
    R = np.zeros((1,length))
    for c in v:
        try:
            loc = classLoc[c]
            R[loc] += 1
        except:
            pass
    return(R)

udfConvertDictToDF = udf(convertDictToDF, ArrayType(IntegerType()))

df = sc.parallelize([
    [k] + list(udfConvertDictToDF(v, classLoc, len(C)))
    for k, v in fileClass.items()]).toDF(['Name']+ C)

then I got error msg as

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input-40-ab668a12838a> in <module>()
      1 df = sc.parallelize([
      2     [k] + list(udfConvertDictToDF(v,classLoc, len(C)))
----> 3     for k, v in fileClass.items()]).toDF(['Name'] + C)
      4 
      5 df.show()

/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/functions.pyc in __call__(self, *cols)
   1582     def __call__(self, *cols):
   1583         sc = SparkContext._active_spark_context
-> 1584         jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
   1585         return Column(jc)
   1586 

/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/column.pyc in _to_seq(sc, cols, converter)
     58     """
     59     if converter:
---> 60         cols = [converter(c) for c in cols]
     61     return sc._jvm.PythonUtils.toSeq(cols)
     62 

/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/column.pyc in _to_java_column(col)
     46         jcol = col._jc
     47     else:
---> 48         jcol = _create_column_from_name(col)
     49     return jcol
     50 

/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/column.pyc in _create_column_from_name(name)
     39 def _create_column_from_name(name):
     40     sc = SparkContext._active_spark_context
---> 41     return sc._jvm.functions.col(name)
     42 
     43 

/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JError(
    311                     "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
--> 312                     format(target_id, ".", name, value))
    313         else:
    314             raise Py4JError(

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:360)
    at py4j.Gateway.invoke(Gateway.java:254)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

I don't understand what is wrong with my UDF that leads to that error message. Please help.

1 Answer

I think it has to do with the way you are using this line

[k] + list(udfConvertDictToDF(v, classLoc, len(C)))

at the bottom.

When I do a simple Python version of it, I get an error as well.

import numpy as np

fileClass = {'a1': ['a','b','c','d'], 'b1': ['a','e','d'], 'c1': ['a','c','d','f','g']}

C = [('a','b'), ('c','d'), ('e')]

classLoc = {'a':0,'b':0,'c':1,'d':1,'e':2}

def convertDictToDF(v, classLoc, length):
    # I also got rid of (1,length) for (length)
    # b/c pandas .from_dict() method handles this for me
    R = np.zeros(length)
    for c in v:
        try:
            loc = classLoc[c]
            R[loc] += 1
        except:
            pass
    return(R)


[[k] + convertDictToDF(v, classLoc, len(C))
    for k, v in fileClass.items()]

which produces these errors

TypeError: ufunc 'add' did not contain a loop with signature matching types dtype('S32') dtype('S32') dtype('S32')
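
(An aside on where that TypeError comes from, which is my reading rather than part of the original answer: [k] + R is not list concatenation here, because adding a list to a NumPy array triggers broadcasting and element-wise addition, so NumPy ends up trying to add the string key to floats.)

```python
import numpy as np

# ['a1'] + np.zeros(3) is element-wise addition under broadcasting,
# not list concatenation, so NumPy tries string + float and raises TypeError
try:
    ['a1'] + np.zeros(3)
    concatenated = True
except TypeError:
    concatenated = False

# one possible workaround (not the route the answer takes below):
# convert the array to a plain list before concatenating
row = ['a1'] + list(np.zeros(3))
```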

If you were to change the list comprehension to a dict comprehension, you could get it to work.

counts = {k: convertDictToDF(v, classLoc, len(C))
    for k, v in fileClass.items()}

the output of which looks like this

> {'a1': array([ 2.,  2.,  0.]), 'c1': array([ 1.,  2.,  0.]), 'b1': array([ 1.,  1.,  1.])}

Without knowing your end use case, I'm going to get you to the output you requested, but via a slightly different route, which may not scale the way you'd like, so I'm sure there's a better way.

The following code will get you the rest of the way to a dataframe,

import pandas as pd
df = pd.DataFrame.from_dict(data=counts, orient='index').sort_index()
df.columns = C

which produces your desired output

    (a, b)  (c, d)    e
a1     2.0     2.0  0.0
b1     1.0     1.0  1.0
c1     1.0     2.0  0.0

And this will get you a Spark dataframe

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df_s = sqlContext.createDataFrame(df)
df_s.show()

+----------+----------+---+
|('a', 'b')|('c', 'd')|  e|
+----------+----------+---+
|       2.0|       2.0|0.0|
|       1.0|       1.0|1.0|
|       1.0|       2.0|0.0|
+----------+----------+---+
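
For completeness, here is a pandas-free sketch (my own variant, not a tested Spark pipeline): since the counting function is plain Python, it can be called directly inside the list comprehension — the original error came from calling the udf there — and the finished rows handed straight to Spark.

```python
fileClass = {'a1': ['a','b','c','d'], 'b1': ['a','e','d'], 'c1': ['a','c','d','f','g']}
classLoc = {'a': 0, 'b': 0, 'c': 1, 'd': 1, 'e': 2}

def countByGroup(values, classLoc, length):
    # plain-Python tally: one slot per tuple in C, unmapped elements skipped
    counts = [0] * length
    for c in values:
        if c in classLoc:
            counts[classLoc[c]] += 1
    return counts

rows = [[k] + countByGroup(v, classLoc, 3) for k, v in sorted(fileClass.items())]
# rows == [['a1', 2, 2, 0], ['b1', 1, 1, 1], ['c1', 1, 2, 0]]

# with a SparkContext `sc` available, this becomes the DataFrame directly;
# string column names avoid handing raw tuples to toDF:
# df_s = sc.parallelize(rows).toDF(['Name', '(a,b)', '(c,d)', '(e)'])
```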