I am reading records from a Kafka source into a Spark DataFrame called mydataframe.
I want to pick some columns from each row and perform some operations on them. To check whether I am getting the correct index, I tried to print it with println(row.fieldIndex(pathtoDesiredColumnFromSchema)), as shown below:
val pathtoDesiredColumnFromSchema = "data.root.column1.column2.field"

val myQuery = mydataframe.writeStream
  .foreach(new ForeachWriter[Row]() {
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(row: Row): Unit = {
      println(row.fieldIndex(pathtoDesiredColumnFromSchema))
    }

    override def close(errorOrNull: Throwable): Unit = {}
  })
  .outputMode("append")
  .start()
But the above code throws an error saying that the row has only one field, named data, and that there is no field named data.root.column1.column2.field.
What is the correct way to get column values from a Spark SQL Row by a name path?
row.getAs[String]("column_name") should get you the value (cast to a Scala String). Note that getAs, like fieldIndex, matches only top-level field names; it does not resolve dotted paths into nested structs, which is why your call fails when the row's single top-level field is data.
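To reach a nested field you have to descend one struct level at a time, reading each intermediate struct as a Row. A minimal sketch of that idea, assuming each segment of the dotted path names a struct field at its level (getNested is a hypothetical helper, not part of the Spark API):

```scala
import org.apache.spark.sql.Row

// Walk a dotted path such as "data.root.column1.column2.field" by
// resolving one nesting level per segment. Each intermediate value
// must itself be a Row (i.e. a struct column).
def getNested(row: Row, path: String): Any =
  path.split("\\.").foldLeft[Any](row) {
    case (r: Row, name) => r.getAs[Any](name)
    case (other, name) =>
      sys.error(s"Cannot descend into '$name': parent value $other is not a struct")
  }
```

Inside process(row: Row) you could then write val v = getNested(row, pathtoDesiredColumnFromSchema) instead of calling fieldIndex with the full dotted path.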