I am trying to solve a problem using tail recursion. The use case is:

I have a list of folders. Each folder has a list of files, and each file has several records. I want to apply a transformation to the records and write them to Kinesis in batches.

val listOfFolders = Folder1(File1(RF11, RF12, RF13), File2(RF21,RF22))

I want to write, say, two records at a time to Kinesis. So far I have tried:

listOfFolders.map { folder =>
    val files = fetchAllFilesFromFolder(folder)
    if (files.nonEmpty) {
      sendBatch(files, Seq.empty[(ByteBuffer, String)], 2)
    } else {
      logger.info(s"No files are present in folder")
    }
  }

  @scala.annotation.tailrec
  def sendBatch(
    files: Seq[Files],
    buffer: Seq[(ByteBuffer, String)],
    numberOfRecordsToSend: Int
  ): Unit =
    files match {
      case Nil => {
        if (buffer.nonEmpty) {
          sendToKinesis(streamName, buffer) map { putDataResult =>
            val putDataList = putDataResult.getRecords.asScala.toList
            logger.info(
              s"Successfully Sent"
            )
          }
        } else {
          logger.info(s"Successfully sent")
        }
      }
      case head :: tail => {
        val fileData = readFileData()
        val byteData: Seq[(ByteBuffer, String)] = transformDataAndConvertToByteBuffer(fileData)

        val currentBatch = buffer ++ byteData
        if (currentBatch.size >= numberOfRecordsToSend) {
          sendToKinesis(streamName, buffer)  map { putRecordRes =>
            val putDataList = putRecordRes.getRecords.asScala.toList
            logger.info(
              s"Sent successfully" 
            )
          }
          sendBatch(tail, Seq.empty[(ByteBuffer, String)], 2)
        } else {
          sendBatch(tail, currentBatch, 2)
        }
      }
    }

sendToKinesis uses KCL putRecords.

The problems with the above code are:

  • It reads all the data from one file at once. If a file has 5 records, it sends all 5 records to Kinesis even though the batch size is 2.

  • I can't call the tail-recursive method from map.

  • Batches must also span file boundaries: if File1 has 3 records, it should send RF11 and RF12 together, then RF13 and RF21 together, and finally RF22.

I do not want to use any var in my code. Can this be solved using tail recursion?

1 Answer

You have two subproblems here:

  1. How to send batches of fixed size

@scala.annotation.tailrec
def sendBatch(file: Option[File], buffer: Seq[(ByteBuffer, String)], numbersOfRecrodsToSend: Int): Seq[(ByteBuffer, String)] = {
  if (buffer.length < numbersOfRecrodsToSend) {
    // case 1: too few records to be sent 
    file match {
      // case 1.1: file was not yet read
      case Some(f) => sendBatch(None, buffer ++ getByteData(f), numbersOfRecrodsToSend)
      // case 1.2: too few records, file was already read, return leftover records
      case None => buffer
    }
  } else {
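    // case 2: the buffer holds at least numbersOfRecrodsToSend records, so send one full batch to Kinesis
    val (toSend, newBuffer) = buffer.splitAt(numbersOfRecrodsToSend)
    // note: the result of sendToKinesis is ignored here; inspect it if you need to handle failures
    sendToKinesis(streamName, toSend)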
    sendBatch(file, newBuffer, numbersOfRecrodsToSend)
  }
}

  2. How to iterate over the list and send batches of fixed size

// start with an empty buffer of records and, for each folder,
// add its files' records to the buffer and send as many full batches as you can;
// the leftover records are passed on to the next iteration, across both files and folders
val partial = listOfFolders.foldLeft(Seq.empty[(ByteBuffer, String)]) { case (acc, folder) =>
  fetchAllFilesFromFolder(folder).foldLeft(acc) { case (acc2, file) => 
    sendBatch(Some(file), acc2, numbersOfRecrodsToSend)
  }
}

// if any records are left over, send them too
if (partial.nonEmpty) {
  sendToKinesis(streamName, partial)
}
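
To see the two pieces working together, here is a minimal self-contained sketch of the same idea. It is only an illustration under some assumptions: `File`, `Folder`, `BatchDemo` and the `send` callback are hypothetical in-memory stand-ins for your real types, file reading and `sendToKinesis`, and records are plain strings instead of `(ByteBuffer, String)` pairs.

```scala
import scala.annotation.tailrec

// Hypothetical stand-ins for the real folder/file types in the question.
final case class File(records: Seq[String])
final case class Folder(files: Seq[File])

object BatchDemo {
  // Sends as many full batches as possible and returns the leftover records.
  // `send` stands in for sendToKinesis (assumption: it takes one batch).
  @tailrec
  def sendBatch(
    file: Option[File],
    buffer: Seq[String],
    batchSize: Int,
    send: Seq[String] => Unit
  ): Seq[String] =
    if (buffer.length < batchSize) {
      file match {
        // the file has not been read yet: append its records and retry
        case Some(f) => sendBatch(None, buffer ++ f.records, batchSize, send)
        // the file was already read: hand the leftover to the caller
        case None    => buffer
      }
    } else {
      val (toSend, rest) = buffer.splitAt(batchSize)
      send(toSend)
      sendBatch(file, rest, batchSize, send)
    }

  // Threads the leftover buffer through every file of every folder.
  def run(folders: Seq[Folder], batchSize: Int)(send: Seq[String] => Unit): Unit = {
    val leftover = folders.foldLeft(Seq.empty[String]) { (acc, folder) =>
      folder.files.foldLeft(acc) { (acc2, file) =>
        sendBatch(Some(file), acc2, batchSize, send)
      }
    }
    // flush the final partial batch, if any
    if (leftover.nonEmpty) send(leftover)
  }
}
```

Calling `BatchDemo.run(folders, 2)(println)` with the folder from the question (File1 holding RF11, RF12, RF13 and File2 holding RF21, RF22) sends the batches (RF11, RF12), then (RF13, RF21), then (RF22), which is the behaviour asked for.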

Hopefully you get the idea.
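
If tail recursion is not a hard requirement, a different technique also avoids var: flatten everything into one lazy Iterator of records and let `grouped` cut it into batches. The sketch below is an assumption-laden illustration, not the approach above: `RecordFile`, `RecordFolder`, `GroupedDemo` and `send` are hypothetical stand-ins, and in real code the `_.records.iterator` step would be your file read plus transformation.

```scala
// Hypothetical stand-ins for the real folder/file types.
final case class RecordFile(records: Seq[String])
final case class RecordFolder(files: Seq[RecordFile])

object GroupedDemo {
  // `send` stands in for sendToKinesis (assumption: it takes one batch).
  def sendAll(folders: Seq[RecordFolder], batchSize: Int)(send: Seq[String] => Unit): Unit =
    folders.iterator
      .flatMap(_.files.iterator)    // files are visited lazily, one at a time
      .flatMap(_.records.iterator)  // one continuous stream of records across files
      .grouped(batchSize)           // fixed-size batches; the last one may be smaller
      .foreach(send)
}
```

Because the iterator is lazy, a file's records are only produced when the batching reaches them, and batches span file and folder boundaries without any explicit leftover buffer.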

1 Comment

Thank You. Really helped a lot