I have a stream of objects that I need to process in parallel (each one is a long-running task), but at the same time I have to preserve the order of the results. I am trying to use RxJava for this and could not find a good solution.
The basic idea is to translate every object into a function that returns a future and then call get on each future in sequence. If a future completes with an exception, I need to retry it until it completes successfully.
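import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

import rx.Observable;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;

public class RxTest {

    // Simulates a long-running task that occasionally fails.
    private static CompletableFuture<String> futureFunction(int i) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (ThreadLocalRandom.current().nextInt(1, 100) < 10) {
                throw new RuntimeException("failed.. sorry");
            }
            return Thread.currentThread().getName() + " " + String.valueOf(i);
        });
    }

    public static void main(String[] args) throws InterruptedException {
        Observable.range(1, 100)
                .flatMap(i ->
                        // Wrap each item in its own Observable so that retry() re-creates the future.
                        Observable.just(i).map(RxTest::futureFunction).flatMap(Observable::from).retry()
                )
                // Print the results on a single dedicated thread.
                .observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
                .subscribe(Subscribers.create((x) -> {
                    System.out.println(Thread.currentThread().getName() + " " + x);
                }));
    }
}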
With this example I receive sequential output:
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 1
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 2
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 3
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 4
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 5
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 6
But each future is created and run only after the previous one completes, not in parallel as I expected.
I have been playing around with observeOn() and subscribeOn(), but nothing works well.
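For comparison, the basic idea described above (start every future up front, retry a failed one until it succeeds, then read the results in order) can be sketched with plain CompletableFuture, without RxJava. This is only a minimal sketch of the behaviour I am after, not an RxJava solution: the names OrderedRetrySketch, withRetry, attempt and processInOrder are made up for illustration, retries are unlimited, and the calling thread blocks while joining.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntFunction;
import java.util.function.Supplier;

public class OrderedRetrySketch {

    // Hypothetical helper: returns a future that completes only once
    // some attempt of the task has succeeded.
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> task) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(task, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> task,
                                    CompletableFuture<T> result) {
        task.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else {
                attempt(task, result); // failed: start a fresh attempt
            }
        });
    }

    // Starts tasks 1..count immediately, then collects results in submission order.
    static <T> List<T> processInOrder(int count, IntFunction<CompletableFuture<T>> task) {
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            final int n = i;
            futures.add(withRetry(() -> task.apply(n))); // already running in the background
        }
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            results.add(future.join()); // waits for this item only; later items keep running
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> results = processInOrder(20, i -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (ThreadLocalRandom.current().nextInt(1, 100) < 10) {
                throw new RuntimeException("failed.. sorry");
            }
            return Thread.currentThread().getName() + " " + i;
        }));
        results.forEach(System.out::println);
    }
}
```

Here the work is started when the futures are created, and ordering comes only from the order in which they are joined. What I am looking for is the equivalent of this in RxJava, where results are emitted in order as they become available rather than collected into a list.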