I am trying to solve a problem using tail recursion. The use case is:
I have a list of folders; each folder has a list of files, and each file has several records. I want to perform some transformation on the records and write them to Kinesis in batches.
```scala
val listOfFolders = Folder1(File1(RF11, RF12, RF13), File2(RF21, RF22))
```
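For concreteness, the example structure could be modelled with case classes like the ones below. The names `Folder`, `DataFile`, `Record` and `allRecordIds` are hypothetical stand-ins for whatever types `fetchAllFilesFromFolder` and `readFileData` actually work with. Flattening the nested lists gives the record order that any batching has to preserve.

```scala
// Hypothetical types standing in for the question's folders, files and records.
final case class Record(id: String)
final case class DataFile(name: String, records: List[Record])
final case class Folder(name: String, files: List[DataFile])

object ExampleData {
  // Folder1(File1(RF11, RF12, RF13), File2(RF21, RF22)) from the question.
  val listOfFolders: List[Folder] = List(
    Folder("Folder1", List(
      DataFile("File1", List(Record("RF11"), Record("RF12"), Record("RF13"))),
      DataFile("File2", List(Record("RF21"), Record("RF22")))
    ))
  )

  // Folder -> files -> records, keeping the original order.
  def allRecordIds(folders: List[Folder]): List[String] =
    folders.flatMap(_.files).flatMap(_.records).map(_.id)
}
```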
I want to write, say, two records at a time to Kinesis. So far I have tried:
```scala
listOfFolders.map { folder =>
  val files = fetchAllFilesFromFolder(folder)
  if (files.nonEmpty) {
    sendBatch(files, Seq.empty[(ByteBuffer, String)], 2)
  } else {
    logger.info(s"No files are present in folder")
  }
}
```
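```scala
import java.nio.ByteBuffer
import scala.jdk.CollectionConverters._ // Scala 2.13+; on 2.12 use scala.collection.JavaConverters._

@scala.annotation.tailrec
def sendBatch(
    files: Seq[Files],
    buffer: Seq[(ByteBuffer, String)],
    numberOfRecordsToSend: Int
): Unit =
  // Note: the Nil and :: patterns only match a List, so `files` must be a List.
  files match {
    case Nil =>
      // No files left: flush whatever is still buffered.
      if (buffer.nonEmpty) {
        sendToKinesis(streamName, buffer) map { putDataResult =>
          val putDataList = putDataResult.getRecords.asScala.toList
          logger.info(s"Successfully Sent")
        }
      } else {
        logger.info(s"Successfully sent")
      }
    case head :: tail =>
      // The whole file is read and transformed at once.
      val fileData = readFileData()
      val byteData: Seq[(ByteBuffer, String)] = transformDataAndConvertToByteBuffer(fileData)
      val currentBatch = buffer ++ byteData
      if (currentBatch.size >= numberOfRecordsToSend) {
        // Sends everything accumulated so far, which can exceed numberOfRecordsToSend.
        sendToKinesis(streamName, currentBatch) map { putRecordRes =>
          val putDataList = putRecordRes.getRecords.asScala.toList
          logger.info(s"Sent successfully")
        }
        sendBatch(tail, Seq.empty[(ByteBuffer, String)], numberOfRecordsToSend)
      } else {
        sendBatch(tail, currentBatch, numberOfRecordsToSend)
      }
  }
```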
`sendToKinesis` uses the Kinesis `putRecords` API (AWS SDK).
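One thing the code above builds but never inspects is `putDataList`. `putRecords` is not all-or-nothing: the response has one result entry per request record, in request order, and a failed entry carries an error code instead of a sequence number. The helper below, `PutRecordsFailures.failedRecords`, is a hypothetical sketch (not from the question) that models each entry's error code as an `Option[String]` and picks out the records to retry; with the AWS SDK for Java v1 the code would come from something like `Option(entry.getErrorCode)`.

```scala
// Hypothetical helper: select the records whose PutRecords result entry failed.
// errorCodes(i) is None for a successful entry and Some(code) for a failed one.
object PutRecordsFailures {
  def failedRecords[A](sent: Seq[A], errorCodes: Seq[Option[String]]): Seq[A] = {
    // One result entry per sent record, in the same order.
    require(sent.size == errorCodes.size, "expected one result entry per sent record")
    sent.zip(errorCodes).collect { case (record, Some(_)) => record }
  }
}
```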
The problems with the above code are:

- It reads all the data from a file at once, so if a file has 5 records, all 5 are sent to Kinesis together even though the batch size is 2.
- I can't call the tail-recursive method from `map`.

It also has to handle batches that span files: if File1 has 3 records, it should send RF11 and RF12 together, then RF13 and RF21 together, and finally RF22.
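For reference, the batching described above is what `Iterator.grouped` produces on the flattened record stream, with no explicit recursion and no `var`. This is a swapped-in technique rather than the tail-recursive approach asked about; a minimal sketch, where `readRecords` is a hypothetical stand-in for `readFileData` plus `transformDataAndConvertToByteBuffer`, and `send` for `sendToKinesis`:

```scala
object IteratorBatching {
  // Hypothetical sketch: flatten files into one lazy record stream and let
  // Iterator.grouped cut it into fixed-size batches across file boundaries.
  def sendAll[F, R](files: List[F], batchSize: Int)(readRecords: F => Seq[R])(send: Seq[R] => Unit): Unit =
    files.iterator
      .flatMap(readRecords) // a file is read when the iterator first needs one of its records
      .grouped(batchSize)   // the last group may be shorter than batchSize
      .foreach(send)        // foreach rather than map: only the side effect matters
}
```

Because the iterator is lazy, File2 is not read until the second batch needs RF21, and the final shorter batch (RF22) is still sent.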
I do not want to use any `var` in my code. Can this be solved using tail recursion?
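A tail-recursive version without `var` is possible if the accumulator carries the leftover records from one file into the next, instead of being reset after each send. The sketch below is one way to do it, under the assumption that reading one file into memory is acceptable; `TailRecBatching.sendAll`, `readRecords` and `send` are hypothetical names standing in for the question's reading/transforming helpers and `sendToKinesis`.

```scala
import scala.annotation.tailrec

object TailRecBatching {
  // Hypothetical sketch: files are read one at a time; records left over after
  // filling whole batches are carried into the next file, so batches can span files.
  def sendAll[F, R](files: List[F], batchSize: Int)(readRecords: F => Seq[R])(send: Seq[R] => Unit): Unit = {
    require(batchSize > 0, "batchSize must be positive")

    @tailrec
    def loop(remaining: List[F], buffer: Vector[R]): Unit =
      remaining match {
        case Nil =>
          // No files left: flush the final, possibly shorter, batch.
          if (buffer.nonEmpty) send(buffer)
        case file :: rest =>
          val pending = buffer ++ readRecords(file)
          // Split off as many full batches as possible and keep the remainder.
          val (full, leftover) = pending.splitAt(pending.size / batchSize * batchSize)
          full.grouped(batchSize).foreach(send)
          loop(rest, leftover)
      }

    loop(files, Vector.empty)
  }
}
```

Calling it once per folder from `foreach` (rather than `map`, since only the side effect matters) keeps batches within a folder; passing the files of all folders as one list would let a batch span folders as well.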