2

Help in composing multiple network calls and accumulate the result in Rxjava. (I am using in an Android application.)

State 
 -- List<City> cityList;

 City
 - cityId;

RestCall 1
Observable<State> stateRequest = restService.getStates();

RestCall 2
Observable<CityDetail> cityRequest = restService.getCityDetail(cityId);

In UI i have to display list of cities after getting all the details of each city and then show in the listview. How do i achieve the parllel network calls and accumulate the result. ?

I want all the city detail results to be put in List in source State 'object'. As state object has some information which need to be dislayed as well.Is this possible ?

stateRequest ??? 
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<State>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(State result) {
    // Get city list and display
    }
});

I checked this example which shows how we can zip more tha one observable response. Below snippet shows 3 observables combined. But in my case i have to make 20 network calls parallel or sequential ( i mean in background but one after another). How do i achieve this. Any help or directions ?

https://gist.github.com/skehlet/9418379

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }

3 Answers 3

3

I think that your code can be simplified to something like this, as your use of the zip operator is close to the use of toList operator

 stateRequest
 .subscribe(State state ->  {
     Observable.from(state.getCityList())
               .flatMap(City city -> restService.getCityDetail(city.getId())
               .toList()
               .subscribe(List<City> cities -> {

                     state.clear();
                     state.addAll(cities);
               });
     });

As RxJava doesn't provide a throttle operator, you may build something similar like this :

Observable<City> limiter = Observable.zip(Observable.interval(1, SECONDS), aCity, (i, c) -> c);

Using this, limiter is an observable that will emit a city each second.

So, with your code, if you want to limit call to getCityDetail for example :

 Observable<Object> limiter = Observable.interval(1, SECONDS);
 stateRequest
 .subscribe(State state ->  {
     Observable.zip(limiter, Observable.from(state.getCityList()), (i, c) -> c)
               .flatMap(City city -> restService.getCityDetail(city.getId())
               .toList()
               .subscribe(List<City> cities -> {

                     state.clear();
                     state.addAll(cities);
               });
     });
Sign up to request clarification or add additional context in comments.

1 Comment

Zip with interval is a good approach.Worked for me. Accepted your answer.
1
stateRequest
.flatMap(new Func1<State, Observable<State>>() {
    @Override
    public Observable<State> call(final State state) {

        List<Observable> cityObservablesList = new ArrayList<Observable>();

        for(City city: state.getCityList()) {
            cityObservablesList.add(restService.getCityDetail(city.getId());
        }

        Observable cityObservables = Observable.from(cityObservablesList);
        return Observables.zip(cityObservables, new FuncN<State>() {
            @Override
            public State call(Object... args) {
                List<City> cityList = state.getCityList();
                cityList.clear();
                for(Object object: args) {
                    cityList.add((City)object);
                }

                return state;
            }
        })
    })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<State>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(State result) {
    // Get city list and display
    }
});

I got it working with the help of zip operator and Iterable for city list as first parameter. But i face another issue. Since the zip executes the job in parallel, 10-15 network calls are executed in parallel and server rejecting with Maximum Query Per Second error (QPS - 403). How do i instruct the zip operator to execute the tasks one after another ?

I did solve this issue by adding a delay [delay(c*200, TimeUnit.MILLISECONDS))] to city observable. But doesn't seem like a proper solution.

Any advise ?

1 Comment

My advice is to ask this as a new question. Nevertheless, zip shouldn't execute the items in parallel - Rx has behaviour guarantees. I would look at your call to getCityDetail in the for loop. Is that what's causing the issue?
0

Take a look at flatMap(Function.., BiFunction..). Maybe that's what you need.

statesRepository.getStates()
   .flatMap(states-> Observable.fromIterable(states))
   .flatMap(
           state-> cityRepository.getStateCities(state),
           (state, cityList) -> {
               state.setCities(cityList);
               return state;
           })
   .subscribe(state-> showStateWithCity(state));

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.