import org.apache.spark.sql._
// Assumes a spark-shell session, where `spark` (SparkSession) and `sc`
// (SparkContext) are already in scope.
import spark.implicits._

val list = List(("K1", 10, 20, 10, 20, 50), ("K2", 20, 30, 20, 10, 60))
val yourDF = sc.parallelize(list).toDF("Key", "Today", "MTD", "QTD", "HTD", "YTD")
// yourDF.show()
// +---+-----+---+---+---+---+
// |Key|Today|MTD|QTD|HTD|YTD|
// +---+-----+---+---+---+---+
// | K1|   10| 20| 10| 20| 50|
// | K2|   20| 30| 20| 10| 60|
// +---+-----+---+---+---+---+
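
// Unpivot the wide table into long form: each input row becomes five
// (Key, PRD, Amt) rows, one per period column.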
val newDataFrame = yourDF
  .rdd
  .flatMap(row => {
    val key      = row.getString(0)
    val todayAmt = row.getInt(1)
    val mtdAmt   = row.getInt(2)
    val qtdAmt   = row.getInt(3)
    val htdAmt   = row.getInt(4)
    val ytdAmt   = row.getInt(5)
    List(
      (key, "today", todayAmt),
      (key, "MTD", mtdAmt),
      (key, "QTD", qtdAmt),
      (key, "HTD", htdAmt),
      (key, "YTD", ytdAmt)
    )
  })
  .toDF("Key", "PRD", "Amt")
// newDataFrame.show()
// +---+-----+---+
// |Key|  PRD|Amt|
// +---+-----+---+
// | K1|today| 10|
// | K1|  MTD| 20|
// | K1|  QTD| 10|
// | K1|  HTD| 20|
// | K1|  YTD| 50|
// | K2|today| 20|
// | K2|  MTD| 30|
// | K2|  QTD| 20|
// | K2|  HTD| 10|
// | K2|  YTD| 60|
// +---+-----+---+
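
// Alternative sketch (assumes Spark 2.0+, where the SQL `stack` generator is
// available): the same unpivot can stay in the DataFrame API, avoiding the
// RDD round-trip so Catalyst can still optimize the plan.
val stackedDF = yourDF.selectExpr(
  "Key",
  "stack(5, 'today', Today, 'MTD', MTD, 'QTD', QTD, 'HTD', HTD, 'YTD', YTD) as (PRD, Amt)"
)
// stackedDF.show() yields the same (Key, PRD, Amt) rows as newDataFrame above.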