I have a stream of objects that I need to process in parallel (each one is a long-running task), but at the same time I have to preserve the order of the results. I am trying to use RxJava for this and could not find a good solution.

The basic idea is to translate every object into a function returning a future and then call get on each future in sequence. If a future completes with an exception, I need to retry it until it completes successfully.

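import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

import rx.Observable;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;

public class RxTest {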

    private static CompletableFuture<String> futureFunction(int i) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (ThreadLocalRandom.current().nextInt(1, 100) < 10) {
                throw new RuntimeException("failed.. sorry");
            }

            return Thread.currentThread().getName() + " " + String.valueOf(i);
        });
    }

    public static void main(String[] args) throws InterruptedException {

        Observable.range(1, 100)
                .flatMap(i ->
                        Observable.just(i).map(RxTest::futureFunction).flatMap(Observable::from).retry()
                )

                .observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))

                .subscribe(Subscribers.create((x) -> {
                    System.out.println(Thread.currentThread().getName() + " " + x);
                }));

    }
}

In this example I receive sequential output:

pool-1-thread-1 ForkJoinPool.commonPool-worker-9 1
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 2
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 3
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 4
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 5
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 6

But each future is created and called only after the previous one completes, not in parallel as I expected.

I have been playing around with observeOn() and subscribeOn(), but nothing works well.

1 Answer

You can use concatMapEager or concatEager, which start the inner sources eagerly but keep the item order when emitting:

Observable.range(1, 100)
.concatMapEager(v -> 
    Observable.fromCallable(() -> { ... })
    .subscribeOn(Schedulers.computation())
    .retry()
)
.subscribe(...)
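
If RxJava is not a hard requirement, the eager-start, ordered-collect idea from the question can also be sketched with plain CompletableFuture. This is only a rough sketch under assumptions: the names EagerOrdered, slowTask, retrying and processInOrder are hypothetical, the 10% failure rate and the pool size are arbitrary, and unlike concatMapEager it returns the results as a list instead of streaming them to a subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

public class EagerOrdered {

    // Hypothetical stand-in for the long-running task: fails roughly 10% of the time.
    public static String slowTask(int i) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (ThreadLocalRandom.current().nextInt(100) < 10) {
            throw new RuntimeException("failed.. sorry");
        }
        return "item " + i;
    }

    // Runs the task on the pool and keeps retrying until it returns normally.
    public static <T> CompletableFuture<T> retrying(int input, Function<Integer, T> task,
                                                    ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            while (true) {
                try {
                    return task.apply(input);
                } catch (RuntimeException e) {
                    if (Thread.currentThread().isInterrupted()) {
                        throw e; // stop retrying once the worker has been interrupted
                    }
                }
            }
        }, pool);
    }

    // Submits every task up front, then collects the results in input order.
    public static <T> List<T> processInOrder(int from, int count, Function<Integer, T> task,
                                             int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<T>> futures = new ArrayList<>();
            for (int i = from; i < from + count; i++) {
                futures.add(retrying(i, task, pool)); // eager: submitted before any join
            }
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                results.add(future.join()); // joins in submission order, so order is kept
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        processInOrder(1, 20, EagerOrdered::slowTask, 8).forEach(System.out::println);
    }
}
```

The tasks run concurrently on the pool, while the in-order join loop plays the role that concatMapEager plays in the RxJava version. A failing task is simply re-run on the same worker thread, which is a simplification of what retry() does.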

3 Comments

Works like a charm :) Thank you!
@akarnokd how much of an experimental API is concatMapEager?
I don't know of any plans for a major API restructuring of the 1.x branch within the next year.
