4

I have reading records from a kafka source to mydataframe spark dataframe. I want to pick some column from the row and do some operation. So to check if I am getting the correct index, I tried to print the index in the statement println(row.getFieldIndex(pathtoDesiredColumnFromSchema)) as shown below:

val pathtoDesiredColumnFromSchema = "data.root.column1.column2.field"
val myQuery = mydataframe.writeStream.foreach(new ForeachWriter[Row]() {

  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(row: Row): Unit = {
    println(row.getFieldIndex(pathtoDesiredColumnFromSchema))
  }

  override def close(errorOrNull: Throwable): Unit = {}
}).outputMode("append").start()

But the above code says that row has only one name as data, and there is no column name data.root.column1.column2.field.

What is the correct way to get columns values from the spark sql row by name paths?

4
  • 1
    Did you try row.getAs("columnName")? Commented Aug 1, 2018 at 17:37
  • @ManojKumarDhakd do you mean like row.getAs("data.root.column1.column2.field")? or just row.getAs("field")? Commented Aug 1, 2018 at 18:19
  • row.getAs("field") Commented Aug 1, 2018 at 18:22
  • row.getAs[String]("column_name") should get you (a casted to scala String version of) the value Commented Oct 8, 2021 at 14:19

2 Answers 2

4

You may use chain of getAs invocations for struct types, for example:

val df = spark.range(1,5).toDF.withColumn("time", current_timestamp())
.union(spark.range(5,10).toDF.withColumn("time", current_timestamp()))
.groupBy(window($"time", "1 millisecond")).count


df.printSchema
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)

df.take(1).head
          .getAs[org.apache.spark.sql.Row]("window")
          .getAs[java.sql.Timestamp]("start")

Hope it helps!

Sign up to request clarification or add additional context in comments.

Comments

1

If all you want to do is to print the field of a DataFrame you can use

mydataframe.select(pathtoDesiredColumnFromSchema).foreach(println(_.get(0)))

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.