
I'm working with a PySpark DataFrame that looks like this:

+----+----+---+---+---+----+
|   a|   b|  c|  d|  e|   f|
+----+----+---+---+---+----+
|   2|12.3|  5|5.6|  6|44.7|
|null|null|  9|9.3| 19|23.5|
|   8| 4.3|  7|0.5| 21| 8.2|
|   9| 3.8|  3|6.5| 45| 4.9|
|   3| 8.7|  2|2.8| 32| 2.9|
+----+----+---+---+---+----+

To create the above dataframe:

rdd = sc.parallelize([(2, 12.3, 5, 5.6, 6, 44.7),
                      (None, None, 9, 9.3, 19, 23.5),
                      (8, 4.3, 7, 0.5, 21, 8.2),
                      (9, 3.8, 3, 6.5, 45, 4.9),
                      (3, 8.7, 2, 2.8, 32, 2.9)])
df = sqlContext.createDataFrame(rdd, ('a', 'b', 'c', 'd', 'e', 'f'))
df.show()

I want to create another column 'g' whose value is a list of tuples built from the existing non-null columns. The list of tuples has the form:

((column a, column b),(column c, column d),(column e, column f))

Requirements for the output column: 1) Only consider the non-null columns when creating the list of tuples. 2) Return the list of tuples.

So the final dataframe with column 'g' would be:

+----+----+---+---+---+----+---------------------------+
|   a|   b|  c|  d|  e|   f|g                          |
+----+----+---+---+---+----+---------------------------+
|   2|12.3|  5|5.6|  6|44.7|[[2,12.3],[5,5.6],[6,44.7]]|
|null|null|  9|9.3| 19|23.5|[[9,9.3],[19,23.5]]        |
|   8| 4.3|  7|0.5| 21| 8.2|[[8,4.3],[7,0.5],[21,8.2]] |
|   9| 3.8|  3|6.5| 45| 4.9|[[9,3.8],[3,6.5],[45,4.9]] |
|   3| 8.7|  2|2.8| 32| 2.9|[[3,8.7],[2,2.8],[32,2.9]] |
+----+----+---+---+---+----+---------------------------+

In column 'g', the second row has only two pairs instead of three, because the values of columns 'a' and 'b' are null in that row and are therefore omitted.

I'm not sure how to dynamically omit the null values and build the list of tuples.

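I tried to get partway to the final column with a UDF:

from pyspark.sql.functions import array, udf

l1 = ['a', 'c', 'e']
l2 = ['b', 'd', 'f']

def func1(r1, r2):
    # Pair the i-th value of r1 with the i-th value of r2; nulls are not filtered yet.
    l = []
    for i in range(len(l1)):
        l.append((r1[i], r2[i]))
    return l

func1_udf = udf(func1)  # no return type given, so the result is a string
df = df.withColumn('g', func1_udf(array(l1), array(l2)))
df.show()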

I tried declaring the UDF's return type as ArrayType, but it did not work. Any help would be much appreciated. I'm working with PySpark 1.6. Thank you!

  • Did you not find any of the answers useful? Commented Feb 21, 2018 at 9:09

3 Answers


I think UDFs should work just fine.

import pyspark.sql.functions as F
from pyspark.sql.types import *

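rdd = sc.parallelize([(2, 12.3, 5, 5.6, 6, 44.7),
                      (None, None, 9, 9.3, 19, 23.5),
                      (8, 4.3, 7, 0.5, 21, 8.2),
                      (9, 3.8, 3, 6.5, 45, 4.9),
                      (3, 8.7, 2, 2.8, 32, 2.9)])
# sqlContext is the context used in the question; the original snippet called it "sql".
df = sqlContext.createDataFrame(rdd, ('a', 'b', 'c', 'd', 'e', 'f'))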
df = df.select(*(F.col(c).cast("float").alias(c) for c in df.columns))

def combine(a,b,c,d,e,f):

    combine_ = []
    if None not in [a,b]:
        combine_.append([a,b])
    if None not in [c,d]:
        combine_.append([c,d])
    if None not in [e,f]:
        combine_.append([e,f])
    return combine_

combine_udf = F.udf(combine,ArrayType(ArrayType(FloatType())))
df = df.withColumn('combined', combine_udf(F.col('a'), F.col('b'), F.col('c'),
                                           F.col('d'), F.col('e'), F.col('f')))
df.show()
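
The pairing logic above is written out for exactly six columns. As a rough sketch of how it might be driven by the two column lists from the question instead, the plain-Python function below takes a row as a dict. The names `combine_row`, `first_cols` and `second_cols` are made up for illustration and are not part of the original answer; Spark is left out so the snippet runs on its own.

```python
first_cols = ['a', 'c', 'e']
second_cols = ['b', 'd', 'f']


def combine_row(row, first_cols, second_cols):
    """Return [x, y] for each column pair, skipping pairs where either value is None."""
    result = []
    for left, right in zip(first_cols, second_cols):
        x, y = row[left], row[right]
        if x is not None and y is not None:
            result.append([x, y])
    return result


# Second row of the example data: columns a and b are null.
row = {'a': None, 'b': None, 'c': 9, 'd': 9.3, 'e': 19, 'f': 23.5}
print(combine_row(row, first_cols, second_cols))  # [[9, 9.3], [19, 23.5]]
```

Adding a fourth pair would then only mean extending the two lists, not the function signature.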

3 Comments

Declaring the ArrayType as float or double would cause the first element of each tuple to be null, since columns a, c and e are of integer type. If I declare it as StringType instead, I won't be able to access the elements individually via getItem, since a str object does not have this attribute. Is there a way to achieve this without the WrappedArray object? I also want to avoid RDD operations and stick with DataFrame operations.
You can cast all columns to float as I did above (df.select(*(F.col(c).cast("float").alias(c) for c in df.columns))), if that is convenient. You can treat each wrapped array as an individual list and need not worry about it.
Yes, casting to another data type would work for this purpose. Thanks!
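
The type mismatch discussed in these comments could also be handled inside the function rather than by casting the columns first. The sketch below is an assumption, not something from the thread: it coerces every value with Python's `float()` so that all elements match a declared array of doubles. The name `combine_as_float` is hypothetical, and the UDF registration is omitted so the snippet runs as plain Python.

```python
def combine_as_float(*values):
    """Pair consecutive values, drop pairs containing None, and coerce the rest to float."""
    pairs = zip(values[0::2], values[1::2])  # (a, b), (c, d), (e, f)
    return [[float(x), float(y)] for x, y in pairs
            if x is not None and y is not None]


print(combine_as_float(2, 12.3, 5, 5.6, 6, 44.7))        # [[2.0, 12.3], [5.0, 5.6], [6.0, 44.7]]
print(combine_as_float(None, None, 9, 9.3, 19, 23.5))    # [[9.0, 9.3], [19.0, 23.5]]
```

Wrapped with F.udf and a return type of ArrayType(ArrayType(DoubleType())), this should play the same role as combine_udf above, although that has not been checked against Spark 1.6. The trade-off is the same as with casting: the integer columns come back as doubles inside the array.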

You can try something like this:

from pyspark.sql.functions import array, col, lit, when

# One array per pair (array("") when either value is null), then collect all three.
(df
 .withColumn("g", when(col("a").isNotNull() & col("b").isNotNull(),
                       array(col("a"), col("b"))).otherwise(array(lit(""))))
 .withColumn("h", when(col("c").isNotNull() & col("d").isNotNull(),
                       array(col("c"), col("d"))).otherwise(array(lit(""))))
 .withColumn("i", when(col("e").isNotNull() & col("f").isNotNull(),
                       array(col("e"), col("f"))).otherwise(array(lit(""))))
 .withColumn("concat", array(col("g"), col("h"), col("i")))
 .drop('g', 'h', 'i')
 .show(truncate=False))

Resulting df:

+----+----+---+---+---+----+--------------------------------------------------------------------------+
|a   |b   |c  |d  |e  |f   |concat                                                                    |
+----+----+---+---+---+----+--------------------------------------------------------------------------+
|2   |12.3|5  |5.6|6  |44.7|[WrappedArray(2.0, 12.3), WrappedArray(5.0, 5.6), WrappedArray(6.0, 44.7)]|
|null|null|9  |9.3|19 |23.5|[WrappedArray(), WrappedArray(9.0, 9.3), WrappedArray(19.0, 23.5)]        |
|8   |4.3 |7  |0.5|21 |8.2 |[WrappedArray(8.0, 4.3), WrappedArray(7.0, 0.5), WrappedArray(21.0, 8.2)] |
|9   |3.8 |3  |6.5|45 |4.9 |[WrappedArray(9.0, 3.8), WrappedArray(3.0, 6.5), WrappedArray(45.0, 4.9)] |
|3   |8.7 |2  |2.8|32 |2.9 |[WrappedArray(3.0, 8.7), WrappedArray(2.0, 2.8), WrappedArray(32.0, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------------+


Another solution using a UDF:

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *

>>> arr_udf = F.udf(lambda row: [x for x in [row[0:2], row[2:4], row[4:6]] if all(v is not None for v in x)], ArrayType(ArrayType(StringType())))
>>> df.select("*",arr_udf(F.struct([df[x] for x in df.columns])).alias('g')).show(truncate=False)
+----+----+---+---+---+----+--------------------------------------------------------------------+
|a   |b   |c  |d  |e  |f   |g                                                                   |
+----+----+---+---+---+----+--------------------------------------------------------------------+
|2   |12.3|5  |5.6|6  |44.7|[WrappedArray(2, 12.3), WrappedArray(5, 5.6), WrappedArray(6, 44.7)]|
|null|null|9  |9.3|19 |23.5|[WrappedArray(9, 9.3), WrappedArray(19, 23.5)]                      |
|8   |4.3 |7  |0.5|21 |8.2 |[WrappedArray(8, 4.3), WrappedArray(7, 0.5), WrappedArray(21, 8.2)] |
|9   |3.8 |3  |6.5|45 |4.9 |[WrappedArray(9, 3.8), WrappedArray(3, 6.5), WrappedArray(45, 4.9)] |
|3   |8.7 |2  |2.8|32 |2.9 |[WrappedArray(3, 8.7), WrappedArray(2, 2.8), WrappedArray(32, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------+
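
One detail worth checking in any version of this filter is whether the test is for null or for truthiness. In Python, `all(pair)` treats 0 and 0.0 as false, so a pair holding a legitimate zero would be dropped along with the null pairs. The example data contains no zeros, so both tests give the same table here. A small plain-Python illustration follows; the helper name `has_no_nulls` is made up for this sketch.

```python
def has_no_nulls(pair):
    """True when no element of the pair is None; zeros are allowed."""
    return all(v is not None for v in pair)


pair_with_zero = (0, 4.3)
pair_with_null = (None, 4.3)

print(all(pair_with_zero))           # False: the zero is treated as falsy
print(has_no_nulls(pair_with_zero))  # True: the pair is kept
print(has_no_nulls(pair_with_null))  # False: the pair is dropped
```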
