14

I am trying to implement an asynchronous task using RxJava in Android. I tried the following code and it didn't work. It executes on the UI thread. I am using the following version of RxAndroid 0.24.0.

try {
    Observable.just(someMethodWhichThrowsException())
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> onMergeComplete());
}
catch (IOException e) {
    e.printStackTrace();
}

However, the following works asynchronously for me.

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                someMethodWhichThrowsException();
            } catch (IOException e) {
                e.printStackTrace();
            }

            subscriber.onCompleted();
        }
    });
    observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();

I am trying to understand the following:

  1. What is the difference between them?
  2. What is the best practice while creating async tasks?

2 Answers 2

21
  1. What is the difference between them?
Observable.just(someMethodWhichThrowsException())
    .subscribeOn(Schedulers.newThread())

This is equivalent to the following:

Object someResult = someMethodWhichThrowsException();
Observable.just(someResult)
    .subscribeOn(Schedulers.newThread())

As you can see this makes the synchronous method call first, then passes it to Observable.just to become an Observable.

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            ...
        }
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

This method, however, will run the code in the call block on subscription. You've told it you want to subscribe on a new thread (subscribeOn(Schedulers.newThread())), so the subscription happens on a new thread, and the code which gets run on subscription (the call block) gets run on that thread too. This is similar behaviour to calling Observable.defer.

  1. What is the best practice while creating async tasks?

Well, that's up to you and your desired behaviour. Sometimes you want the async code to begin running immediately (in which case you may want to cache it using one of the operators for that purpose). I'd definitely consider using the Async Utils library for this.

Other times, you'll want it to run only on subscription (which is the behaviour in the examples here) - for example if there are side-effects, or if you don't care when it's run and just want to use the built-ins to get something off the UI thread. Dan Lew mentions that Observable.defer is very handy for taking old code and getting it off the UI thread, during a conversion to Rx.

Sign up to request clarification or add additional context in comments.

4 Comments

No problem! The other answers gave you a solution, but didn't really give much of an explanation. I'd recommend going with the built-in defer over adding the extra library, especially as it's for Android - keep that APK size down :)
Thanks, I didn't realized that. I think this should be the recommended approach. :)
Async Utils library seems really old, and not updated for almost a year.
It's a small library and if it's stable there's no need to update it, though I understand the sentiment. They're currently accepting pull requests so it's definitely not dead! Everything that the library does you can build yourself anyway, but it's especially useful to help out when you're starting out with Rx.
11

Use Async.start() from RxJava Async Utils library. This will call the function you provide on another thread.

Example:

Observable<String> observable = Async.start(new Func0<String>() {
    @Override
    public String call() {
        try {
            return someMethodWhichThrowsException();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
});

As you note, checked exceptions must be wrapped into RuntimeExceptions.

See also https://github.com/ReactiveX/RxJava/wiki/Async-Operators#start

4 Comments

Thanks... this is really helpful. Would be great, if you have any pointers on the difference on my original code?
I can answer. Observable.just() allows creating an observable from an existing result. If you write Observable.just(myFunction()), myFunction() is is executed and then the result is passed to the just method (as in any Java expression). So this is not asynchronous at all. On the other hand, when you call Async.start(), your computation method is not called yet. It will be called later in another thread, when Rx will call func.call() (where func is the parameter provided to Async.start()).
One note on Async.start, it will immediately start the work, even before subscribe, which sometimes is exactly what you want. If you want to wait until subscribe, use Async.toAction(...).call() or Observable.defer. In this case, defer would handle the exception cleaner since you can return Observable.error(new MyException()); instead of throwing it in there, although a thrown exception would produce the same result.
@lopar, RxJavaAsyncUtil provides override start signature that gets also a Scheduler which is "Scheduler to run the function on". can I trust that detail?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.