
I have a PySpark job that reads data from table a, performs some transformations and filters, and then writes the result to table b.

Here’s a simplified version of the code:

import pyspark.sql.functions as F
from pyspark.sql import Observation

spark = ...  # initialization

df = spark.table("a").where(F.col("country") == "abc")
df_unique = df.distinct()
users_without_kids = df_unique.where(F.col("kid_count") == 0)

observation = Observation()
observed_df = users_without_kids.observe(observation, F.count(F.lit(1)).alias("row_count"))

observed_df.writeTo("b").append()  # the single action that triggers the job
print(observation.get["row_count"])

This works fine — I get the count of records that were written to table b.

However, I’d also like to know:

  1. How many records there were right after the first filter (df)
  2. How many records there are after distinct() (df_unique)

But I’d like to avoid triggering additional actions (e.g., not calling .count() multiple times) — ideally, I’d like to collect all metrics in a single action (the writeTo).

I tried adding multiple observe calls, and also adding multiple metrics to a single Observation, but neither seems to work when there is only one action at the end (see the sketch below).
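
Roughly, one of the failed attempts looked like this (a simplified sketch; the extra Observation names are just illustrative):

from pyspark.sql import Observation

obs_filtered = Observation()
obs_distinct = Observation()

# Observe the intermediate DataFrames, but keep building on the original
# (unobserved) variables, so the observed DataFrames never take part in the
# final writeTo action:
df.observe(obs_filtered, F.count(F.lit(1)).alias("filtered_count"))
df_unique.observe(obs_distinct, F.count(F.lit(1)).alias("distinct_count"))

observed_df.writeTo("b").append()
# Only "row_count" is available afterwards; the other two observations are
# never evaluated because their DataFrames are not part of the written plan.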

Question: Is there a way in PySpark to observe multiple DataFrames (or multiple metrics) in one action, so I can capture these counts (df, df_unique, and users_without_kids) without performing extra jobs?

Comments:
  • So you would like to get the results without calculating it? Unfortunately there's no free lunch. – Frank, Oct 22 at 18:31
  • @Frank right idea, but not for the reason OP might expect :)) – Oct 23 at 15:47

1 Answer


As stated in the online docs,

pyspark.sql.DataFrame.observe

DataFrame.observe(observation, *exprs)

Define (named) metrics to observe on the DataFrame. This method returns an ‘observed’ DataFrame that returns the same result as the input, with the following guarantees ...

Specifically, note the "this method returns" wording: observe() creates a new DataFrame rather than modifying the one it is called on. This in turn means that, to have multiple observations evaluated by a single action, you have to build every downstream transformation on the observed "source" DataFrames, all the way to the "target" DataFrame. An action on the target then causes every observation in the transformation chain to be evaluated. For example:

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.observation import *
>>> df0 = spark.table("sparktable")
>>> o0 = Observation()
>>> dfo0 = df0.observe(o0, count(lit(1)).alias("input_row_count"))
>>> df1 = dfo0.filter("id > 1000")
>>> o1 = Observation()
>>> dfo1 = df1.observe(o1, count(lit(1)).alias("filtered_row_count"))
>>> dfo1.explain()
== Physical Plan ==
CollectMetrics 2e2fa016-f1da-4719-81b9-4799fc84e13c, [count(1) AS filtered_row_count#8L]
+- *(2) Filter (isnotnull(id#0) AND (id#0 > 1000))
   +- CollectMetrics e6fe19f0-57de-42c8-b623-fde4141fe160, [count(1) AS input_row_count#5L]
      +- *(1) ColumnarToRow
         +- FileScan parquet spark_catalog.sparkschema.sparktable[id#0,...] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,...>
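
From there, a single action on the final observed DataFrame is enough to populate both observations. A minimal sketch (the target table name is made up):

>>> # One action on the last DataFrame in the chain evaluates both metrics.
>>> dfo1.writeTo("sparkschema.sparktable_filtered").createOrReplace()
>>> o0.get["input_row_count"], o1.get["filtered_row_count"]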

This can have some unexpected consequences, though. For example, in the snippet above, a filter that would have been pushed down to the data source without observe() can no longer be pushed down, because we request a full count() on the unfiltered input.
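
To see the effect, compare the physical plans of the same predicate applied to the raw input versus the observed input; with the observation in place, the predicate stays as a separate Filter node above CollectMetrics instead of appearing in the scan's PushedFilters (a quick sketch using the names from the example above):

>>> # Same filter, without and with the observation in between; the difference
>>> # shows up in the PushedFilters entry of the FileScan node.
>>> df0.filter("id > 1000").explain()
>>> dfo0.filter("id > 1000").explain()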
