I have a bunch of observables that I'm running/subscribing to in a method (about 9 to be exact), that itself returns an observable. I narrowed it down to 2 observables for the purpose of this question. Here is my method that I created that returns an Observable that contains the other observables.

public static Observable<MyCustomObject> runAll(Service networkService) {
    return Observable.create(subscriber -> {

        // "case" is a reserved word in Java, so the local is named customObject
        networkService.getOne().subscribe(response -> {
            Request request = response.raw().request();
            MyCustomObject customObject = new MyCustomObject(request);
            subscriber.onNext(customObject);
        }, exception -> {
            throw new OnErrorNotImplementedException(exception);
        });

        networkService.getTwo().subscribe(response -> {
            Request request = response.raw().request();
            MyCustomObject customObject = new MyCustomObject(request);
            subscriber.onNext(customObject);
        }, exception -> {
            throw new OnErrorNotImplementedException(exception);
        });

        subscriber.onComplete();
    });
}

I then use the Observable that's returned...

runAll(networkService)
        .subscribeOn(Schedulers.io())
        .subscribe(customObject -> {
            //do stuff
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                //handle error
            }
        });

I'm not sure if I'm creating an observable correctly. I'm essentially replacing a listener/interface that I had here before with an Observable, but I know using Observable.create() is something that is not easy. Should I do it another way? Is there anything wrong with doing what I'm doing?

EDIT: getOne() and getTwo() are network calls so they return Observables.

EDIT 2: Currently have this

public static Observable<MyCustomObject> quickRun(Service networkService) {
    return Observable.concat(
            networkService.getOne().map(response -> {
                Request request = response.raw().request();
                MyCustomObject customObject = new MyCustomObject(request);
                return customObject;
            }),
            networkService.getTwo().map(response -> {
                Request request = response.raw().request();
                MyCustomObject customObject = new MyCustomObject(request);
                return customObject;
            })
    );
}
  • Do all 9 Observables return the same type? Do they depend on each other, or can run independently? Commented Jul 5, 2017 at 2:15
  • There may be one observable that returns a different type, but for the most part I can make it work (I can probably wrap both results in another type or something), and each one can run independently. Commented Jul 5, 2017 at 2:28

1 Answer

Yes, you should do it in a different way.
Whenever you treat an Observable like a regular async callback, it's a smell that you're not acting 'reactively'.
As you said, Observable.create() is a non-trivial way to create an Observable and has some pitfalls. That said, you are probably using an older version of RxJava 1; in newer versions (1.3+, I think) and in RxJava 2, create is based on an emitter and is safer. You can read here about the pitfalls of create and the emitter approach. (As a side note, RxJava 2 offers another way: extending Observable.)

All of those approaches exist to bridge the async-callback world and the reactive world by wrapping any kind of async operation in an Observable.

In your case, since you already have Observables at hand, what you need is to compose them into a single stream. Rx has many operators for this purpose; judging by your example, Observable.merge seems to be the appropriate one. All of your Observables will run asynchronously (note that you need to apply the IO scheduler to each of them for that), and each one will emit its result on the merged stream. When all of them have finished, onCompleted will be called on the merged Observable. Your example gets this wrong, by the way: it signals completion at the very start, right after firing all the tasks.

Observable.merge(
        networkService.getOne()
                .subscribeOn(Schedulers.io()),
        networkService.getTwo()
                .subscribeOn(Schedulers.io())
)
        .map(response -> {
            Request request = response.raw().request();
            return new MyCustomObject(request);
        })
        .subscribe(customObject -> {
                    //do stuff
                }, throwable -> {
                    //handle error
                }
        );
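
To make the completion behaviour concrete, here is a minimal, self-contained sketch of the same merge pattern, assuming RxJava 2 (the io.reactivex package) is on the classpath. The class name MergeSketch, the getOne()/getTwo() stand-ins, and the "mapped:" prefix are hypothetical placeholders for the real network calls and the real mapping; they are not part of the original code.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

public class MergeSketch {

    // Hypothetical stand-ins for networkService.getOne() / getTwo();
    // the real calls would emit network responses instead of strings.
    static Observable<String> getOne() {
        return Observable.fromCallable(() -> "one");
    }

    static Observable<String> getTwo() {
        return Observable.fromCallable(() -> "two");
    }

    // Merge both sources. Each one is subscribed on the IO scheduler,
    // so they may run concurrently and emit in either order.
    static Observable<String> runAll() {
        return Observable.merge(
                getOne().subscribeOn(Schedulers.io()),
                getTwo().subscribeOn(Schedulers.io()))
                .map(value -> "mapped:" + value); // placeholder for the real mapping
    }

    public static void main(String[] args) {
        // toList() emits only once the merged stream has completed,
        // which happens after both sources have finished.
        List<String> results = runAll().toList().blockingGet();
        System.out.println(results);
    }
}
```

Because the merged stream completes only after every source has completed, nothing here calls onComplete by hand. The order of the two results is not guaranteed with merge.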

8 Comments

Actually, in my case, I want them all to run synchronously... how does that change things?
The same approach applies; just use a different operator: Observable.concat()
So I would do Observable.concat( networkService.getOne() .map(), networkService.getTwo() .map() )? Not really sure about your order of defining via a merge, and then mapping...
Yes. Assuming the response conversion is the same for each service, you can apply it once, globally, to every emission; if not, you should apply it separately to each network service observable.
Aha. So every response is different (response is the response from a network call), so I would need to have every individual response map to a MyCustomObject. If the mapping is identical for each, is there an easier way to do it so I don't keep repeating myself?
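
One possible way to avoid repeating an identical mapping, sketched here under assumptions, is to move it into a single static method and pass it to each map() call as a method reference. The sketch assumes RxJava 2; ConcatSketch, FakeResponse, CustomObject and toCustomObject are hypothetical stand-ins for the real response type, MyCustomObject and its construction, and the request is reduced to a plain URL string.

```java
import io.reactivex.Observable;

import java.util.List;

public class ConcatSketch {

    // Hypothetical stand-in for a network response; the body type differs
    // per call, but the request information is available on every response.
    static final class FakeResponse<T> {
        final String requestUrl;
        final T body;

        FakeResponse(String requestUrl, T body) {
            this.requestUrl = requestUrl;
            this.body = body;
        }
    }

    // Hypothetical stand-in for MyCustomObject.
    static final class CustomObject {
        final String requestUrl;

        CustomObject(String requestUrl) {
            this.requestUrl = requestUrl;
        }
    }

    // The shared mapping, written once; it ignores the body type.
    static CustomObject toCustomObject(FakeResponse<?> response) {
        return new CustomObject(response.requestUrl);
    }

    static Observable<FakeResponse<String>> getOne() {
        return Observable.just(new FakeResponse<>("/one", "text body"));
    }

    static Observable<FakeResponse<Integer>> getTwo() {
        return Observable.just(new FakeResponse<>("/two", 42));
    }

    // concat subscribes to getTwo() only after getOne() has completed.
    static Observable<CustomObject> quickRun() {
        return Observable.concat(
                getOne().map(ConcatSketch::toCustomObject),
                getTwo().map(ConcatSketch::toCustomObject));
    }

    public static void main(String[] args) {
        List<CustomObject> results = quickRun().toList().blockingGet();
        for (CustomObject result : results) {
            System.out.println(result.requestUrl);
        }
    }
}
```

Each source keeps its own response type, and only the mapped CustomObject values reach the concatenated stream.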