I have a DataFrame with the following schema:

root
 |-- user_id: string (nullable = true)
 |-- user_loans_arr: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- loan_date: string (nullable = true)
 |    |    |-- loan_amount: string (nullable = true)
 |-- new_loan: struct (nullable = true)
 |    |-- loan_date: string (nullable = true)
 |    |-- loan_amount: string (nullable = true)

I want to use a UDF which takes user_loans_arr and new_loan as inputs, adds the new_loan struct to the existing user_loans_arr, and then deletes from user_loans_arr all elements whose loan_date is older than 12 months.

Thanks in advance.

2 Answers

If Spark >= 2.4, you don't need a UDF; check the example below.

Load the input data

 val df = spark.sql(
      """
        |select user_id, user_loans_arr, new_loan
        |from values
        | ('u1', array(named_struct('loan_date', '2019-01-01', 'loan_amount', 100)), named_struct('loan_date',
        | '2020-01-01', 'loan_amount', 100)),
        | ('u2', array(named_struct('loan_date', '2020-01-01', 'loan_amount', 200)), named_struct('loan_date',
        | '2020-01-01', 'loan_amount', 100))
        | T(user_id, user_loans_arr, new_loan)
      """.stripMargin)
    df.show(false)
    df.printSchema()

    /**
      * +-------+-------------------+-----------------+
      * |user_id|user_loans_arr     |new_loan         |
      * +-------+-------------------+-----------------+
      * |u1     |[[2019-01-01, 100]]|[2020-01-01, 100]|
      * |u2     |[[2020-01-01, 200]]|[2020-01-01, 100]|
      * +-------+-------------------+-----------------+
      *
      * root
      * |-- user_id: string (nullable = false)
      * |-- user_loans_arr: array (nullable = false)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- loan_date: string (nullable = false)
      * |    |    |-- loan_amount: integer (nullable = false)
      * |-- new_loan: struct (nullable = false)
      * |    |-- loan_date: string (nullable = false)
      * |    |-- loan_amount: integer (nullable = false)
      */

Process as per the requirement below:

Take user_loans_arr and new_loan as inputs, add the new_loan struct to the existing user_loans_arr, and then delete from user_loans_arr all elements whose loan_date is older than 12 months.

spark >= 2.4

    df.withColumn("user_loans_arr",
      expr(
        """
          |FILTER(array_union(user_loans_arr, array(new_loan)),
          | x -> months_between(current_date(), to_date(x.loan_date)) < 12)
        """.stripMargin))
      .show(false)

    /**
      * +-------+--------------------------------------+-----------------+
      * |user_id|user_loans_arr                        |new_loan         |
      * +-------+--------------------------------------+-----------------+
      * |u1     |[[2020-01-01, 100]]                   |[2020-01-01, 100]|
      * |u2     |[[2020-01-01, 200], [2020-01-01, 100]]|[2020-01-01, 100]|
      * +-------+--------------------------------------+-----------------+
      */

spark < 2.4

    // spark < 2.4
    import scala.collection.mutable
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    val outputSchema = df.schema("user_loans_arr").dataType

    import java.time._
    // append the new loan, then keep only loans from the last 12 months
    val add_and_filter = udf((userLoansArr: mutable.WrappedArray[Row], loan: Row) => {
      (userLoansArr :+ loan).filter(row => {
        val loanDate = LocalDate.parse(row.getAs[String]("loan_date"))
        val period = Period.between(loanDate, LocalDate.now())
        period.getYears * 12 + period.getMonths < 12
      })
    }, outputSchema)

    df.withColumn("user_loans_arr", add_and_filter($"user_loans_arr", $"new_loan"))
      .show(false)

    /**
      * +-------+--------------------------------------+-----------------+
      * |user_id|user_loans_arr                        |new_loan         |
      * +-------+--------------------------------------+-----------------+
      * |u1     |[[2020-01-01, 100]]                   |[2020-01-01, 100]|
      * |u2     |[[2020-01-01, 200], [2020-01-01, 100]]|[2020-01-01, 100]|
      * +-------+--------------------------------------+-----------------+
      */

5 Comments

Hi Someshwar Kale, thanks for the answer. It really helped me a lot. Can you please help me with the below condition as well? For any user, if the user_loans_arr is null and that user got a new_loan, I need to create a new user_loans_arr array and add the new_loan to it. As of now, I'm getting the value of user_loans_arr for that user as null.
Can you please ask that question again with sample data? By the way, it should be along similar lines; give it a try.
@Someshwar Kale - I don't know Scala much, but in your example it looks like you have created a column containing an array of structs. Is this right? PySpark is not allowing me to do this. Is this some limitation?
@Raghu, can you quote the statement here? If you are talking about the first statement of creating the input, then it should work in PySpark as well, since it is Spark SQL.
@SomeshwarKale - yes, creating the data part. I tried this in PySpark: tst = sqlContext.createDataFrame([(1,2,3,4),(3,4,5,4),(5,6,7,5),(7,8,9,5)],schema=['col1','col2','col3','col4']) tst_s = tst.withColumn("test",F.struct('col1','col2')).withColumn("test2",F.struct('col2','col3')) tst_ar=tst_s.withColumn("arr",F.array('test','test2')) Then I get this error: u"cannot resolve 'array(test, test2)' due to data type mismatch: input to function array should all be the same type, but it's [struct<col1:bigint,col2:bigint>, struct<col2:bigint,col3:bigint>]"
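
Regarding the error in that last comment: array() requires every element to have exactly the same type, and two structs whose fields are named differently (col1/col2 vs col2/col3) count as different struct types. A minimal sketch of one workaround, reusing the toy DataFrame from that comment (the field names f1 and f2 are arbitrary placeholders, not anything from the thread):

import pyspark.sql.functions as F

tst = sqlContext.createDataFrame([(1,2,3,4),(3,4,5,4),(5,6,7,5),(7,8,9,5)],schema=['col1','col2','col3','col4'])
# alias the struct fields so both structs share one schema before array() combines them
tst_s = (tst
         .withColumn("test", F.struct(F.col('col1').alias('f1'), F.col('col2').alias('f2')))
         .withColumn("test2", F.struct(F.col('col2').alias('f1'), F.col('col3').alias('f2'))))
tst_ar = tst_s.withColumn("arr", F.array('test', 'test2'))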

You need to pass your array and struct columns to the UDF as an array or a struct. I prefer passing them as a struct. There you can manipulate the elements and return an array type.

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
#Test data
tst = sqlContext.createDataFrame([(1,2,3,4),(3,4,5,1),(5,6,7,8),(7,8,9,2)],schema=['col1','col2','col3','col4'])
tst_1=(tst.withColumn("arr",F.array('col1','col2'))).withColumn("str",F.struct('col3','col4'))
# udf to return array
@udf(ArrayType(StringType()))
def fn(row):
    if row.arr[1] > row.str.col4:
        # condition met: return an empty array
        res = []
    else:
        # append the struct's values to the array; cast to str to match the declared return type
        res = [str(x) for x in row.arr] + [str(v) for v in row.str.asDict().values()]
    return res
# calling udf with a struct of array and struct column
tst_fin = tst_1.withColumn("res", fn(F.struct('arr', 'str')))

The result is

tst_fin.show()
+----+----+----+----+------+------+------------+
|col1|col2|col3|col4|   arr|   str|         res|
+----+----+----+----+------+------+------------+
|   1|   2|   3|   4|[1, 2]|[3, 4]|[1, 2, 3, 4]|
|   3|   4|   5|   1|[3, 4]|[5, 1]|          []|
|   5|   6|   7|   8|[5, 6]|[7, 8]|[5, 6, 7, 8]|
|   7|   8|   9|   2|[7, 8]|[9, 2]|          []|
+----+----+----+----+------+------+------------+

This example treats everything as an int. Since you have dates as strings, inside your UDF you have to use Python's datetime functions for the comparison.
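
For reference, a rough sketch of such a UDF, assuming the question's column and field names (user_loans_arr, new_loan, loan_date, loan_amount), ISO-formatted date strings, and a 365-day approximation of the 12-month cutoff:

import datetime

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

loan_arr_type = ArrayType(StructType([
    StructField("loan_date", StringType()),
    StructField("loan_amount", StringType()),
]))

@udf(loan_arr_type)
def add_and_filter_loans(row):
    # append new_loan, then keep only loans dated within (roughly) the last 12 months;
    # assumes loan_date strings look like '2019-01-01'
    cutoff = datetime.date.today() - datetime.timedelta(days=365)
    loans = list(row.user_loans_arr or []) + [row.new_loan]
    return [(l.loan_date, l.loan_amount) for l in loans
            if datetime.datetime.strptime(l.loan_date, "%Y-%m-%d").date() >= cutoff]

# usage, on a DataFrame df with the question's schema:
# df.withColumn("user_loans_arr", add_and_filter_loans(F.struct("user_loans_arr", "new_loan")))

The "or []" also covers the case raised in an earlier comment where user_loans_arr is null for a user.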

1 Comment

Hi Raghu, Thank you for your Answer.
