
I have several async tasks, represented by Futures and executing in a separate thread pool, that I want to join using RxJava. The "old" way of doing this with Java 5 constructs would be something like the following (collecting the results is omitted):

final Future<Response> future1 = wsClient.callAsync();
final Future<Response> future2 = wsClient.callAsync();
final Future<Response> future3 = wsClient.callAsync();
final Future<Response> future4 = wsClient.callAsync();

future1.get();
future2.get();
future3.get();
future4.get();

This blocks my current thread until all futures have completed, but the calls run in parallel, so the whole operation takes only as long as the longest call.
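
For reference, the blocking join described above can be sketched with the results collected as well. This is a minimal, self-contained illustration, not the real client: the thread pool and the lambda stand in for wsClient.callAsync(), and the names JoinFutures and callAllAndJoin are made up for this example. The latch is only there to show that the calls really overlap: each task finishes only once all of them have started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class JoinFutures {

    // Starts `calls` tasks first, then blocks on each Future in turn.
    // The pool and the task body are hypothetical stand-ins for wsClient.callAsync().
    static List<String> callAllAndJoin(int calls) {
        ExecutorService pool = Executors.newFixedThreadPool(calls);
        CountDownLatch allStarted = new CountDownLatch(calls);
        try {
            // Step 1: create every Future before waiting on any of them.
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 1; i <= calls; i++) {
                final int id = i;
                futures.add(pool.submit(() -> {
                    allStarted.countDown();
                    // Completes only if all calls are in flight at the same time.
                    if (!allStarted.await(5, TimeUnit.SECONDS)) {
                        throw new IllegalStateException("calls did not overlap");
                    }
                    return "response-" + id;
                }));
            }
            // Step 2: block the current thread until each Future has completed.
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [response-1, response-2, response-3, response-4]
        System.out.println(callAllAndJoin(4));
    }
}
```

The important detail is the ordering: all Futures exist before the first get() is called, which is what lets the calls run side by side.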

I want to do the same using RxJava, but I'm new to it and unsure how to model this correctly.

I've tried the following, and it seems to work:

Observable.from(Arrays.asList(1, 2, 3, 4))
        .flatMap(n -> Observable.from(wsClient.callAsync(), Schedulers.io()))
        .toList()
        .toBlocking()
        .single();

The problem with this approach is that it introduces the Schedulers.io() thread pool, which causes unnecessary thread switching, since I'm already blocking the current thread (using toBlocking()). Is there any way to model the Rx flow so that the tasks execute in parallel and the current thread blocks until all of them have completed?

  • If your service can't do at least callback-style notifications, then your best option is to block on those Futures. Commented Feb 25, 2016 at 12:19
  • That's what I'm doing in both cases, and it is precisely what I'm aiming for. But the question was how to avoid introducing the Schedulers.io() thread pool, as the Futures are already running in their own. Commented Feb 25, 2016 at 12:32
  • You can't omit the scheduler, because Observable.from calls get() the same way you do in the first example. Commented Feb 25, 2016 at 13:00
  • @akarnokd - Are you sure? To me it seems that the calls are then made sequentially, meaning that each Future is created only after the preceding one has been created and its get() has returned. Commented Feb 25, 2016 at 14:40
  • The only way to make sure flatMap doesn't block on each input item is to make those sources subscribe on a different scheduler, like the overload you have in the second example. You can try and wrap the Executor of those Futures into a scheduler and use that instead of io(), but there is no telling what will happen. Commented Feb 25, 2016 at 14:51
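
The effect discussed in these comments can be reproduced without RxJava. The sketch below is only an analogy in plain Java, under the assumption that blocking on a Future as soon as it is created behaves like a flatMap whose inner source calls get() on the subscribing thread. The class CreateThenGet, the method maxInFlight, and the 50 ms sleep are all hypothetical stand-ins for the real web service call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class CreateThenGet {

    // Returns the highest number of calls that were in flight at the same time.
    // getImmediately = true  -> block on each Future right after creating it
    // getImmediately = false -> create all Futures first, then block on them
    static int maxInFlight(int calls, boolean getImmediately) {
        ExecutorService pool = Executors.newFixedThreadPool(calls);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                final int id = i;
                Future<Integer> future = pool.submit(() -> {
                    int now = inFlight.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    Thread.sleep(50); // simulated call latency (arbitrary value)
                    inFlight.decrementAndGet();
                    return id;
                });
                if (getImmediately) {
                    future.get(); // the next call is not even started until this returns
                } else {
                    pending.add(future);
                }
            }
            for (Future<Integer> future : pending) {
                future.get();
            }
            return max.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("create-then-get: " + maxInFlight(4, true));
        System.out.println("create-all-first: " + maxInFlight(4, false));
    }
}
```

With getImmediately set to true, at most one call is ever in flight, because each Future is awaited before the next one is created. With false, the calls can overlap, up to the size of the pool.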

1 Answer

You should use the zip function, for example like this:

Observable.zip(
        Observable.from(wsClient.callAsync(), Schedulers.io()),
        Observable.from(wsClient.callAsync(), Schedulers.io()),
        Observable.from(wsClient.callAsync(), Schedulers.io()),
        Observable.from(wsClient.callAsync(), Schedulers.io()),
        (response1, response2, response3, response4) -> {
            // This is a zipping function...
            // You'll end up here when you've got all responses
            // Do what you want with them and return a combined result
            // ...
            return null; //combined result instead of null
        })
        .subscribe(combinedResult -> {
            // Use the combined result
        });

Observable.zip can also work with an Iterable of Observables, so instead of listing each source separately you can put the four Observable.from(wsClient.callAsync(), Schedulers.io()) sources into a collection and pass that to zip.


2 Comments

The original post stated that I wanted to avoid waiting on the futures on the Schedulers.io() thread pool.
But here you're NOT using toBlocking(), so you're not blocking the current thread.
