1

I've written following snipped to mimic my problem:

    
    implicit val actorSystem: ActorSystem = ActorSystem()
    implicit val executionContext: ExecutionContext = actorSystem.dispatcher

    Source(1 to 1000000)
      .mapAsync(parallelism = 100) { _ =>
        Future {
          new Array[Byte](100 * 1024)
        }
      }
      .runWith(Sink.ignore)
      .onComplete(_ => actorSystem.terminate())

Based on my understanding, the mapAsync should have 100 futures in-flight at max. It will have an internal buffer for ordering but since all futures are taking same time here, it shouldn't have head of the line blocking problem.

In any case, once downstream consumes the payload, it should be garbage collected. But for some reason, the heap keeps grown indefinitely and I get an OOM. The problem gets resolved if I use mapAsyncUnordered or introduce an async boundary after mapAsync.

Can anyone explain what's going on here?

1
  • 1
    I created github.com/apache/pekko/issues/2412 - someone may be able to debug it. If possible, could you add version details to the GitHub issue? Commented Oct 31 at 21:28

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.