1

I tried converting my AsyncTask code below to Rxjava2, but apparently Rxjava2 doesn't deal with null values and hence my app crashes. Here's my code for AsyncTask:

new AsyncTask<Void, Void, Void>() {
            @Override
            protected Void doInBackground(Void... params) {
                Set<Map.Entry<String, Participant>> entries = pool.entrySet();
                for (Map.Entry<String, Participant> entry : entries) {
                    Participant participant = entry.getValue();
                    participant.release();
                }
                return null;
            }

            @Override
            protected void onPostExecute(Void aVoid) {
                cb.event(new Spin.Event<Void>());
            }
        }.execute();

And here's the converted code to Rxjava (NOT Rxjava2) :

 Observable.defer(new Func0<Observable<Void>>() {
        @Override
        public Observable<Void> call() {
            Set<Map.Entry<String, Participant>> entries = pool.entrySet();
            for (Map.Entry<String, Participant> entry : entries) {
                Participant participant = entry.getValue();
                participant.release();
            }
            return Observable.just(null);
        }
    }).doOnCompleted(new Action0() {
        @Override
        public void call() {
            cb.event(new Spin.Event<Void>());
        }
    })
    .subscribeOn(Schedulers.computation())
    .subscribe();

What would be the best approach to convert it to Rxjava without it crashing on returning null. Also, how does the .execute() play with respect to Rxjava2? not sure if that even works in Rxjava?

Here's the crash log:

FATAL EXCEPTION: RxComputationThreadPool-3

                                                                           io.reactivex.exceptions.OnErrorNotImplementedException: null ObservableSource supplied
                                                                               at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
                                                                               at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
                                                                               at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63)
                                                                               at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
                                                                               at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:35)
                                                                               at io.reactivex.Observable.subscribe(Observable.java:10842)
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
                                                                               at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
                                                                               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
                                                                               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
                                                                               at java.lang.Thread.run(Thread.java:818)
                                                                            Caused by: java.lang.NullPointerException: null ObservableSource supplied
                                                                               at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
                                                                               at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:32)
                                                                               at io.reactivex.Observable.subscribe(Observable.java:10842) 
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
                                                                               at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) 
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) 
                                                                               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) 
                                                                               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) 
                                                                               at java.lang.Thread.run(Thread.java:818) 
4
  • It's not clear what here is null. Add stacktrace please Commented Dec 12, 2017 at 1:21
  • Please post your stack trace, aslo you need to add .observeOn(AndroidSchedulers.mainThread()) too. Commented Dec 12, 2017 at 1:22
  • I will post it shortly. Also, return Observable.just(null); is this allowed in rxjava 2? Commented Dec 12, 2017 at 1:24
  • thanks @Geros , please check out the crash log guys.(just edited my question above). Also, is there a better approach to convert my origiinal asynctask code to rxjava 2? Commented Dec 12, 2017 at 1:31

3 Answers 3

2

Since you don't have values to post back to the main thread, you can use Completable:

Completable.fromAction(() -> {
    Set<Map.Entry<String, Participant>> entries = pool.entrySet();
    for (Map.Entry<String, Participant> entry : entries) {
        Participant participant = entry.getValue();
        participant.release();
    }
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
    () -> {
         cb.event(new Spin.Event<Void>());
    }, 
    error -> { /* show error toast */ }
);
Sign up to request clarification or add additional context in comments.

Comments

1
Observable.defer(new Callable<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> call() throws Exception {
                Set<Map.Entry<String, Participant>> entries = pool.entrySet();
                for (Map.Entry<String, Participant> entry : entries) {
                   Participant participant = entry.getValue();
                   participant.release();
                }
                return Completable.complete().toObservable();
            }
        }).doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.d("Complete", "Complete");
            }
        })
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread()).subscribe();

This code will work too. Calling subscribe() method will start the job.

9 Comments

is there no other way without using completeable to fix my code and get it to work? I tried adding subscribeon(androidschedulers.mainthread()) but that caused item is null crash. There must be a way to improve what i already have right?
You can replace Completeable of the above code with Observable<Void>
I cant really use cb outside the function in somethingdone as it is a part of the main function Callback<Void> cb inside which I am calling the observable. Cant I use everything in one place like I am currently doing, but with some form of error handling inside? it throws error not handled exception and item is null often.also how do i convert .execute() in my asyntask to rx?
keep getting this : io.reactivex.exceptions.OnErrorNotImplementedException: The item is null at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept
thanks! this seems to work, however just out of curiosity, what advantages does rxjava 2 have over the previous rxjava or even asynctask per say? will it improve my app performance?
|
0
Observable.defer(new Callable<ObservableSource<?>>() {

 //This method is replacing doInBackground
        @Override
        public ObservableSource<?> call() throws Exception {  
            Set<Map.Entry<String, Participant>> entries = pool.entrySet();
            for (Map.Entry<String, Participant> entry : entries) {
               Participant participant = entry.getValue();
               participant.release();
            }
            return Completable.complete().toObservable();
        }
    }).doOnComplete(new Action() {
     //This is onPostExecute
        @Override 
        public void run() throws Exception {
            Log.d("Complete", "Complete");
        }
    })
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread()).subscribe()

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.