
I work with a DataFrame in PySpark and have the following task: for each row, count how many of the column values were > 2. For u1 it is 0, for u2 it is 2, and so on:

user   a   b   c   d   times
u1     1   0   1   0   0
u2     0   1   4   3   2
u3     2   1   7   0   1
My solution is below. It works, but I'm not sure it is the best approach, and I haven't tried it on really big data yet. I don't like converting to an RDD and back to a DataFrame. Is there anything better? At first I thought about calculating per column with a UDF, but didn't find a way to accumulate and sum all the results per row:

def calculate_times(row):
    times = 0
    for item in row:
        # skip string columns (e.g. "user"); only compare numeric values
        if not isinstance(item, basestring):
            if item > 2:
                times = times + 1
    return times

def add_column(pair):
    return dict(pair[0].asDict().items() + [("is_outlier", pair[1])])   

def calculate_times_for_all(df):
    rdd_with_times = df.rdd.map(lambda row: calculate_times(row))
    rdd_final = df.rdd.zip(rdd_with_times).map(add_column)

    df_final = sqlContext.createDataFrame(rdd_final)
    return df_final

For this solution I used this topic: How do you add a numpy.array as a new column to a pyspark.SQL DataFrame?

Thanks!

1 Answer
It is just a simple one-liner. Example data:

df = sc.parallelize([
    ("u1", 1, 0, 1, 0), ("u2", 0, 1, 4, 3), ("u3", 2, 1, 7, 0)
]).toDF(["user", "a", "b", "c", "d"])

withColumn:

df.withColumn("times", sum((df[c] > 2).cast("int") for c in df.columns[1:]))
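Note that `sum` here is Python's builtin, not `pyspark.sql.functions.sum`: it folds the generator of boolean Columns into a single `+` expression that Spark evaluates per row. A plain-Python sketch of the same cast-and-add logic, using u2's row as sample data (the dict and column list are just illustrative):

```python
# One row of the example data, as a plain dict
row = {"user": "u2", "a": 0, "b": 1, "c": 4, "d": 3}
numeric_cols = ["a", "b", "c", "d"]  # df.columns[1:] in the Spark version

# (value > 2) -> bool, cast to 0/1, summed across columns
times = sum(int(row[c] > 2) for c in numeric_cols)
print(times)  # 2: only c and d exceed 2
```

Spark does the equivalent fold at the expression level, so the whole count stays a single column expression with no UDF or RDD round-trip.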

and the result:

+----+---+---+---+---+-----+
|user|  a|  b|  c|  d|times|
+----+---+---+---+---+-----+
|  u1|  1|  0|  1|  0|    0|
|  u2|  0|  1|  4|  3|    2|
|  u3|  2|  1|  7|  0|    1|
+----+---+---+---+---+-----+

Note:

If the columns are nullable you should correct for that, for example using coalesce:

from pyspark.sql.functions import coalesce

sum(coalesce((df[c] > 2).cast("int"), 0) for c in df.columns[1:])
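The reason for the `coalesce` is that any NULL operand makes the whole sum NULL in SQL semantics; `coalesce(..., 0)` makes a NULL contribute 0 instead. A plain-Python sketch of that behavior, with `None` standing in for SQL NULL (the helper name is just for illustration):

```python
def times_gt_2(values, threshold=2):
    """Count entries > threshold, treating None (NULL) as 0,
    like coalesce((col > 2).cast("int"), 0)."""
    return sum(0 if v is None else int(v > threshold) for v in values)

print(times_gt_2([0, 1, 4, 3]))     # 2
print(times_gt_2([2, 1, 7, None]))  # 1 -- the None counts as 0 instead of nulling the sum
```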