My goal is to create a DataFrame dynamically, using columns and values from an external DataFrame. This is how the DataFrame is created with the schema and data defined manually:
val columnSufix: String = "isNull"
val data = Seq(
  Row(
    details.filter(col("DAY").isNull).count(),
    details.filter(col("CHANNEL_CATEGORY").isNull).count(),
    details.filter(col("SOURCE").isNull).count(),
    details.filter(col("PLATFORM").isNull).count()
  )
)
val schema: StructType = new StructType()
  .add(s"DAY_$columnSufix", LongType)
  .add(s"CHANNEL_CATEGORY_$columnSufix", LongType)
  .add(s"SOURCE_$columnSufix", LongType)
  .add(s"PLATFORM_$columnSufix", LongType)
val testDf: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
testDf.show(false)
The output when I execute the above script is the following:
columnSufix: String = isNull
data: Seq[org.apache.spark.sql.Row] = List([0,0,0,83845])
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DAY_isNull,LongType,true),StructField(CHANNEL_CATEGORY_isNull,LongType,true),StructField(SOURCE_isNull,LongType,true),StructField(PLATFORM_isNull,LongType,true))
testDf: org.apache.spark.sql.DataFrame = [DAY_isNull: bigint, CHANNEL_CATEGORY_isNull: bigint ... 2 more fields]
So far so good. The problem is that when I use the script below, which makes the DataFrame creation more dynamic, I get an ArrayIndexOutOfBoundsException:
val cols = details.columns.toSeq.take(4)
val columnSuffix: String = "ISNULL"
val data = cols.map(column => Row(details.filter(col(column).isNull).count())).toList
val schema = StructType(cols.map(column => StructField(column + s"_$columnSuffix", LongType)))
val testDf: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
testDf.show(false)
The output when I compile the above script is similar to that of the manual DataFrame creation, except that when I run testDf.show(false), it gives me this error:
Caused by: RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, DAY_ISNULL), LongType, false) AS DAY_ISNULL#243076L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, CHANNEL_CATEGORY_ISNULL), LongType, false) AS CHANNEL_CATEGORY_ISNULL#243077L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, SOURCE_ISNULL), LongType, false) AS SOURCE_ISNULL#243078L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, PLATFORM_ISNULL), LongType, false) AS PLATFORM_ISNULL#243079L
Caused by: ArrayIndexOutOfBoundsException:
Why is this happening? My feeling is that somehow Spark is not sending the values of details.filter(col(column).isNull).count() straight to the invocation of createDataFrame.
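For reference, this is how I understand the shape of data in each case (a minimal sketch only; manualData and dynamicData are illustrative names, and the dynamic values are my guess based on the counts reported above, not verified output):

// Manual script: a single Row holding all four counts, matching the four-field schema
val manualData = Seq(Row(0L, 0L, 0L, 83845L))   // List([0,0,0,83845])

// Dynamic script: the map seems to produce one Row per column, each holding a single count
val dynamicData = cols.map(column => Row(details.filter(col(column).isNull).count())).toList
// presumably List([0], [0], [0], [83845]) — four one-field Rows, while the schema declares four fields per Row?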