I have a dataframe as follows. The excerpt below is for just one patient and one particular test; the full data can contain multiple other tests with a similar shape.

ptid,blast_date,test_name,result_date,test_result,date_diff
PT381201021,2019-08-22,Albumin,2019-08-14,4.3,8
PT381201021,2019-05-17,Albumin,NA,NA,0
PT381201021,2019-05-18,Albumin,NA,NA,0
PT381201021,2019-05-21,Albumin,NA,NA,0
PT381201021,2019-05-23,Albumin,NA,NA,0
PT381201021,2019-05-16,Albumin,NA,NA,0
PT381201021,2019-05-19,Albumin,NA,NA,0
PT381201021,2019-05-22,Albumin,NA,NA,0
PT381201021,2019-05-20,Albumin,NA,NA,0
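
For reproducibility, the sample above can be built directly (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rows = [
    ("PT381201021", "2019-08-22", "Albumin", "2019-08-14", "4.3", 8),
    ("PT381201021", "2019-05-17", "Albumin", "NA", "NA", 0),
    ("PT381201021", "2019-05-18", "Albumin", "NA", "NA", 0),
    ("PT381201021", "2019-05-21", "Albumin", "NA", "NA", 0),
    ("PT381201021", "2019-05-23", "Albumin", "NA", "NA", 0),
    ("PT381201021", "2019-05-16", "Albumin", "NA", "NA", 0),
    ("PT381201021", "2019-05-19", "Albumin", "NA", "NA", 0),
    ("PT381201021", "2019-05-22", "Albumin", "NA", "NA", 0),
    ("PT381201021", "2019-05-20", "Albumin", "NA", "NA", 0),
]
df = spark.createDataFrame(rows, ["ptid", "blast_date", "test_name",
                                  "result_date", "test_result", "date_diff"])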

I want the result_date and test_result for "Albumin" in this case to be populated from another row's result if the gap between the two blast_dates is under a certain threshold, let's assume 3 months in this case. So I want the following row to be populated as follows:

PT381201021,2019-05-23,Albumin,2019-08-14,4.3,0

You can leave the date_diff column as it is.

So the final expected dataframe is as follows:

ptid,blast_date,test_name,result_date,test_result,date_diff
PT381201021,2019-08-22,Albumin,2019-08-14,4.3,8
PT381201021,2019-05-17,Albumin,NA,NA,0
PT381201021,2019-05-18,Albumin,NA,NA,0
PT381201021,2019-05-21,Albumin,NA,NA,0
PT381201021,2019-05-23,Albumin,2019-08-14,4.3,0
PT381201021,2019-05-16,Albumin,NA,NA,0
PT381201021,2019-05-19,Albumin,NA,NA,0
PT381201021,2019-05-22,Albumin,NA,NA,0
PT381201021,2019-05-20,Albumin,NA,NA,0

I tried to use the lag function but ran into some difficulties with it. I am looking for a PySpark way to solve this.

  • in your example, what if blast_date=2019-05-23 does not exist? Commented Apr 28, 2020 at 11:40

2 Answers


You can use window functions with rangeBetween, ordering on blast_date cast to epoch seconds so that the 3-month threshold (taken as 91 days here) becomes a range of 86400 * 91 seconds.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Forward-looking window per patient/test: current row up to 91 days
# (~3 months) ahead, in seconds, since rangeBetween needs a numeric key.
w = (Window.partitionBy("ptid", "test_name")
     .orderBy(F.to_timestamp("blast_date", "yyyy-MM-dd").cast("long"))
     .rangeBetween(Window.currentRow, 86400 * 91))

# Collect [result_date, test_result] pairs in the window, keep the first
# pair without 'NA', and use it to fill the current row's missing values.
df.withColumn("collect", F.collect_list(F.array("result_date", "test_result")).over(w))\
  .withColumn("collect", F.expr("filter(collect, x -> array_contains(x, 'NA') != True)")[0])\
  .withColumn("result_date", F.when((F.col("result_date") == 'NA') & (F.col("collect").isNotNull()), F.col("collect")[0]).otherwise(F.col("result_date")))\
  .withColumn("test_result", F.when((F.col("test_result") == 'NA') & (F.col("collect").isNotNull()), F.col("collect")[1]).otherwise(F.col("test_result")))\
  .drop("collect").show(truncate=False)

+-----------+----------+---------+-----------+-----------+---------+
|ptid       |blast_date|test_name|result_date|test_result|date_diff|
+-----------+----------+---------+-----------+-----------+---------+
|PT381201021|2019-05-16|Albumin  |NA         |NA         |0        |
|PT381201021|2019-05-17|Albumin  |NA         |NA         |0        |
|PT381201021|2019-05-18|Albumin  |NA         |NA         |0        |
|PT381201021|2019-05-19|Albumin  |NA         |NA         |0        |
|PT381201021|2019-05-20|Albumin  |NA         |NA         |0        |
|PT381201021|2019-05-21|Albumin  |NA         |NA         |0        |
|PT381201021|2019-05-22|Albumin  |NA         |NA         |0        |
|PT381201021|2019-05-23|Albumin  |2019-08-14 |4.3        |0        |
|PT381201021|2019-08-22|Albumin  |2019-08-14 |4.3        |8        |
+-----------+----------+---------+-----------+-----------+---------+
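
If the 'NA' strings are first converted to real nulls, the same forward-looking window also works with first(..., ignorenulls=True) instead of collecting and filtering arrays. A sketch of that variant (my naming; same 91-day assumption as above, and untouched rows end up as real nulls rather than the string 'NA'):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = (Window.partitionBy("ptid", "test_name")
     .orderBy(F.to_timestamp("blast_date", "yyyy-MM-dd").cast("long"))
     .rangeBetween(Window.currentRow, 86400 * 91))

# Turn the literal 'NA' strings into real nulls
for c in ["result_date", "test_result"]:
    df = df.withColumn(c, F.when(F.col(c) == "NA", F.lit(None)).otherwise(F.col(c)))

# Fill each null from the first non-null value within the next ~3 months
df = (df.withColumn("result_date", F.coalesce("result_date",
                    F.first("result_date", ignorenulls=True).over(w)))
        .withColumn("test_result", F.coalesce("test_result",
                    F.first("test_result", ignorenulls=True).over(w))))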

Hope this approach helps. It is not very optimized, and the flow of execution can be tightened further.

from pyspark.sql import functions as F

df = spark.read.csv("/Users/61471871.csv", header=True, inferSchema=True)

# Add the comparison boundaries: the row's own date and a date 3 months later
df2 = (df.withColumn("start_date", F.to_date(df.blast_date))
         .withColumn("end_date", F.add_months(F.to_date(df.blast_date), 3)))
df_right = df2.sort(F.col("start_date").desc())
df_right.createOrReplaceTempView("tbl")
spark.sql("select ptid, blast_date, test_name, result_date, test_result, date_diff from tbl").show()
'''
+-----------+-------------------+---------+-----------+-----------+---------+
|       ptid|         blast_date|test_name|result_date|test_result|date_diff|
+-----------+-------------------+---------+-----------+-----------+---------+
|PT381201021|2019-08-22 00:00:00|  Albumin| 2019-08-14|        4.3|        8|
|PT381201021|2019-05-23 00:00:00|  Albumin|         NA|         NA|        0|
|PT381201021|2019-05-22 00:00:00|  Albumin|         NA|         NA|        0|
|PT381201021|2019-05-21 00:00:00|  Albumin|         NA|         NA|        0|
|PT381201021|2019-05-20 00:00:00|  Albumin|         NA|         NA|        0|
|PT381201021|2019-05-19 00:00:00|  Albumin|         NA|         NA|        0|
|PT381201021|2019-05-18 00:00:00|  Albumin|         NA|         NA|        0|
|PT381201021|2019-05-17 00:00:00|  Albumin|         NA|         NA|        0|
|PT381201021|2019-05-16 00:00:00|  Albumin|         NA|         NA|        0|
+-----------+-------------------+---------+-----------+-----------+---------+
'''
# Latest blast_date per patient; every other row is measured against it
df_left = spark.sql("select ptid, max(start_date) as range_dt from tbl group by ptid")

# Pair each row with that latest date and compute the day difference.
# (An equi-join, df_right.join(df_left, df_right.ptid == df_left.ptid),
# would avoid pairing rows across patients on multi-patient data.)
df_one = df_right.crossJoin(df_left)
df_two = df_one.withColumn("date_diff", F.datediff(df_one.start_date, df_one.range_dt))
df_two.show()
'''
+-----------+-------------------+---------+-----------+-----------+---------+----------+----------+-----------+----------+
|       ptid|         blast_date|test_name|result_date|test_result|date_diff|start_date|  end_date|       ptid|  range_dt|
+-----------+-------------------+---------+-----------+-----------+---------+----------+----------+-----------+----------+
|PT381201021|2019-08-22 00:00:00|  Albumin| 2019-08-14|        4.3|        0|2019-08-22|2019-11-22|PT381201021|2019-08-22|
|PT381201021|2019-05-23 00:00:00|  Albumin|         NA|         NA|      -91|2019-05-23|2019-08-23|PT381201021|2019-08-22|
|PT381201021|2019-05-22 00:00:00|  Albumin|         NA|         NA|      -92|2019-05-22|2019-08-22|PT381201021|2019-08-22|
|PT381201021|2019-05-21 00:00:00|  Albumin|         NA|         NA|      -93|2019-05-21|2019-08-21|PT381201021|2019-08-22|
|PT381201021|2019-05-20 00:00:00|  Albumin|         NA|         NA|      -94|2019-05-20|2019-08-20|PT381201021|2019-08-22|
|PT381201021|2019-05-19 00:00:00|  Albumin|         NA|         NA|      -95|2019-05-19|2019-08-19|PT381201021|2019-08-22|
|PT381201021|2019-05-18 00:00:00|  Albumin|         NA|         NA|      -96|2019-05-18|2019-08-18|PT381201021|2019-08-22|
|PT381201021|2019-05-17 00:00:00|  Albumin|         NA|         NA|      -97|2019-05-17|2019-08-17|PT381201021|2019-08-22|
|PT381201021|2019-05-16 00:00:00|  Albumin|         NA|         NA|      -98|2019-05-16|2019-08-16|PT381201021|2019-08-22|
+-----------+-------------------+---------+-----------+-----------+---------+----------+----------+-----------+----------+
'''

Now that you have the date-difference flag, you can apply a filter and then do a join to get the expected result, as sketched below.
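
A hypothetical sketch of that filter-and-join, continuing from df_two above (the -91 cutoff is my stand-in for the ~3-month threshold, and joining on test_name alone only suffices for this single-patient example):

# Result-carrying rows, renamed so they can be joined back
carriers = (df_right.filter(df_right.result_date != "NA")
            .select(df_right.test_name.alias("c_test"),
                    df_right.result_date.alias("c_result_date"),
                    df_right.test_result.alias("c_test_result")))

# Fill rows that fall within 91 days of the latest blast_date
filled = (df_two.filter(F.col("date_diff") >= -91)
          .join(carriers, F.col("test_name") == F.col("c_test"), "left")
          .withColumn("result_date", F.when(F.col("result_date") == "NA",
                      F.col("c_result_date")).otherwise(F.col("result_date")))
          .withColumn("test_result", F.when(F.col("test_result") == "NA",
                      F.col("c_test_result")).otherwise(F.col("test_result")))
          .drop("c_test", "c_result_date", "c_test_result"))
# Rows outside the threshold would then be unioned back unchanged.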

The code can be further optimized to run on large data sets; in particular, the crossJoin pairs every row with every patient's latest date, so an equi-join on ptid scales better.
