9

I'm new to RxJava, here's my case,

  1. send request A and will get List<A> back
  2. for each A, send request AA and will get AA back, bind A and AA then
  3. there is B & BB with similar logic
  4. do something only after all requests complete

Example:

request(url1, callback(List<A> listA) {
    for (A a : listA) {
        request(url2, callback(AA aa) {
            a.set(aa);
        }
    }
}

A and B are independent

How to structure the code? I also used Retrofit as network client.

4
  • Handlers and a progress integer to control if you reach the end of the logic or not. Commented Oct 14, 2014 at 9:16
  • 2
    @Oliveira, yes, handler is one solution, but I want to know if RxJava could offer a better and simple solution Commented Oct 14, 2014 at 9:21
  • What exactly do you mean by "bind A and AA"? So you have one request that yields a List of As, and then for each A there will be exactly one AA? Also, do you still need the As when you have the AAs? What are you going to do with the AAs - is there one action for each or a combined action for all AAs together? Commented Oct 14, 2014 at 13:26
  • @david.mihola, there is one AA for every A, 1-1 map. I can't get both A & AA in one request because of limit of server. The "bind A and AA" is the action after AA retrieved, sth like A.set(AA) Commented Oct 15, 2014 at 1:08

1 Answer 1

14

OK, I think this should solve the first part of your problem:

Notice that the second call to flatMap is given 2 arguments - there is a version of flatMap that not only produces an Observable for each input item but that also take a second function which in turn will combine each item from the resulting Observable with the corresponding input item.

Have a look at the third graphic under this heading to get an intuitive understanding:

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable

Observable<A> obeservableOfAs = retrofitClient.getListOfAs()
.flatMap(new Func1<List<A>, Observable<A>>() {

    @Override
    public Observable<A> call(List<A> listOfAs) {
        return Observable.from(listOfAs);
    }

)}
.flatMap(new Func1<A, Observable<AA>>() {

    @Override
    public Observable<AA> call(A someA) {
        return retrofitClient.getTheAaForMyA(someA);
    }

},
new Func2<A, AA, A>() {

    @Override
    public A call(A someA, AA theAaforMyA) {
        return someA.set(theAaforMyA);
    }

})
...

From here on I am still not sure how you want to continue: Are you ready to just subscribe to the resulting Observable of As? That way you could handle each of the As (onNext) or just wait until all are done (onCompleted).

ADDENDUM: To collect all Items into a single List at the end, that is turn your Observable<A> into an Observable<List<A>> use toList().

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#tolist

So you have:

Observable<List<A>> observableOfListOfAs = observableOfAs.toList();

If you need more fine grained control over the construction of your list, you can also use reduce.

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce

For the Bs, simply duplicate the whole flow you used for the As.

You can then use zip to wait for both flows to complete:

Observable.zip(
    observableOfListOfAs,
    observableOfListOfBs,
    new Func2<List<A>, List<B>, MyPairOfLists>() {

        @Override
        public MyPairOfLists call(List<A> as, List<B> bs) {
            return new MyPairOfLists(as, bs);
        }
    }
)
.subscribe(new Subscriber<MyPairOfLists>() {

    // onError() and onCompleted() are omitted here

    @Override
    public void onNext(MyPairOfLists pair) {
        // now both the as and the bs are ready to use:

        List<A> as = pair.getAs();
        List<B> bs = pair.getBs();

        // do something here!
    }
});

I suppose you can guess the definition of MyPairOfLists.

Sign up to request clarification or add additional context in comments.

5 Comments

Thanks, Func2 for flatmap looks great! At the end, I would require collection A, I think there will be a merge? How's B part then?
So you need to wait until all As have their AA and then collect them into a list? I think the Bs should work just like the As? And there is a final action that should be executed when both the List<A> and the List<B> are ready?
thanks for your patient explanation, now everything worked! RxJava did make code clean and simple!
@david.mihola I'm making a list of async network request using rxjava-async,not use Retrofit,the exception says I have used it in main thread,but I've subscribeOn new thread.
If I understood correctly, the zip is emitting the elements in pairs. Instead of zip what operator can I use to get notified at the end of both lists? I have two lists of requests (with different sizes), and just want to execute some code at the end.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.