I have a PySpark job that reads data from table a, performs some transformations and filters, and then writes the result to table b.
Here’s a simplified version of the code:
import pyspark.sql.functions as F
from pyspark.sql import Observation

spark = ...  # initialization

df = spark.table("a").where(F.col("country") == "abc")
df_unique = df.distinct()
users_without_kids = df_unique.where(F.col("kid_count") == 0)

# attach a metric to the final DataFrame; it is computed as part of the write
observation = Observation()
observed_df = users_without_kids.observe(observation, F.count(F.lit(1)).alias("row_count"))
observed_df.writeTo("b").append()  # .append() shown for brevity; the real job uses the appropriate writeTo action

print(observation.get["row_count"])
This works fine — I get the count of records that were written to table b.
However, I’d also like to know:
- How many records there were right after the first filter (df)
- How many records there are after distinct() (df_unique)
But I'd like to avoid triggering additional actions (e.g., calling .count() on each intermediate DataFrame); ideally, all metrics would be collected in the single action that already runs (the writeTo).
I tried adding multiple observe calls or adding multiple metrics to a single Observation, but it doesn’t seem to work when there’s only one action at the end.
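For concreteness, here is roughly one of the variants I tried; the observation names (after_filter, after_distinct, final) are just placeholders I made up for this example:

import pyspark.sql.functions as F
from pyspark.sql import Observation

# one Observation per stage; names are arbitrary placeholders
obs_filter = Observation("after_filter")
obs_distinct = Observation("after_distinct")
obs_final = Observation("final")

df = spark.table("a").where(F.col("country") == "abc")
df = df.observe(obs_filter, F.count(F.lit(1)).alias("row_count"))

df_unique = df.distinct()
df_unique = df_unique.observe(obs_distinct, F.count(F.lit(1)).alias("row_count"))

users_without_kids = df_unique.where(F.col("kid_count") == 0)
observed_df = users_without_kids.observe(obs_final, F.count(F.lit(1)).alias("row_count"))

observed_df.writeTo("b").append()

print(obs_filter.get, obs_distinct.get, obs_final.get)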
Question:
Is there a way in PySpark to observe multiple DataFrames (or multiple metrics) in one action, so I can capture these counts (df, df_unique, and users_without_kids) without performing extra jobs?