
First, I want to say that I am very new to Scala and I am having trouble with basic format conversions... I hope to get better at functional programming soon, so sorry if this is a dumb question.

Using Spark SQL, I run a query and store the results in a variable called "probesGroupby":

  val probesGroupby = sqlContext.sql("SELECT id_counter as id_counter, co_mac as co_mac, ts_timestamp as ts_timestamp, max(qt_rssi) as qt_rssi, count(*) as qt_tracks " +
                                     " FROM probes GROUP BY id_counter, co_mac, ts_timestamp")

Everything works up to this point. Next, I need to write the data into an InfluxDB database, and the API requires this format:

val probeRequest = Series("probeRequest",
  Array("id_counter", "co_mac", "time", "qt_rssi", "qt_tracks"),
  Array(
    // one inner Array per Row returned by the query
    Array(row(0), row(1), row(2), row(3), row(4)),
    Array(row(0), row(1), row(2), row(3), row(4)),
    Array(row(0), row(1), row(2), row(3), row(4)),
    ...
  )
)
assert(None == client.writeSeries(Array(probeRequest)))

How can I build the "probeRequest" variable in this format, with one Array of values for each Row returned by the query? I have tried a few things, but nothing seems to work :(

Thank you in advance,
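A minimal plain-Scala sketch of the target shape, assuming the query rows are already available as `Seq[Any]` values (which is what Spark's `Row.toSeq` returns). `SeriesSketch`, `PayloadSketch`, and the sample values are hypothetical stand-ins for illustration only; they are not the InfluxDB client's `Series` class or real data. The point is just that the third argument is one `Array[Any]` per row, in column order:

```scala
// HYPOTHETICAL stand-in for the three-part structure in the question:
// a series name, the column names, and one Array[Any] of values per row.
// This is NOT the InfluxDB client's Series class.
final case class SeriesSketch(name: String, columns: Array[String], points: Array[Array[Any]])

object PayloadSketch {
  val columns: Array[String] = Array("id_counter", "co_mac", "time", "qt_rssi", "qt_tracks")

  // Each element models one result row as a Seq[Any] (what Row.toSeq returns).
  // The sample values are made up for illustration.
  val rows: Seq[Seq[Any]] = Seq(
    Seq(1, "aa:bb:cc:dd:ee:01", 1420070400000L, -42, 3L),
    Seq(2, "aa:bb:cc:dd:ee:02", 1420070460000L, -67, 1L)
  )

  // One inner Array per row, with values in the same order as `columns`.
  def toPoints(rows: Seq[Seq[Any]]): Array[Array[Any]] =
    rows.map(_.toArray).toArray

  val probeRequest: SeriesSketch = SeriesSketch("probeRequest", columns, toPoints(rows))
}
```

With real Spark rows, the same `toPoints` step would be applied to the collected rows (or inside `rdd.map`), and the resulting `Array[Array[Any]]` passed to the client's own `Series` constructor.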

2 Answers


You'll still have to wrap the result in a Series, but otherwise it is as simple as this:

probesGroupby.rdd.map(_.toSeq.toArray).collect()

Or, if you prefer a more explicit approach, you can use pattern matching:

import org.apache.spark.sql.Row

probesGroupby.rdd.map { case Row(idCounter, coMac, time, qtRssi, qtTracks) =>
  Array(idCounter, coMac, time, qtRssi, qtTracks)
}.collect()
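Both approaches can be compared outside Spark with a small model in which each row is a `Seq[Any]` (what `Row.toSeq` returns) and `case Seq(...)` plays the role of `case Row(...)`. The object and method names are hypothetical. One design point worth noting: a bare `case Row(a, b, c, d, e)` throws a `MatchError` if a row has a different number of columns, so this sketch adds an explicit fallback case with a clearer error message:

```scala
// Plain-Scala model of the two transformations above. Rows are modeled as
// Seq[Any]; in Spark the same lambdas would go inside probesGroupby.rdd.map(...)
// before collect(). Names here are hypothetical.
object RowConversionSketch {
  type RowLike = Seq[Any]

  // Approach 1: generic, works for any number of columns.
  def viaToArray(rows: Seq[RowLike]): Array[Array[Any]] =
    rows.map(_.toArray).toArray

  // Approach 2: explicit pattern match on exactly five columns.
  // The fallback case replaces a bare MatchError with a descriptive exception.
  def viaPatternMatch(rows: Seq[RowLike]): Array[Array[Any]] =
    rows.map {
      case Seq(idCounter, coMac, time, qtRssi, qtTracks) =>
        Array[Any](idCounter, coMac, time, qtRssi, qtTracks)
      case other =>
        throw new IllegalArgumentException(s"expected 5 columns, got ${other.size}")
    }.toArray
}
```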

3 Comments

Thank you very much. Could you please tell me what the difference is between calling toSeq.toArray before and after collect()?
Well, the difference is fundamental. If you collect first and then map, as in the accepted answer, everything is processed sequentially on the driver. If you map first and collect afterwards, the arrays are created in parallel on the worker nodes and then sent to the driver.
Thank you very much for the explanation. I should work more on functional programming. Thanks
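To make the ordering question concrete, here is a toy model, not Spark itself: a dataset is a `Seq` of partitions, `Future`s stand in for executors, and flattening stands in for `collect()`. All names are hypothetical. Both orders yield the same arrays in the same order; what differs is whether the per-row conversion runs on one thread after gathering, or concurrently per partition before gathering:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Toy model only, NOT Spark: a "dataset" is a Seq of partitions,
// and each partition is a Seq of rows (each row a Seq[Any]).
object CollectOrderSketch {
  type RowLike = Seq[Any]

  // collect-then-map: gather everything in one place (the "driver") first,
  // then convert row by row on that single thread.
  def collectThenMap(partitions: Seq[Seq[RowLike]]): Array[Array[Any]] = {
    val collected: Seq[RowLike] = partitions.flatten // "collect"
    collected.map(_.toArray).toArray                 // sequential "map"
  }

  // map-then-collect: convert each partition concurrently (standing in for
  // executors), then gather only the finished arrays.
  def mapThenCollect(partitions: Seq[Seq[RowLike]]): Array[Array[Any]] = {
    // Future.traverse keeps the partitions in their original order.
    val perPartition: Future[Seq[Seq[Array[Any]]]] =
      Future.traverse(partitions)(p => Future(p.map(_.toArray)))
    Await.result(perPartition, 10.seconds).flatten.toArray // "collect"
  }
}
```

In real Spark the gathered result has to fit in driver memory either way; mapping first only moves the conversion work onto the workers.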

First you collect the data from the RDD into an Array on the driver, and then you transform each individual Row. If you know the column types, you can use getInt, getLong, etc. instead of plain get:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val probesGroupby: RDD[Row] = ...

// Collect the rows to the driver, then turn each Row into an Array[Any]
val payload: Array[Array[Any]] = probesGroupby.collect().map { row =>
  val idCounter = row.get(0)
  val coMac = row.get(1)
  val time = row.get(2)
  val qtRssi = row.get(3)
  val qtTracks = row.get(4)
  Array[Any](idCounter, coMac, time, qtRssi, qtTracks)
  // or just: row.toSeq.toArray
}

val probeRequest = Series("probeRequest",
  Array("id_counter", "co_mac", "time", "qt_rssi", "qt_tracks"),
  payload
)
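If the column types are known, a typed read fails fast on unexpected data instead of passing `Any` values along. Below is a plain-Scala analogue of `getInt`/`getString`/`getLong`, using type patterns on a `Seq[Any]` row. The column types (Int, String, Long, Int, Long) are an assumption taken from the asker's later comment (`getInt(0)`, `row(1).toString`, `getLong(2)`, `getInt(3)`, `getLong(4)`); `Probe` and `TypedRowSketch` are hypothetical names:

```scala
// HYPOTHETICAL typed record; column types assumed from the asker's comment.
final case class Probe(idCounter: Int, coMac: String, time: Long, qtRssi: Int, qtTracks: Long) {
  // Back to the untyped point format the InfluxDB payload needs.
  def toPoint: Array[Any] = Array[Any](idCounter, coMac, time, qtRssi, qtTracks)
}

object TypedRowSketch {
  // Type patterns check each column's runtime class, so a mistyped or
  // missing value becomes a Left instead of a ClassCastException later on.
  def parse(row: Seq[Any]): Either[String, Probe] = row match {
    case Seq(id: Int, mac: String, ts: Long, rssi: Int, tracks: Long) =>
      Right(Probe(id, mac, ts, rssi, tracks))
    case other =>
      Left(s"unexpected row shape: $other")
  }
}
```

With an actual Spark `Row`, the typed getters play the same role, and `row.isNullAt(i)` can be checked first for nullable columns.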

1 Comment

Thank you very much. In the end I did it like this:

import scala.collection.mutable.ArrayBuffer

val influxData = new ArrayBuffer[Array[Any]]()
probesGroupbySecond.collect().foreach(row =>
  // id_counter, co_mac, ts_timestamp(Long), qt_rssi, qt_tracks
  influxData += Array(row.getInt(0), row(1).toString, row.getLong(2), row.getInt(3), row.getLong(4)))

But your way is much cleaner! =)
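The comment's `ArrayBuffer` + `foreach` version and the answer's `map` version produce the same payload. A plain-Scala sketch, with rows modeled as `Seq[Any]` and hypothetical object/method names, shows the equivalence; `map` simply builds the result directly instead of mutating a buffer:

```scala
import scala.collection.mutable.ArrayBuffer

// Rows are modeled as Seq[Any], a stand-in for collected Spark Rows.
object BufferVsMapSketch {
  // Shared per-row conversion, mirroring row(1).toString from the comment.
  private def toPoint(row: Seq[Any]): Array[Any] =
    Array[Any](row(0), row(1).toString, row(2), row(3), row(4))

  // Style from the comment: append to a mutable buffer inside foreach.
  def withBuffer(rows: Seq[Seq[Any]]): Array[Array[Any]] = {
    val influxData = new ArrayBuffer[Array[Any]]()
    rows.foreach(row => influxData += toPoint(row))
    influxData.toArray
  }

  // Style from the answer: map each row to an Array, no mutable state.
  def withMap(rows: Seq[Seq[Any]]): Array[Array[Any]] =
    rows.map(toPoint).toArray
}
```

In both styles `collect()` happens first, so the conversion itself runs on the driver.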
