
My question is related to my previous one: transform columns values to columns in pyspark dataframe

I have created a table "my_df" (a dataframe in pyspark):

+----+--------------+--------------------------+
|id  |payment       |shop                      |
+----+--------------+--------------------------+
|dapd|[credit, cash]|[retail, on-line]         |
|wrfr|[cash, debit] |[supermarket, brand store]|
+----+--------------+--------------------------+

Now I need to cluster this table so that I can find the similarity between the "id"s. I am trying k-means first, so I need to transform the categorical values into numerical values by one-hot encoding. I am referring to How to handle categorical features with spark-ml?

my code:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# index every column, then one-hot encode all of the indexed columns
inputs, my_indx_list = [], []
for a_col in my_df.columns:
    my_indx = StringIndexer(inputCol=a_col, outputCol=a_col + "_index")
    inputs.append(my_indx.getOutputCol())
    my_indx_list.append(my_indx)

encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=[x + "_vector" for x in inputs])
a_pipeline = Pipeline(stages=my_indx_list + [encoder])
a_pipeline.fit(my_df).transform(my_df).show()  # error here!

But I got this error:

A column must be either string type or numeric type, but got ArrayType(StringType,true)

So, how can I solve this?

My idea: sort the list value in each column and concatenate the strings in each list into one long string per column.

But for each column, the values are answers to survey questions and each answer carries the same weight, so I am not sure how to work this out.
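A minimal sketch of that idea (my own, assuming Spark 2.4+ for array_sort and concat_ws on array columns); it collapses each array into one sorted string, which is exactly why I am unsure it preserves the equal weighting:

from pyspark.sql import functions as F

flat_df = my_df
for c in ["payment", "shop"]:
    # sort the array so the concatenated string does not depend on answer order
    flat_df = flat_df.withColumn(c + "_flat", F.concat_ws("_", F.array_sort(F.col(c))))
flat_df.show(truncate=False)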

thanks

UPDATE

The proposed solution works, but it is very slow: it took about 3.5 hours on a cluster with 300 GB of memory and 32 cores.

my code:

from pyspark.ml.feature import CountVectorizer

tmp_df = original_df  # 3.5 million rows and 300 columns

# fit and apply one CountVectorizer per column, chaining the transforms
for a_col in original_df.columns:
    a_vec = CountVectorizer(inputCol=a_col, outputCol=a_col + "_index", binary=True)
    tmp_df = a_vec.fit(tmp_df).transform(tmp_df)

tmp_df.show()

The "original_df" has 3.5 million rows and 300 columns.

How can I speed this up?

thanks

5 Comments

  • How do you get the output like that, is it manual? The +'s and -'s
  • Can you add the line of code that is returning the error?
  • Code added, thanks
  • For two array columns, use CountVectorizer with binary=True: spark.apache.org/docs/2.4.0/api/python/…
  • Please have a look at this: stackoverflow.com/questions/58303468/…

1 Answer


@jxc suggested the brilliant use of CountVectorizer for one-hot encoding in your case; it is usually used for counting tokens in natural language processing.

Using CountVectorizer saves you the trouble of dealing with explode and collect_set together with OneHotEncoderEstimator, or worse, of implementing it with a udf.

Given this dataframe,

df = spark.createDataFrame([
                            {'id': 'dapd', 'payment': ['credit', 'cash'], 'shop': ['retail', 'on-line']},
                            {'id': 'wrfr', 'payment': ['cash', 'debit'], 'shop': ['supermarket', 'brand store']}
                           ])
df.show()

+----+--------------+--------------------+
|  id|       payment|                shop|
+----+--------------+--------------------+
|dapd|[credit, cash]|   [retail, on-line]|
|wrfr| [cash, debit]|[supermarket, bra...|
+----+--------------+--------------------+
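For contrast, here is a rough sketch (my own illustration, not something this answer recommends) of the explode + OneHotEncoderEstimator route mentioned above, for the payment column only; the leftover per-id merge at the end is where a udf or extra aggregation would be needed:

from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

exploded = df.select("id", F.explode("payment").alias("payment_item"))
indexed = StringIndexer(inputCol="payment_item", outputCol="payment_idx") \
    .fit(exploded).transform(exploded)
encoded = OneHotEncoderEstimator(inputCols=["payment_idx"], outputCols=["payment_vec"],
                                 dropLast=False) \
    .fit(indexed).transform(indexed)
# each id now has one row per payment value; those rows still have to be
# merged back into a single vector per id (e.g. a udf summing sparse vectors),
# which is exactly the step CountVectorizer does for you in one pass.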

You can one-hot encode by treating each array of strings as a set of tokens, the way text is handled in natural language processing. Note the use of binary=True to force the encoded vectors to contain only 0s and 1s.

from pyspark.ml.feature import CountVectorizer

payment_cv = CountVectorizer(inputCol="payment", outputCol="paymentEnc", binary=True)
first_res_df = payment_cv.fit(df).transform(df)

shop_cv = CountVectorizer(inputCol="shop", outputCol="shopEnc", binary=True)
final_res_df = shop_cv.fit(first_res_df).transform(first_res_df)

final_res_df.show()

+----+--------------+--------------------+-------------------+-------------------+
|  id|       payment|                shop|         paymentEnc|            shopEnc|
+----+--------------+--------------------+-------------------+-------------------+
|dapd|[credit, cash]|   [retail, on-line]|(3,[0,2],[1.0,1.0])|(4,[0,3],[1.0,1.0])|
|wrfr| [cash, debit]|[supermarket, bra...|(3,[0,1],[1.0,1.0])|(4,[1,2],[1.0,1.0])|
+----+--------------+--------------------+-------------------+-------------------+
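Since the original goal was k-means clustering, here is a minimal follow-up sketch (my addition, not part of the accepted answer) that assembles the two encoded columns of final_res_df above and clusters on them; k=2 is only a placeholder value:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# combine the one-hot vectors into a single features column
assembler = VectorAssembler(inputCols=["paymentEnc", "shopEnc"], outputCol="features")
features_df = assembler.transform(final_res_df)

# fit k-means on the assembled features and attach a cluster label per id
kmeans = KMeans(k=2, seed=1, featuresCol="features", predictionCol="cluster")
model = kmeans.fit(features_df)
model.transform(features_df).select("id", "cluster").show()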

5 Comments

  • It works but is very slow: it took about 3 hours for a dataframe with 3.5 million rows and 300 columns. Is it possible to speed it up? Thanks
  • Assuming you have set up your executors properly, the most common reason for a slow Spark computation is bad data partitioning. Try to repartition your dataframe (see the sketch after this list) and check in the Spark UI (default port 4040) whether you are properly saturating your executors with tasks.
  • Could you please let me know how to determine the optimal value of "numPartitions"?
  • My current partition count is 200.
  • An accurate answer! This is just what I was looking for. Thank you @cylim
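A hedged sketch of the repartitioning suggestion from the comments above; the partition count is only a starting guess (roughly 2-3 tasks per core), not a measured optimum, and caching before the loop is an extra assumption of mine, not something stated in the thread:

num_cores = 32                                    # cores mentioned in the question update
tmp_df = original_df.repartition(num_cores * 3)   # spread the rows over more tasks
tmp_df = tmp_df.cache()                           # keep the data in memory between the 300 fits
tmp_df.count()                                    # materialize the cache before the loop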
