I'm working on a service that needs to do some stream processing for products.
Given a Company we can use getProducts(Company company) to get List<Product>.
The next thing I'd like to do is to filter that list. For each product I make a query to a DB to check if I already processed the given product.
So we have something like:
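companies
    .stream()
    // getProducts returns a List<Product>, so it has to be turned into a Stream for flatMap
    .flatMap(company -> getProducts(company).stream())
    // keep only the products that have not been processed yet (blocking DB query per product)
    .filter(product -> !alreadyProcessed(product))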
The next thing I'd like to do is to map each Product to a CompletableFuture that will handle the rest of the processing asynchronously.
Now for the problem:
It's not really stream processing, in the sense that the map() to a CompletableFuture won't happen until the blocking filter() operation is done.
I think I can:
- use parallelStream, but that doesn't really solve the problem
- instead of the above code, map a Product to a CompletableFuture<Optional<Product>>, but this is quite annoying, as the downstream would have to deal with a lot of empty Optionals
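To make the second option concrete, here is a minimal sketch of what I mean by mapping to a CompletableFuture<Optional<Product>>. The Product class, the in-memory PROCESSED_IDS set standing in for the DB query, the pool size and the "processed:" string standing in for the rest of the processing are all placeholders I made up for illustration, not the real code.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncFilterSketch {

    // Hypothetical stand-in for the real Product type.
    static final class Product {
        final String id;
        Product(String id) { this.id = id; }
    }

    // Hypothetical stand-in for the DB; the real lookup would block on I/O.
    static final Set<String> PROCESSED_IDS = Set.of("p2");

    static boolean alreadyProcessed(Product p) {
        return PROCESSED_IDS.contains(p.id);
    }

    // Runs the check on the executor; an empty Optional means "skip this product".
    static CompletableFuture<Optional<Product>> checkAsync(Product p, ExecutorService executor) {
        return CompletableFuture.supplyAsync(
                () -> alreadyProcessed(p) ? Optional.<Product>empty() : Optional.of(p),
                executor);
    }

    static List<String> processAll(List<Product> products) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Create every future first so the checks can overlap.
            List<CompletableFuture<Optional<String>>> futures = products.stream()
                    .map(p -> checkAsync(p, executor)
                            // Placeholder for "the rest of the processing".
                            .thenApply(opt -> opt.map(prod -> "processed:" + prod.id)))
                    .collect(Collectors.toList());

            // Join only after all futures exist, then drop the empty Optionals.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .flatMap(Optional::stream)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Product> products =
                List.of(new Product("p1"), new Product("p2"), new Product("p3"));
        System.out.println(processAll(products));
    }
}
```

Every downstream stage has to go through opt.map(...) and the empties still have to be dropped at the end, which is the part I find annoying.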
What is the appropriate way to write this kind of stream processing in a way that won't block the chain of operations?
reactor, rxJava or Kotlin Coroutines?
flatMap
.map(p -> {
    CompletableFuture<CompletableFuture<Product>> nested = CompletableFuture.supplyAsync(() -> {
        if (isAlreadyProcessed(p)) {
            return CompletableFuture.failedFuture(new RuntimeException());
        }
        return CompletableFuture.completedFuture(p);
    });
    return nested.thenCompose(Function.identity());
})
this way all subsequent code has to deal with CompletableFuture<Product>, of course some of those being failed.
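For what it's worth, a rough sketch of how the downstream side of the failed-future idea might look. This is a variation, not the commenter's exact code: it throws inside supplyAsync instead of nesting futures and calling thenCompose, which also completes the future exceptionally. The Product class, the PROCESSED_IDS set and the "processed:" string are hypothetical placeholders, and the default common pool is used only to keep the example short.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FailedFutureSketch {

    // Hypothetical stand-in for the real Product type.
    static final class Product {
        final String id;
        Product(String id) { this.id = id; }
    }

    // Hypothetical stand-in for the DB lookup.
    static final Set<String> PROCESSED_IDS = Set.of("p2");

    static boolean isAlreadyProcessed(Product p) {
        return PROCESSED_IDS.contains(p.id);
    }

    // Throwing inside supplyAsync completes the returned future exceptionally.
    static CompletableFuture<Product> checkAsync(Product p) {
        return CompletableFuture.supplyAsync(() -> {
            if (isAlreadyProcessed(p)) {
                throw new IllegalStateException("already processed: " + p.id);
            }
            return p;
        });
    }

    static List<String> processAll(List<Product> products) {
        List<CompletableFuture<String>> futures = products.stream()
                .map(p -> checkAsync(p)
                        // Skipped automatically when the check failed.
                        .thenApply(prod -> "processed:" + prod.id)
                        // Maps any failure to null so join() does not throw.
                        // Note that this also hides genuine errors.
                        .exceptionally(err -> null))
                .collect(Collectors.toList());

        return futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Product> products =
                List.of(new Product("p1"), new Product("p2"), new Product("p3"));
        System.out.println(processAll(products));
    }
}
```

The thenApply stages never run for a failed future, so only the final step has to care about failures. The trade-off is that a dedicated exception type would probably be needed to tell "already processed" apart from a real error.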