
I am trying to pass a list of columns one by one to a UDF using a for loop, but I get an error saying the DataFrame has no column col_name. Currently the list list_col holds two columns, but it can change, so I want code that works for any list of columns. The code concatenates one row of a column at a time; each row value is in struct format, i.e. a list inside a list. Every null has to be replaced with a space.

    list_col = ['pcxreport', 'crosslinediscount']

    def struct_generater12(row):
        list3 = []
        main_str = ''
        if row is None:
            list3.append(' ')
        else:
            for i in row:
                temp = ''
                if i is None:
                    temp += ' '
                else:
                    for j in i:
                        if j is None:
                            temp += ' '
                        else:
                            temp += str(j)
                list3.append(temp)
        for k in list3:
            main_str += k
        return main_str

    A = udf(struct_generater12, returnType=StringType())
    # z = addlinterestdetail_FDF1.withColumn("Concated_pcxreport", A(addlinterestdetail_FDF1.pcxreport))
    for i in range(0, len(list_col)-1):
        struct_col = 'Concate_'
        struct_col += list_col[i]
        col_name = list_col[i]
        z = addlinterestdetail_FDF1.withColumn(struct_col, A(addlinterestdetail_FDF1.col_name))
        struct_col = ''

    z.show()

1 Answer

addlinterestdetail_FDF1.col_name implies the column is literally named "col_name"; it does not access the string contained in the variable col_name.

When calling a UDF on a column, you can

  • use its string name directly: A(col_name)
  • or use the pyspark sql function col:

    import pyspark.sql.functions as psf
    z = addlinterestdetail_FDF1.withColumn(struct_col,A(psf.col(col_name)))
    
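Applied to the loop in the question, a minimal sketch of the fix might look like this (assuming addlinterestdetail_FDF1, list_col, and the UDF A are defined as above; iterating over list_col directly also covers the last column, which range(0, len(list_col)-1) skipped, and chaining withColumn keeps every new column instead of overwriting z on each pass):

    import pyspark.sql.functions as psf

    # Start from the original DataFrame and add one concatenated column per entry.
    z = addlinterestdetail_FDF1
    for col_name in list_col:
        z = z.withColumn('Concate_' + col_name, A(psf.col(col_name)))
    z.show()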

You should consider using pyspark sql functions for concatenation instead of writing a UDF. First let's create a sample dataframe with nested structures:

    import json
    j = {'pcxreport': {'a': 'a', 'b': 'b'}, 'crosslinediscount': {'c': 'c', 'd': None, 'e': 'e'}}
    jsonRDD = sc.parallelize([json.dumps(j)])
    df = spark.read.json(jsonRDD)
    df.printSchema()
    df.show()

    root
     |-- crosslinediscount: struct (nullable = true)
     |    |-- c: string (nullable = true)
     |    |-- d: string (nullable = true)
     |    |-- e: string (nullable = true)
     |-- pcxreport: struct (nullable = true)
     |    |-- a: string (nullable = true)
     |    |-- b: string (nullable = true)

    +-----------------+---------+
    |crosslinediscount|pcxreport|
    +-----------------+---------+
    |       [c,null,e]|    [a,b]|
    +-----------------+---------+

We'll build a dictionary mapping each struct column to the names of its nested fields:

    list_col = ['pcxreport', 'crosslinediscount']
    list_subcols = dict()
    for c in list_col:
        list_subcols[c] = df.select(c + '.*').columns

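For the sample schema above, this yields the following mapping (shown as a comment sketch):

    # list_subcols for the sample DataFrame:
    # {'pcxreport': ['a', 'b'], 'crosslinediscount': ['c', 'd', 'e']}
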
Now we can "flatten" the StructType, replace None with ' ', and concatenate:

    import itertools
    import pyspark.sql.functions as psf
    df.select([c + '.*' for c in list_col])\
        .na.fill({c: ' ' for c in list(itertools.chain.from_iterable(list_subcols.values()))})\
        .select([psf.concat(*sc).alias(c) for c, sc in list_subcols.items()])\
        .show()

    +---------+-----------------+
    |pcxreport|crosslinediscount|
    +---------+-----------------+
    |       ab|              c e|
    +---------+-----------------+
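
Note that psf.concat returns null as soon as any of its inputs is null, which is why the nulls are filled first. As an alternative sketch (same df and list_subcols as above), you can replace nulls inline with psf.coalesce and skip the na.fill step:

    # Replace each null field with a space inline, keeping fields
    # qualified by their parent struct name.
    df.select([
        psf.concat(*[psf.coalesce(psf.col(c + '.' + f), psf.lit(' ')) for f in subcols]).alias(c)
        for c, subcols in list_subcols.items()
    ]).show()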

1 Comment

I have many DataFrames in a list; how should I merge them all into one DataFrame? The length of the list is not fixed. Thanks in advance.
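
As a minimal sketch (assuming every DataFrame in the hypothetical list df_list shares the same columns, and Spark 2.3+ for unionByName; use union if the columns are already in the same order):

    from functools import reduce

    # Fold a list of DataFrames of unknown length into a single DataFrame.
    merged = reduce(lambda a, b: a.unionByName(b), df_list)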
