4

I have to make N REST API calls and combine the results of all of them, or fail if at least one of the calls failed (returned an error or a timeout). I want to use RxJava and I have some requirements:

  • Be able to configure a retry of each individual api call under some circumstances. I mean, if I have a retry = 2 and I make 3 requests each one has to be retried at most 2 times, with at most 6 requests in total.
  • Fail fast! If one API calls have failed N times (where the N is the configuration of the retries) it doesn't mater if the remaining requests hasn't ended, I want to return an error.

If I wish to make all the request with a single Thread, I would need an async Http Client, wouldn't?

Thanks.

2 Answers 2

2

You could use Zip operator to zip all request together once they ends and check there if all of them were success

 private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;

/**
 * Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
 * By default Rx is not async, only if you explicitly use subscribeOn.
 */
@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
            .concat(s3))
            .subscribe(result -> showResult("Async in:", start, result));
}

private Observable<String> obAsyncString() {
    return Observable.just("Request1")
            .observeOn(scheduler)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> "Hello");
}

private Observable<String> obAsyncString1() {
    return Observable.just("Request2")
            .observeOn(scheduler1)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> " World");
}

private Observable<String> obAsyncString2() {
    return Observable.just("Request3")
            .observeOn(scheduler2)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> "!");
}

In this example we just concat the results, but instead of do that, you can check the results and do your business logic there.

You can also consider merge or contact also.

you can take a look more examples here https://github.com/politrons/reactive

Sign up to request clarification or add additional context in comments.

1 Comment

Creating more than 1 Schedulers.newThread() is unnecessary. It creates a single new thread for each worker that is created, so just reusing the same scheduler would have the same result.
0

I would suggest to use an Observable to wrap all the calls.

Let's say you have your function to call the API:

fun restAPIcall(request: Request): Single<HttpResponse>

And you want to call this n times. I am assuming that you want to call them with a list of values:

val valuesToSend: List<Request>

Observable
    .fromIterable(valuesToSend)
    .flatMapSingle { valueToSend: Request ->
        restAPIcall(valueToSend)
    }
    .toList() // This converts: Observable<Response> -> Single<List<Response>>
    .map { responses: List<Response> -> 
        // Do something with the responses
    }

So with this you can call the restAPI from the elements of your list, and have the result as a list.

The other problem is the retries. You said you wanted to retry when an individual cap is reached. This is tricky. I believe there is nothing out of the box in RxJava for this.

  • You can use retry(n) where you can retry n times in total, but that is not what you wanted.
  • There's also a retryWhen { error -> ... } where you can do something given an exception, but you would know what element throw the error (unless you add the element to the exception I think).

I have not used the retries before, nevertheless it seems that it retries the whole observable, which is not ideal.

My first approach would be doing something like the following, where you save the count of each element in a dictionary or something like that and only retry if there is not a single element that exceeds your limit. This means that you have to keep a counter and search each time if any of the elements exceeded.

val counter = valuesToSend.toMap()

yourObservable
    .map { value: String ->
        counter[value] = counter[value]?.let { it + 1 }?: 0 // Update the counter
        value // Return again the value so you can use it later for the api call
    }
    .map { restAPIcall(it) }
    // Found a way to take yourObservable and readd the element if it doesn't exceeds
    // your limit (maybe in an `onErrorResumeNext` or something). Else throw error

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.