
I am applying many transformations to a Spark DataFrame (filter, groupBy, join). I want to know the number of rows in the DataFrame after each transformation.

I am currently counting the number of rows using the function count() after each transformation, but every call triggers a separate action, which is inefficient.

I was wondering if there is any way of knowing the number of rows without triggering any action beyond the original job.

  • A sample dataset and code snippet would be appreciated. :-) Commented May 17, 2019 at 15:40

3 Answers


You could use an accumulator for each stage and increment it in a map after each stage. Then, after you run your action, you will have a count for every stage.

import org.apache.spark.sql.functions.{col, lit, max}

val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")

myDataFrame
    .filter(col("x") === lit(3))
    .map { x =>
      filterCounter.add(1)  // counts rows surviving the filter
      x
    }                       // note: .map over a DataFrame needs an implicit
    .groupBy(col("x"))      // Encoder for its rows (e.g. a RowEncoder)
    .agg(max("y"))
    .map { x =>
      groupByCounter.add(1) // counts rows produced by the aggregation
      x
    }
    .join(myOtherDataframe, col("x") === col("y"))
    .map { x =>
      joinCounter.add(1)    // counts rows produced by the join
      x
    }
    .count()                // single action; accumulator updates made inside
                            // transformations can be re-applied on task
                            // retries, so treat the counts as approximate

println(s"count for filter = ${filterCounter.value}")
println(s"count for group by = ${groupByCounter.value}")
println(s"count for join = ${joinCounter.value}")

1 Comment

Is it possible to dynamically create a variable number of accumulators, and maybe have all of them stored in a Map[String, Long]? Because I dynamically add transformations to my DataFrame.
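A minimal sketch of one way to do that (my own illustration, not from the thread; buildCounters and the stage names are hypothetical), assuming each dynamically added transformation has a string identifier:

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

// Hypothetical helper: one named LongAccumulator per dynamic stage,
// keyed by stage name so it can be looked up while building the plan.
def buildCounters(sc: SparkContext, stageNames: Seq[String]): Map[String, LongAccumulator] =
  stageNames.map(name => name -> sc.longAccumulator(s"$name-counter")).toMap

val counters = buildCounters(spark.sparkContext, Seq("filter", "groupBy", "join"))
// ... reference counters("filter"), counters("groupBy"), etc. in the maps above ...

// After the action has run, read everything back as a plain Map[String, Long]:
val results: Map[String, Long] = counters.map { case (name, acc) => name -> acc.value }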

Each operator has a couple of metrics of its own. These metrics are visible in the Spark UI's SQL tab.

If the UI is not an option, we can introspect the query execution object of the DataFrame after execution to access the metrics (internally, accumulators).

Example: df.queryExecution.executedPlan.metrics will give the metrics of the topmost node in the DAG.
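To see the metrics of every node rather than just the top one, you can walk the plan tree (a sketch; it must run after an action that executed this exact plan, such as collect, otherwise the metrics keep their initial value of 0):

df.collect()  // populate the metrics by executing this plan

// executedPlan is a tree, so foreach visits every physical operator
df.queryExecution.executedPlan.foreach { node =>
  println(s"${node.nodeName}: ${node.metrics.mapValues(_.value)}")
}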

4 Comments

Using df.queryExecution.executedPlan.metrics I always get the result Map(numOutputRows -> SQLMetric(id: 891, name: Some(number of output rows), value: 0)). Is that normal?
You might be doing one of the following: 1. inspecting the plan before the query has executed, or 2. running a query that actually returns zero rows. You can navigate through the executedPlan (it is a tree) to find the metrics on each node. Simple code: df.queryExecution.executedPlan.foreach(n => println(n.metrics)). The above has to be done after the query is executed.
I am doing it after a write action, but my dataframe has more than zero rows.
`df.queryExecution.executedPlan.foreach(x => println(x))` will help in finding the node where the metrics stop being populated. If you have access to the Spark UI, you can check the SQL tab after execution, which also displays these metrics.

Coming back to this question after a bit more experience with Apache Spark, to complement randal's answer.

You can also use a UDF to increment a counter.

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, max, udf}
import org.apache.spark.util.LongAccumulator

val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")

// Identity UDF that increments the given accumulator once per row it sees.
def countUdf(acc: LongAccumulator): UserDefinedFunction = udf { (x: Int) =>
  acc.add(1)
  x
}

myDataFrame
  .filter(col("x") === lit(3))
  .withColumn("x", countUdf(filterCounter)(col("x")))
  .groupBy(col("x"))
  .agg(max("y"))
  .withColumn("x", countUdf(groupByCounter)(col("x")))
  .join(myOtherDataframe, col("x") === col("y"))
  .withColumn("x", countUdf(joinCounter)(col("x")))
  .count()

println(s"count for filter = ${filterCounter.value}")
println(s"count for group by = ${groupByCounter.value}")
println(s"count for join = ${joinCounter.value}")

This should be more efficient because Spark only has to deserialize the column used in the UDF, but it has to be used carefully, since Catalyst can more easily reorder the operations (for example, pushing a filter below the call to the UDF), in which case a counter would no longer measure the stage you intended.
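If that reordering becomes a problem, one possible mitigation (my own suggestion, not part of the original answer; it assumes Spark 2.3+, where asNondeterministic() is available) is to mark the counting UDF as non-deterministic, which stops Catalyst from pushing filters through it:

// Same imports as above; asNondeterministic() keeps Catalyst from
// reordering filters across this UDF, at the cost of fewer optimizations.
def countUdf(acc: LongAccumulator): UserDefinedFunction = udf { (x: Int) =>
  acc.add(1)
  x
}.asNondeterministic()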

