I need help implementing parallel asynchronous calls using RxJava2 and Retrofit2. My requirements are:

1) I have multiple insurers (for now, only two), and I need to send one request per insurer name, in parallel.

2) If any request returns a server error, the remaining requests should not be blocked.

Here is what I have tried so far:

ArrayList<String> arrInsurer = new ArrayList<>();
arrInsurer.add(AppConstant.HDFC);
arrInsurer.add(AppConstant.ITGI);

RequestInterface service = getService(ServiceAPI.CAR_BASE_URL);
for (String insurerName : arrInsurer) {
    service.viewQuote(Utils.getPrefQuoteId(QuoteListActivity.this), insurerName)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<ViewQuoteResDTO>() {
                @Override
                public void accept(@NonNull ViewQuoteResDTO viewQuoteResDTO) throws Exception {
                    Log.e("Demo", viewQuoteResDTO.getPremiumData().getIDV() + "");
                    updateList();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    Log.e("Demo", throwable.getMessage());
                }
            });
}

private RequestInterface getService(String baseUrl) {
    Gson gson = new GsonBuilder()
            .setLenient()
            .create();

    return new Retrofit.Builder()
            .baseUrl(baseUrl)
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create(gson))
            .build()
            .create(RequestInterface.class);
}

The code above works only if both requests succeed. When any request returns an internal server error, the remaining requests are blocked as well.

This is the log output I get when one of the requests fails:

E/Demo: HTTP 500 Aww Snap, Some thing happened at server. Please try back again later.
E/Demo: unexpected end of stream on Connection{100.xxx.xxx.xx:portNo, proxy=DIRECT@ hostAddress=/100.xxx.xxx.xx:portNo cipherSuite=none protocol=http/1.1}

How should I handle this error?

1 Answer

Like most Rx questions, this one has more than one answer. Here is the approach we use in our app, which solves exactly this use case. Hope it helps.

Short version: this relies on the mergeDelayError operator.

Why merge? Because, unlike concat, it subscribes to all the observables at once instead of waiting for each one to finish, so they can run in parallel. Why mergeDelayError? Because it delays the error: it lets every observable run to completion and delivers the error only after all of them have finished. This ensures that even if one or several requests fail, the others are still executed.
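To make the "delay the error" idea concrete, here is a minimal sketch using only the JDK's CompletableFuture. It is an analogue, not RxJava's implementation, and the names DelayErrorSketch, Outcome, and runAll are made up for illustration. It starts every task at once, waits for all of them, and collects failures instead of stopping at the first one. Unlike merge, it gathers results in list order rather than arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// Plain-JDK analogue of "merge with delayed errors"; hypothetical names, not RxJava.
class DelayErrorSketch {

    // Holds everything that succeeded plus everything that failed.
    static final class Outcome {
        final List<String> values = new ArrayList<>();
        final List<Throwable> errors = new ArrayList<>();
    }

    // Starts all tasks up front, then waits for each; failures are recorded, not rethrown.
    static Outcome runAll(List<Supplier<String>> tasks) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<String> task : tasks) {
            futures.add(CompletableFuture.supplyAsync(task)); // runs on the common pool
        }
        Outcome outcome = new Outcome();
        for (CompletableFuture<String> future : futures) {
            try {
                outcome.values.add(future.join());
            } catch (CompletionException e) {
                outcome.errors.add(e.getCause()); // one failure does not stop the loop
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<Supplier<String>> tasks = new ArrayList<>();
        tasks.add(() -> "HDFC quote");
        tasks.add(() -> { throw new IllegalStateException("HTTP 500"); });
        tasks.add(() -> "ITGI quote");
        Outcome outcome = runAll(tasks);
        // prints: [HDFC quote, ITGI quote] / 1 error(s)
        System.out.println(outcome.values + " / " + outcome.errors.size() + " error(s)");
    }
}
```

The failing task in the middle does not prevent the third task's result from being collected, which is the behaviour mergeDelayError gives you for observables.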

There are two details to be careful with. First, the order of events is no longer preserved: merge may interleave items from different observables (given how you were doing things before, this shouldn't be an issue). Second, even if several observables fail, you get only one onError call; in RxJava 2, multiple failures arrive wrapped in a single CompositeException. If both of these are acceptable, you could try the following:

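List<Observable<ViewQuoteResDTO>> observables = new ArrayList<>();
for (String insurerName : arrInsurer) {
    observables.add(service.viewQuote(
            Utils.getPrefQuoteId(QuoteListActivity.this), insurerName)
            // With RxJava2CallAdapterFactory.create() each call runs synchronously on the
            // subscribing thread, so give every request its own subscribeOn to run in parallel.
            .subscribeOn(Schedulers.io()));
}

Observable.mergeDelayError(observables)
        // delayError = true keeps the final error from overtaking results still queued.
        .observeOn(AndroidSchedulers.mainThread(), true)
        .subscribe(/* subscriber calls if you need them */);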

The idea is to create all observables that you're going to run and then use mergeDelayError to trigger them.
