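Since this requires a more complex transformation than the built-in column functions handle easily, I've used the typed Dataset API (map). This might not be as performant as a pure DataFrame solution, but it produces the result you want.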
Setup
First, create some sample data that mimics yours.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

val arrayData = Seq(
  Row(1, List(1, 2, 3, 4, 5, 6, 7)),
  Row(2, List(1, 2, 3, 4)),
  Row(3, List(1, 2)),
  Row(4, List(1, 2, 3))
)
val arraySchema = new StructType()
  .add("id", IntegerType)
  .add("values", ArrayType(IntegerType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
/*
+---+---------------------+
|id |values               |
+---+---------------------+
|1  |[1, 2, 3, 4, 5, 6, 7]|
|2  |[1, 2, 3, 4]         |
|3  |[1, 2]               |
|4  |[1, 2, 3]            |
+---+---------------------+
*/
Transformations
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Encoder for the custom result type produced by the map below
implicit val encoder = ExpressionEncoder[(Int, Array[Array[Int]])]
// A sliding window of size 3 and step 3 yields non-overlapping chunks of up to 3 elements.
// This can be made into a generic function for a window of size k.
val df2 = df.map(r => {
  val id = r.getInt(0)
  val a = r.getSeq[Int](1).toArray
  val arrays = a.sliding(3, 3).toArray
  (id, arrays)
})
/*
+---+---------------------------------------------------------------+
|_1 |_2                                                             |
+---+---------------------------------------------------------------+
|1  |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
|2  |[WrappedArray(1, 2, 3), WrappedArray(4)]                       |
|3  |[WrappedArray(1, 2)]                                           |
|4  |[WrappedArray(1, 2, 3)]                                        |
+---+---------------------------------------------------------------+
*/
val df3 = df2
  .withColumnRenamed("_1", "id")
  .withColumnRenamed("_2", "values")
/*
+---+---------------------------------------------------------------+
|id |values                                                         |
+---+---------------------------------------------------------------+
|1  |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
|2  |[WrappedArray(1, 2, 3), WrappedArray(4)]                       |
|3  |[WrappedArray(1, 2)]                                           |
|4  |[WrappedArray(1, 2, 3)]                                        |
+---+---------------------------------------------------------------+
*/
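The comment in the map step notes that the window can be generalized to a size k. A minimal sketch of that idea in plain Scala follows; the helper name chunk is hypothetical and not part of Spark. It uses grouped(k), which for the sample data yields the same non-overlapping chunks as sliding(k, k).

```scala
// Hypothetical helper (not a Spark API): split a sequence into
// consecutive, non-overlapping chunks of at most k elements.
def chunk(values: Seq[Int], k: Int): Array[Array[Int]] = {
  require(k > 0, "chunk size must be positive")
  values.grouped(k).map(_.toArray).toArray
}

// Seven elements with k = 3 give two full chunks and one remainder chunk.
val chunks = chunk(Seq(1, 2, 3, 4, 5, 6, 7), 3)
```

Assuming the same encoder as above, the body of the map could then be reduced to (r.getInt(0), chunk(r.getSeq[Int](1), 3)).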
Use explode
Explode will create a new row for each array entry in the values column.
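import org.apache.spark.sql.functions
import spark.implicits._ // needed for the $"..." column syntax outside spark-shell

val df4 = df3.withColumn("values", functions.explode($"values"))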
/*
+---+---------+
|id |values   |
+---+---------+
|1  |[1, 2, 3]|
|1  |[4, 5, 6]|
|1  |[7]      |
|2  |[1, 2, 3]|
|2  |[4]      |
|3  |[1, 2]   |
|4  |[1, 2, 3]|
+---+---------+
*/
Limitations
This approach is not without limitations.
Primarily, it will not be as performant on larger datasets, because the function passed to map is opaque to Spark's optimizer and every row has to be deserialized into JVM objects and serialized back. However, a pure DataFrame solution might require window functions, which can also perform poorly depending on the size of the data. If you can reshape this data at the source, that is the better option.
This approach also requires explicitly defining an encoder for the more complex result type. If the data schema changes, the encoder has to be updated to match.
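If maintaining the encoder is the main concern, one possible alternative is a UDF, which lets Spark derive the result type without an explicit encoder. The sketch below is an assumption-laden illustration, not the method used above: the name chunk3, the local SparkSession, and the toDF-based sample data are all made up for the example. A UDF is still opaque to the optimizer, so it does not remove the performance limitation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, udf}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("chunk-udf").getOrCreate()
import spark.implicits._

// Same sample data as in the Setup section, built with toDF for brevity.
val df = Seq(
  (1, Seq(1, 2, 3, 4, 5, 6, 7)),
  (2, Seq(1, 2, 3, 4)),
  (3, Seq(1, 2)),
  (4, Seq(1, 2, 3))
).toDF("id", "values")

// Hypothetical UDF: split each array into chunks of at most 3 elements.
// Caveat: a null array would throw a NullPointerException here.
val chunk3 = udf((xs: Seq[Int]) => xs.grouped(3).map(_.toList).toList)

// Chunk and explode in one step; each chunk becomes its own row.
val result = df.withColumn("values", explode(chunk3(col("values"))))
```

Calling result.show(false) should display one row per chunk, in the same shape as df4 above.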