I'm struggling with RxJava2. I want to call a function on each item of a list. The function is:

public void function(final Result result) {

    FirebaseFirestore.getInstance().collection(COLLECTION_NAME).document(result.getId()).get().addOnSuccessListener(new OnSuccessListener<DocumentSnapshot>() {
        @Override
        public void onSuccess(DocumentSnapshot documentSnapshot) {
            // do some operation
        }
    });
}

This function is asynchronous and uses FirebaseFirestore.

So I tried to use RxJava2 on my list to call the function for every item:

Observable.fromIterable(resultList)
        .concatMap(result -> Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                function(result);
                return "ok";
            }
        }))
        .subscribe(r -> {
            // do some operation when all firebase async tasks are done
        });

The concatMap works, and the function is called for every item in the list. The problem is that I need a callback once all the Firebase async tasks have finished.

Any help would be much appreciated.

  • Any feedback on the suggested solution below? Commented Jul 29, 2018 at 10:56
  • @AnthonyFillion-Maillet: So onComplete should be called when all tasks are done? Commented Jul 30, 2018 at 5:29
  • @Lino Thank you for your answer. I tried it, but unfortunately it seems that onComplete is not called. Commented Jul 30, 2018 at 14:15
  • @AmitVikramSingh Yes, exactly. Commented Jul 30, 2018 at 14:15
  • @AnthonyFillion-Maillet Makes sense. I updated the answer to embed the information about the last request in a custom object. Commented Jul 30, 2018 at 14:47

1 Answer

I'll try to sketch a possible solution:

public class Callback implements OnSuccessListener<DocumentSnapshot> {
    private final ObservableEmitter<DocumentSnapshot> emitter;
    private final boolean last;

    public Callback(boolean lastvalue, ObservableEmitter<DocumentSnapshot> e) {
        this.last = lastvalue;
        this.emitter = e;
    }

    @Override
    public void onSuccess(DocumentSnapshot value) {
        emitter.onNext(value);
        if (last) {
            emitter.onComplete();
        }
    }
}


Observable<DocumentSnapshot> observable = Observable.create(new ObservableOnSubscribe<DocumentSnapshot>() {
    @Override
    public void subscribe(ObservableEmitter<DocumentSnapshot> e) throws Exception {
        int i = 1;
        for (Result result : resultList) {
            /* callback object now knows which is the last request so it can emit the onComplete */
            Callback callbackInstance = new Callback(resultList.size() == i, e);
            i++;
            FirebaseFirestore.getInstance().collection(COLLECTION_NAME)
                    .document(result.getId()).get().addOnSuccessListener(callbackInstance);
        }
    }
});

Then, when the subscriber's onComplete action fires, all the requests to Firebase should have completed, provided the callback for the last request is also the last one to arrive.
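
Since the callbacks are asynchronous, they may not arrive in the order the requests were issued, so a flag on the last request can signal completion too early. One possible alternative is to count the outstanding requests and signal completion when the count reaches zero. Below is a minimal sketch of that idea using only the JDK. The names `PendingRequests`, `fetchAsync`, `fetchAll` and `resultsSeenAtCompletion` are hypothetical, and `fetchAsync` only simulates Firestore's `get().addOnSuccessListener(...)` with a thread pool and artificial delays; it is not the Firebase API.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class PendingRequests {

    // Hypothetical stand-in for document(id).get().addOnSuccessListener(...):
    // invokes the listener on another thread after a delay, so callbacks can
    // arrive in a different order than the requests were issued.
    static void fetchAsync(ExecutorService pool, String id, long delayMs,
                           Consumer<String> onSuccess) {
        pool.execute(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            onSuccess.accept("snapshot-" + id);
        });
    }

    // Issues one request per id and runs onAllDone exactly once, after the
    // final callback has arrived, whichever request that turns out to be.
    static void fetchAll(ExecutorService pool, List<String> ids,
                         Consumer<String> onNext, Runnable onAllDone) {
        if (ids.isEmpty()) {
            onAllDone.run();
            return;
        }
        AtomicInteger pending = new AtomicInteger(ids.size());
        for (int i = 0; i < ids.size(); i++) {
            // The first request is the slowest, so the last request's
            // callback arrives first in this simulation.
            long delayMs = 10L * (ids.size() - i);
            fetchAsync(pool, ids.get(i), delayMs, snapshot -> {
                onNext.accept(snapshot);
                if (pending.decrementAndGet() == 0) {
                    onAllDone.run();
                }
            });
        }
    }

    // Returns how many results had been delivered when onAllDone ran
    // (or -1 if completion was not signalled within the timeout).
    static int resultsSeenAtCompletion(List<String> ids) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger seen = new AtomicInteger();
        AtomicInteger seenAtCompletion = new AtomicInteger(-1);
        CountDownLatch done = new CountDownLatch(1);
        fetchAll(pool, ids, snapshot -> seen.incrementAndGet(), () -> {
            seenAtCompletion.set(seen.get());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seenAtCompletion.get();
    }
}
```

Calling `PendingRequests.resultsSeenAtCompletion(Arrays.asList("a", "b", "c"))` should return 3 even though the first request finishes last. In the `Observable.create` version, the same counter could probably replace the `last` flag, with `emitter.onComplete()` called when it reaches zero. Failures are not handled here; a real version would also need a failure listener that decrements the counter or reports the error.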
