
I have a PySpark DataFrame containing a collection of books, where each book can have one or more titles. Each title is classed as either an original title (OT) or an alternative title (AT); for simplicity, I'm omitting other title types. My validation needs to ensure that each book has exactly one OT title and any number of AT titles.

What I'm trying to do is clean up the data so that:

  • If a book has more than one OT title, keep the first and change the rest to AT
  • If a book has no OT titles, change the first AT title to OT
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import collect_list, col, struct

# One row per (book, title) pair
data = [
  (1, 'Title 1', 'OT'),
  (1, 'Title 2', 'OT'),
  (2, 'Title 3', 'AT'),
  (2, 'Title 4', 'OT'),
  (3, 'Title 5', 'AT'),
]

schema = StructType([
    StructField("BookID", IntegerType(), False),
    StructField("Title", StringType(), True),
    StructField("Type", StringType(), True),
  ])

# spark and display() are provided by the notebook environment (e.g. Databricks)
df = spark.createDataFrame(data, schema)
# Gather each book's titles into one array of (Title, Type) structs
df = df.groupby('BookID').agg(collect_list(struct(col('Title'), col('Type'))).alias('Titles'))

display(df)

It sounds like it should be easy but I'm at a bit of a loss as to how to do it. Any help would be greatly appreciated.

I have tried using a UDF like the one below, but so far that approach isn't working: I get an error saying a lambda cannot contain an assignment.

def process_titles(titles):
  x = list(filter(lambda t: t.Type == 'OT', titles))[1::]
  map(lambda t: t.Type = 'AT', x)
  
  return x

process_titles_udf = udf(lambda x: process_titles(x), titles)

df = df.withColumn('test', process_titles_udf('Titles'))

where titles, the UDF's return type, is defined as:

titles = ArrayType(StructType([ 
    StructField("Title", StringType(), True), 
    StructField("Type", StringType(), True)
  ]))
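For reference, the error comes from t.Type = 'AT': a Python lambda may only contain an expression, not an assignment. Even without that, the Row objects handed to a UDF are read-only, and in Python 3 map is lazy, so its result is thrown away unless it is consumed. A working alternative is to build new records rather than mutate the old ones. Below is a minimal plain-Python sketch of both cleanup rules that keeps the original order; it assumes only OT and AT types exist (as in the question), uses (title, type) tuples in place of Spark Rows, and the name clean_titles is hypothetical.

```python
from typing import List, Tuple

Title = Tuple[str, str]  # (title, type), standing in for a Spark Row

def clean_titles(titles: List[Title]) -> List[Title]:
    """Return a new list with exactly one 'OT'; the input list is not modified."""
    # Index of the first original title, or None if the book has none
    first_ot = next((i for i, (_, kind) in enumerate(titles) if kind == "OT"), None)
    # With no OT at all, promote the first title (an AT, given only OT/AT exist)
    keep = first_ot if first_ot is not None else 0
    # Build fresh tuples instead of assigning to fields inside a lambda
    return [(title, "OT" if i == keep else "AT") for i, (title, _) in enumerate(titles)]

print(clean_titles([("Title 1", "OT"), ("Title 2", "OT")]))  # [('Title 1', 'OT'), ('Title 2', 'AT')]
print(clean_titles([("Title 3", "AT"), ("Title 4", "OT")]))  # [('Title 3', 'AT'), ('Title 4', 'OT')]
print(clean_titles([("Title 5", "AT")]))                     # [('Title 5', 'OT')]
```

Inside a UDF the same comprehension should also work on the incoming Rows, since a Row is a tuple and unpacks in field order, and PySpark accepts tuples for struct elements of the declared return type; it is worth verifying on your Spark version.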
  • What is titles within process_titles_udf = udf(lambda x: process_titles(x), titles)? Commented Jul 27, 2021 at 16:17
  • Sorry, that's a copy-paste error. It's an ArrayType() of a struct containing the Title and Type. I'll update the question now. Commented Jul 27, 2021 at 16:30

1 Answer


First things first: when you say "keep the first", be aware that collect_list is non-deterministic. The order of the collected elements depends on the order of the rows, which can change after a shuffle, so depending on the run you may get a different "first" OT.
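To make this concrete, the plain-Python sketch below applies the "keep the first OT, else promote the first AT" rule to the same two titles arriving in two different orders, as can happen with collect_list after a shuffle. Sorting the collected list on a stable key first (here the title text, an arbitrary choice for illustration) makes the result independent of arrival order. In Spark, a comparable effect could presumably be had by sorting the collected array (for example with sort_array) or by ordering on a real sequence column if your data has one; the helper name pick_ot is hypothetical.

```python
from typing import List, Tuple

Title = Tuple[str, str]  # (title, type)

def pick_ot(titles: List[Title]) -> str:
    """Return the title kept as OT: the first OT, otherwise the first title."""
    for title, kind in titles:
        if kind == "OT":
            return title
    return titles[0][0]

# The same book's rows, collected in two different orders
run_a = [("Title 1", "OT"), ("Title 2", "OT")]
run_b = [("Title 2", "OT"), ("Title 1", "OT")]

print(pick_ot(run_a), pick_ot(run_b))                  # Title 1 Title 2  (order-dependent)
print(pick_ot(sorted(run_a)), pick_ot(sorted(run_b)))  # Title 1 Title 1  (sorted first)
```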

If you are fine with this non-deterministic behavior, here is a UDF that does it:

from pyspark.sql import functions as F

# `titles` is the ArrayType(StructType(...)) return type defined in the question
@F.udf(titles)
def process_titles(titles):
    OTs = [x for x in titles if x["Type"] == "OT"]  # Collect all OT titles
    if OTs:
        OT = OTs[0]  # Keep the first OT as the only OT, if one exists
    else:
        OT = {
            "Title": titles[0]["Title"],
            "Type": "OT",
        }  # Otherwise, promote the first AT to OT

    ATs = [
        {"Title": x["Title"], "Type": "AT"} for x in titles if x["Title"] != OT["Title"]
    ]  # Mark all other titles as AT
    return [OT] + ATs


df.select("titles", process_titles(F.col("Titles"))).show(truncate=False)
+------------------------------+------------------------------+                 
|titles                        |process_titles(Titles)        |
+------------------------------+------------------------------+
|[[Title 1, OT], [Title 2, OT]]|[[Title 1, OT], [Title 2, AT]]|
|[[Title 5, AT]]               |[[Title 5, OT]]               |
|[[Title 3, AT], [Title 4, OT]]|[[Title 4, OT], [Title 3, AT]]|
+------------------------------+------------------------------+
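Because the UDF body only uses item access, its logic can be checked without a Spark session. The sketch below copies that body into an undecorated function, process_titles_local (a hypothetical name), and runs it on plain dicts standing in for Rows; it reproduces the table above. It also makes one edge case visible: since the other titles are selected with x["Title"] != OT["Title"], a second title whose text equals the kept OT is dropped rather than turned into an AT.

```python
from typing import Dict, List

def process_titles_local(titles: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Same logic as the process_titles UDF above, without the @F.udf decorator."""
    OTs = [x for x in titles if x["Type"] == "OT"]  # Collect all OT titles
    if OTs:
        OT = OTs[0]  # Keep the first OT as the only OT
    else:
        OT = {"Title": titles[0]["Title"], "Type": "OT"}  # Promote the first AT
    # Every title whose text differs from the kept OT becomes an AT
    ATs = [{"Title": x["Title"], "Type": "AT"} for x in titles if x["Title"] != OT["Title"]]
    return [OT] + ATs

books = {
    1: [{"Title": "Title 1", "Type": "OT"}, {"Title": "Title 2", "Type": "OT"}],
    2: [{"Title": "Title 3", "Type": "AT"}, {"Title": "Title 4", "Type": "OT"}],
    3: [{"Title": "Title 5", "Type": "AT"}],
}
for book_id, titles in books.items():
    print(book_id, [(t["Title"], t["Type"]) for t in process_titles_local(titles)])

# Duplicate title text: the second "Title 1" disappears instead of becoming AT
print(process_titles_local([{"Title": "Title 1", "Type": "OT"}, {"Title": "Title 1", "Type": "AT"}]))
```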