3

I'm working on a service that needs to make some stream processing for products.

Given a Company we can use getProducts(Company company) to get List<Product>.

The next thing I'd like to do is to filter that list. For each product I make a query to a DB to check if I already processed the given product.

So we have something like:

        companies
                .stream()
                .flatMap(this::getProducts)
                .filter(this::alreadyProcessed)

The next thing I'd like to do is to map each Product to CompleteableFuture that will handle the rest of the processing, asynchronously.

Now for the problem:

It's not really a stream processing in the sense that the map() to a CompletableFuture won't happen until the filter() operation is done.

I think I can:

  1. use parallelStream, but that doesn't really solve the problem
  2. I can instead of the above code, map a Product to a CompletableFuture<Optional<Product>> but this is quite annoying as the downstream would have to deal with a lot of empty Optional's

What is the appropriate way to write this kind of stream processing in a way that won't block the chain of operations?

5
  • 1
    There might be a third option, executing a single SQL query to get all processed product of the given company Commented Aug 21, 2021 at 9:43
  • 3
    If you want to write a "chain of operations" with support for asynchronous behaviour, you should switch to more reactive libraries like reactor, rxJava or Kotlin Coroutines? Commented Aug 21, 2021 at 10:06
  • @AlexRudenko, yeah, my bad. It's supposed to be flatMap Commented Aug 21, 2021 at 10:08
  • 1
    what if you do : .map(p -> { CompletableFuture<CompletableFuture<Product>> nested = CompletableFuture.supplyAsync(() -> { if(isAlreadyProcessed(p)){ return CompletableFuture.failedFuture(new RuntimeException()); } return CompletableFuture.completedFuture(p); }); return nested.thenCompose(Function.identity()); }) this way all subsequent code has to deal with CompletableFuture<Product>, of course some of those being failed. Commented Aug 22, 2021 at 2:46
  • Thanks @Eugene, I'll try this approach! Commented Aug 22, 2021 at 7:51

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.