
I am using Scala and Spark to create a dataframe. Here's my code so far:

val df = transformedFlattenDF
  .groupBy($"market", $"city", $"carrier")
  .agg(
    count("*").alias("count"),
    min($"bandwidth").alias("bandwidth"),
    first($"network").alias("network"),
    concat_ws(",", collect_list($"carrierCode")).alias("carrierCode")
  )
  .withColumn("carrierCode", split($"carrierCode", ",").cast("array<string>"))
  .withColumn("Carrier Count", collect_set("carrierCode"))

The column carrierCode becomes an array column. The data is present as follows:

CarrierCode
1: [12,2,12]
2: [5,2,8]
3: [1,1,3]

I'd like to create a column that counts the number of distinct values in each array. I tried using collect_set, but it gives me an error saying "grouping expressions sequence is empty". Is it possible to find the number of distinct values in each row's array? That way, in the same example, there could be a column like so:

Carrier Count
1: 2
2: 3
3: 2
  • Could you please provide schema and sample data for transformedFlattenDF? Commented Jul 7, 2018 at 4:45

3 Answers


collect_set is an aggregation function, so it should be applied within your groupBy-agg step; wrapping it in size then gives the number of distinct carrierCode values per group:

val df = transformedFlattenDF.groupBy($"market", $"city", $"carrier").agg(
    count("*").alias("count"), min($"bandwidth").alias("bandwidth"),
    first($"network").alias("network"),
    concat_ws(",", collect_list($"carrierCode")).alias("carrierCode"),
    size(collect_set($"carrierCode")).as("carrier_count")  // <-- ADDED `collect_set`
  ).
  withColumn("carrierCode", split(($"carrierCode"), ",").cast("array<string>"))

If you don't want to change the existing groupBy-agg code, you can create a UDF like in the following example:

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF outside the spark-shell (spark is your SparkSession)

val codeDF = Seq(
  Array("12", "2", "12"),
  Array("5", "2", "8"),
  Array("1", "1", "3")
).toDF("carrier_code")

def distinctElemCount = udf( (a: Seq[String]) => a.toSet.size )

codeDF.withColumn("carrier_count", distinctElemCount($"carrier_code")).
  show
// +------------+-------------+
// |carrier_code|carrier_count|
// +------------+-------------+
// | [12, 2, 12]|            2|
// |   [5, 2, 8]|            3|
// |   [1, 1, 3]|            2|
// +------------+-------------+
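
A UDF-free alternative worth noting: on Spark 2.4+ the built-in array_distinct function removes duplicates from an array column directly, so the per-row distinct count is just size(array_distinct(...)). A minimal sketch, assuming df already carries the array<string> carrierCode column from the groupBy-agg step above:

import org.apache.spark.sql.functions._

// size(array_distinct(...)) counts distinct entries per row, no UDF needed
val withCount = df.withColumn("Carrier Count", size(array_distinct(col("carrierCode"))))
withCount.show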

2 Comments

Hi, thank you for your solution! Your second example works, but I'm afraid the first one doesn't, because this data is originally an array of arrays: I make it into a string, concatenate it with ",", and then concatenate again with "," (I didn't show the first concatenation because I wanted to keep the question simple). Knowing this, is it possible to create something like your first solution?
Not sure I understand your described data structure. I would suggest that you assemble a separate question with sample data that better reflects your actual data structure, along with specific aggregation requirement.

Without a UDF, converting to an RDD and back to a DataFrame, for posterity:

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF (spark is your SparkSession)

val df = sc.parallelize(Seq(
         ("A", 2, 100, 2), ("F", 7, 100, 1), ("B", 10, 100, 100)
         )).toDF("c1", "c2", "c3", "c4")

// pair each key with the list of its values, then count distinct values per row
val x = df.select("c1", "c2", "c3", "c4").rdd.map(r => (r.get(0), List(r.get(1), r.get(2), r.get(3))))
val y = x.map { case (k, vL) => (k, vL.toSet.size) }
y.collect
// Manipulate back to your DF, via conversion, join, what not.

Returns:

res15: Array[(Any, Int)] = Array((A,2), (F,3), (B,2))

The accepted solution above is better, as stated; this one is kept more for posterity.
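
A minimal sketch of the "manipulate back" step, under the assumption that you want the counts joined back onto df; the key is stringified so toDF has an encoder, and the column names here are illustrative:

// Convert the (key, count) pairs back to a DataFrame and join on the key column.
val countsDF = y.map { case (k, n) => (k.toString, n) }.toDF("c1", "distinct_count")
val joined = df.join(countsDF, Seq("c1"))
joined.show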



You can use a UDF and do it like this:

//Input
df.show
+-----------+
|CarrierCode|
+-----------+
|1:[12,2,12]|
|  2:[5,2,8]|
|  3:[1,1,3]|
+-----------+
//udf: split the "key:[values]" string, strip the brackets, count distinct values
val countUDF = udf { (str: String) =>
  val Array(key, values) = str.split(":")
  // without stripping the brackets, "[12" and "12]" would be counted as distinct values
  key + ":" + values.stripPrefix("[").stripSuffix("]").split(",").distinct.length.toString
}

df.withColumn("Carrier Count", countUDF(col("CarrierCode"))).show

//Sample Output:
+-----------+-------------+
|CarrierCode|Carrier Count|
+-----------+-------------+
|1:[12,2,12]|          1:2|
|  2:[5,2,8]|          2:3|
|  3:[1,1,3]|          3:2|
+-----------+-------------+
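
If you only want the numeric count rather than a "key:count" string, a small variation of the same idea works; this is a sketch under the same assumed "key:[v1,v2,...]" input format:

val countOnlyUDF = udf { (str: String) =>
  // take the part after ":", drop the brackets, count distinct entries
  str.split(":")(1).stripPrefix("[").stripSuffix("]").split(",").distinct.length
}

df.withColumn("Carrier Count", countOnlyUDF(col("CarrierCode"))).show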

1 Comment

If you provide the schema and sample input for your dataframe, then I can help you better.
