I have an array of objects retrieved from my local DB that need to be uploaded to a server through a sequence of API calls. For each local DB object I have to make two asynchronous API calls, methodA() and then methodB(), one after the other. After the whole loop has finished, I need to make one more API call, as follows.

for(Object object: localDBObjects){
    methodA() -> methodB()
}
methodC()

My problem is: how can I hold back the methodC() call until the loop has finished?

2 Answers

How about this:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import rx.Observable;
import rx.schedulers.Schedulers;

public class Main {

    // Keeps the main thread alive until methodC has run
    private static final CountDownLatch finishLatch = new CountDownLatch(1);

    public static Integer methodA(Integer obj) {
        try {
            Thread.sleep((int) (Math.random() * 1000)); // Simulate a slow API call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status
        }
        System.out.println("methodA for " + obj + " executed by " + Thread.currentThread().getName());
        return obj;
    }

    public static Integer methodB(Integer obj) {
        try {
            Thread.sleep((int) (Math.random() * 1000)); // Simulate a slow API call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status
        }
        System.out.println("methodB for " + obj + " executed by " + Thread.currentThread().getName());
        return obj;
    }

    public static void methodC() {
        System.out.println("methodC executed by " + Thread.currentThread().getName());
        finishLatch.countDown(); // Allow main to finish
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> objectsFromDb = Arrays.asList(1, 2, 3, 4, 5); // List of objects from the DB

        Observable.from(objectsFromDb)
                // Call methodA asynchronously for each object
                .flatMap(obj -> Observable.fromCallable(() -> methodA(obj)).subscribeOn(Schedulers.io()))
                // Call methodB asynchronously once methodA has returned for that object
                .flatMap(obj -> Observable.fromCallable(() -> methodB(obj)).subscribeOn(Schedulers.io()))
                // When every object has been through both calls, call methodC
                .doOnCompleted(() -> methodC())
                .subscribe();

        finishLatch.await(); // Wait for everything to finish
    }

}

Sample output:

methodA for 5 executed by RxCachedThreadScheduler-5
methodA for 2 executed by RxCachedThreadScheduler-2
methodA for 1 executed by RxCachedThreadScheduler-1
methodB for 1 executed by RxCachedThreadScheduler-2
methodB for 2 executed by RxCachedThreadScheduler-5
methodB for 5 executed by RxCachedThreadScheduler-6
methodA for 3 executed by RxCachedThreadScheduler-3
methodA for 4 executed by RxCachedThreadScheduler-4
methodB for 3 executed by RxCachedThreadScheduler-1
methodB for 4 executed by RxCachedThreadScheduler-2
methodC executed by RxCachedThreadScheduler-2
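
As the output shows, the objects are processed concurrently and in no fixed order, but methodB for an object always follows methodA for that object, and methodC comes last. For readers without RxJava on the classpath, the same "chain A then B per object, then run C once everything is done" shape can be sketched with the JDK's CompletableFuture. This is a different technique from the Rx pipeline above, not a translation of it; the class name ChainThenFinish, the uploadAll helper and the log list are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ChainThenFinish {

    // Thread-safe record of the calls, in the order they actually ran (illustration only).
    static final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-ins for the two asynchronous API calls.
    static CompletableFuture<Integer> methodA(Integer obj) {
        return CompletableFuture.supplyAsync(() -> { log.add("A" + obj); return obj; });
    }

    static CompletableFuture<Integer> methodB(Integer obj) {
        return CompletableFuture.supplyAsync(() -> { log.add("B" + obj); return obj; });
    }

    static void methodC() {
        log.add("C");
    }

    // Starts methodA -> methodB for every object, then runs methodC after all chains complete.
    static CompletableFuture<Void> uploadAll(List<Integer> objects) {
        List<CompletableFuture<Integer>> chains = new ArrayList<>();
        for (Integer obj : objects) {
            // thenCompose starts methodB only after methodA has completed for this object
            chains.add(methodA(obj).thenCompose(ChainThenFinish::methodB));
        }
        return CompletableFuture.allOf(chains.toArray(new CompletableFuture[0]))
                .thenRun(ChainThenFinish::methodC);
    }

    public static void main(String[] args) {
        uploadAll(Arrays.asList(1, 2, 3, 4, 5)).join(); // Block until methodC has run
        System.out.println(log); // Order of the A/B entries varies; "C" is always last
    }
}
```

The join() call plays the role of the CountDownLatch above: it only keeps the main thread alive until the final step has run.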

Since I don't have much information about your project or the concrete implementation of the methods, namely their arguments and return types, I'll cover two assumptions.

Note: I hope you don't mind if I use lambda expressions.

1) The methods return Observable<Object>, as with Retrofit.
In this case, they look like this:

public Observable<Object> methodA(Object o){
    return null;
}

public Observable<Object> methodB(Object o){
    return null;
}

public Observable<Object> methodC(Object[] objects){
    return null;
}

For this case you may use something like this:

Object[] localDBObjects = new Object[10];
Observable.just(localDBObjects)
        .flatMap(objects -> Observable.from(objects)
                                .flatMap(object -> methodA(object))
                                .flatMap(resultFromMethodA -> methodB(resultFromMethodA))
                                .toList())
        .flatMap(listOfResultsFromMethodB -> methodC(listOfResultsFromMethodB.toArray(new Object[listOfResultsFromMethodB.size()])))
        .subscribe(resultFromMethodC -> {
            //do something
        }, t -> t.printStackTrace());

2) Otherwise, the methods return a plain Object, like this:

public Object methodA(Object o){
    return null;
}

public Object methodB(Object o){
    return null;
}

public Object methodC(Object[] objects){
    return null;
}

In this case you need to change the flatMap() operators to map() in some places:

    Object[] localDBObjects = new Object[10];
    Observable.just(localDBObjects)
            .flatMap(objects -> Observable.from(objects)
                                    .map(object -> methodA(object))
                                    .map(resultFromMethodA -> methodB(resultFromMethodA))
                                    .toList())
            .map(listOfResultsFromMethodB -> methodC(listOfResultsFromMethodB.toArray(new Object[listOfResultsFromMethodB.size()])))
            .subscribe(resultFromMethodC -> {
                //do something
            }, t -> t.printStackTrace());
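
If the uploads have to happen strictly one object at a time, with the collected results handed to methodC at the end, the idea can also be sketched without RxJava using the JDK's CompletableFuture. This is an illustrative sketch only, not the pipeline above: the class name SequentialUpload, the uploadInOrder helper and the String payloads are assumptions, and the three methods are hypothetical stand-ins for the real API calls. In RxJava itself, concatMap is the operator usually reached for when flatMap's interleaving is not wanted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SequentialUpload {

    // Hypothetical asynchronous API calls; each one tags its input so the order is visible.
    static CompletableFuture<String> methodA(String obj) {
        return CompletableFuture.supplyAsync(() -> obj + "-A");
    }

    static CompletableFuture<String> methodB(String resultFromA) {
        return CompletableFuture.supplyAsync(() -> resultFromA + "-B");
    }

    static CompletableFuture<String> methodC(List<String> resultsFromB) {
        return CompletableFuture.supplyAsync(() -> String.join(",", resultsFromB));
    }

    // Processes the objects one after another, then passes all methodB results to methodC.
    static CompletableFuture<String> uploadInOrder(List<String> objects) {
        List<String> results = new ArrayList<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String obj : objects) {
            // Each step is appended to the chain, so it starts only after the previous object is done
            chain = chain
                    .thenCompose(ignored -> methodA(obj))
                    .thenCompose(a -> methodB(a))
                    .thenAccept(results::add);
        }
        return chain.thenCompose(ignored -> methodC(results));
    }

    public static void main(String[] args) {
        String summary = uploadInOrder(Arrays.asList("x", "y")).join();
        System.out.println(summary); // prints "x-A-B,y-A-B"
    }
}
```

Because every stage is appended to a single chain, the plain ArrayList is only ever touched by one stage at a time, so no extra synchronization is needed here.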
