I listened to this talk: https://www.youtube.com/watch?v=QdmkXL7XikQ&feature=youtu.be&t=274

I heard that I should avoid creating an Observable with the create method, because it doesn't handle unsubscription and backpressure automatically, but I can't find an alternative to use in the code below.

compositeSubscription.add(
    Observable.create(new Observable.OnSubscribe<DTOCompaniesCallback>() {
        @Override
        public void call(final Subscriber<? super DTOCompaniesCallback> subscriber) {

            modelTrainStrike.getCompaniesFromServer(new CompaniesCallback() {
                @Override
                public void onResult(DTOCompaniesCallback dtoCompaniesCallback) {
                    try {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(dtoCompaniesCallback);
                            subscriber.onCompleted();
                        }
                    } catch (Exception e) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(e);
                        }
                    }
                }
            });

        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<DTOCompaniesCallback>() {
        @Override
        public void call(DTOCompaniesCallback dtoCompaniesCallback) {
            Log.i("TAG", "onResult: " + dtoCompaniesCallback.getCompaniesList().size());
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            throw new OnErrorNotImplementedException("Source!", throwable);
        }
    })
);

I clear the CompositeSubscription in the onDestroy method:

@Override
public void onDestroy() {
    if (compositeSubscription != null) {
        compositeSubscription.clear();
    }
}

Do you see any alternative to the create method that I could use here? Do you see any potential danger or is this approach safe? Thanks

1 Answer

You can use defer + AsyncSubject:

Observable.defer(() -> {
    AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
    modelTrainStrike.getCompaniesFromServer(v -> {
        async.onNext(v);
        async.onCompleted();
    });
    return async;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
...
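
Why AsyncSubject and not a direct emission: it holds on to the last value, delivers it only once completed, and replays it to a subscriber that arrives late, so the result should not get lost even if the callback fires before defer hands the subject back. The toy class below is only a plain-Java model of that contract to make the behaviour visible. LastValueSubject is a made-up name, it is not RxJava code, and it leaves out onError, the completion signal and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Toy model of the AsyncSubject contract (NOT RxJava's implementation):
 * it remembers only the latest value and hands it out once completed.
 */
final class LastValueSubject<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T last;
    private boolean hasValue;
    private boolean completed;

    /** Registers a listener; a late listener is served immediately. */
    synchronized void subscribe(Consumer<T> listener) {
        if (completed) {
            if (hasValue) listener.accept(last);
        } else {
            listeners.add(listener);
        }
    }

    /** Stores the value but does not deliver it yet. */
    synchronized void onNext(T value) {
        if (completed) return;
        last = value;
        hasValue = true;
    }

    /** Delivers the stored value to everyone who subscribed so far. */
    synchronized void onCompleted() {
        if (completed) return;
        completed = true;
        if (hasValue) {
            for (Consumer<T> listener : listeners) listener.accept(last);
        }
        listeners.clear();
    }

    public static void main(String[] args) {
        LastValueSubject<String> subject = new LastValueSubject<>();
        List<String> received = new ArrayList<>();

        subject.subscribe(v -> received.add("early:" + v));
        subject.onNext("companies");   // stored, nothing delivered yet
        subject.onCompleted();         // early subscriber gets the value now
        subject.subscribe(v -> received.add("late:" + v)); // replayed at once

        System.out.println(received);  // [early:companies, late:companies]
    }
}
```

A subject that only forwards values as they arrive would miss the "late" case here, which is the case that matters when the callback can fire synchronously.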

If getCompaniesFromServer supports cancellation (here assumed to return a Closeable), you can tie it to unsubscription:

Observable.defer(() -> {
    AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
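    Closeable c = modelTrainStrike.getCompaniesFromServer(v -> {
        async.onNext(v);
        async.onCompleted();
    });
    // Cancel the pending request when the subscriber goes away.
    return async.doOnUnsubscribe(() -> {
        try { c.close(); } catch (IOException ex) { /* nothing useful to do if cancelling fails */ }
    });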
})
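
To make the Closeable assumption concrete: the snippet above only works if getCompaniesFromServer returns some handle that cancels the request. As a rough sketch, such an API could look like the following. Everything here is hypothetical (CancellableCompaniesModel, PendingRequest, deliver, and the String result standing in for DTOCompaniesCallback); the real model class may expose cancellation differently or not at all.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical stand-in for a model class whose request can be cancelled. */
final class CancellableCompaniesModel {

    /** Handle for one pending request; closing it drops any later result. */
    static final class PendingRequest implements Closeable {
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private final Consumer<String> callback;

        PendingRequest(Consumer<String> callback) {
            this.callback = callback;
        }

        /** Simulates the server response arriving. */
        void deliver(String result) {
            if (!cancelled.get()) callback.accept(result);
        }

        /** Cancels the request; results delivered afterwards are ignored. */
        @Override
        public void close() {
            cancelled.set(true);
        }
    }

    /** Starts a (simulated) request and returns its cancellation handle. */
    PendingRequest getCompaniesFromServer(Consumer<String> callback) {
        return new PendingRequest(callback);
    }

    public static void main(String[] args) {
        CancellableCompaniesModel model = new CancellableCompaniesModel();

        PendingRequest kept = model.getCompaniesFromServer(
                r -> System.out.println("received " + r));
        kept.deliver("companies");      // callback runs

        PendingRequest dropped = model.getCompaniesFromServer(
                r -> System.out.println("should not be printed"));
        dropped.close();                // what doOnUnsubscribe would trigger
        dropped.deliver("companies");   // ignored after close()
    }
}
```

With a handle like this, clearing the CompositeSubscription in onDestroy would also stop the callback from touching anything after the screen is gone.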