I've written following snipped to mimic my problem:
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val executionContext: ExecutionContext = actorSystem.dispatcher
Source(1 to 1000000)
.mapAsync(parallelism = 100) { _ =>
Future {
new Array[Byte](100 * 1024)
}
}
.runWith(Sink.ignore)
.onComplete(_ => actorSystem.terminate())
Based on my understanding, the mapAsync should have 100 futures in-flight at max. It will have an internal buffer for ordering but since all futures are taking same time here, it shouldn't have head of the line blocking problem.
In any case, once downstream consumes the payload, it should be garbage collected. But for some reason, the heap keeps grown indefinitely and I get an OOM. The problem gets resolved if I use mapAsyncUnordered or introduce an async boundary after mapAsync.
Can anyone explain what's going on here?