With RxJava, how to sequentially execute asynchronous methods on a list of data?

Using the Node.js Async module it is possible to call a function over an array of objects sequentially in the following fashion:

var fs = require('fs');
var async = require('async');

var dataArray = ['file1','file2','file3']; // array of arguments
var processor = function(filePath, callback) { // function to call on each item of dataArray
    fs.access(filePath, function(err) { // perform an async operation
        // process next item in dataArray only after the following line is called
        callback(null, !err);
    });
};
// everySeries runs one item at a time; plain async.every starts all items in parallel
async.everySeries(dataArray, processor, function(err, result) {
    // process results
});

What's nice about this is that the code inside the processor function can be asynchronous, and the callback is called only once the async task has finished. This means that each item in dataArray is processed one after another, not in parallel.

Now, in RxJava I can call a processing function over dataArray by calling:

String[] dataArray = new String[] {"file1", "file2", "file3"};
Observable.fromArray(dataArray).subscribe(new Consumer<String>() { // in RxJava 1 it's Action1
    @Override
    public void accept(@NonNull String filePath) throws Exception {
        //perform operation on filePath
    }
});

However, if I perform an asynchronous operation on filePath, how can I ensure sequential execution over the items of dataArray? What I'd be looking for is something along the lines of:

String[] dataArray = new String[] {"file1", "file2", "file3"};
Observable.fromArray(dataArray).subscribe(new Consumer<String>() {
     // process next item in dataArray only after the callback is called
    @Override
    public void accept(@NonNull String filePath, CallNext callback) throws Exception {
         // SomeDatabase will call callback once
         // the asynchronous someAsyncOperation finishes
         SomeDatabase.someAsyncOperation(filePath, callback);
    }
});

And furthermore, how do I call some code once all items of dataArray have been processed? Sort of a 'completion listener'?

Note: I realise I'm probably getting the Rx concept wrong. I'm asking since I can't find any guideline on how to implement the Node.js Async pattern I mentioned above using RxJava. Also, I'm working on Android, hence no lambda functions.

1
  • Have you looked at concatMap()? Please try something like this: Observable.from(fileNameList).concatMap((fileName) -> processFile(fileName).subscribeOn(Schedulers.io())).subscribe() Commented Mar 14, 2017 at 15:43

3 Answers

1

After a couple of days of trial and error, the solution I used for this question is as follows. Any suggestions on improvement are most welcome!

private Observable<File> makeObservable(final String filePath){
    return Observable.create(new ObservableOnSubscribe<File>() {
        @Override
        public void subscribe(final ObservableEmitter<File> e) throws Exception {
            // someAsyncOperation will call the Callback.onResult after
            // having finished the asynchronous operation.
            // Callback<File> is a placeholder interface I put together for this example
            SomeDatabase.someAsyncOperation(filePath, new Callback<File>(){
                @Override
                public void onResult(Error error, File file){
                    if (error != null){
                        e.onError(error);
                    } else {
                        e.onNext(file);
                        e.onComplete();
                    }
                }
            });
        }
    });
}

private void processFilePaths(ArrayList<String> dataArray){
    ArrayList<Observable<File>> observables = new ArrayList<>();
    for (String filePath : dataArray){
        observables.add(makeObservable(filePath));
    }
    Observable.concat(observables)
            .toList()
            .subscribe(new Consumer<List<File>>() {
                @Override
                public void accept(@NonNull List<File> files) throws Exception {
                    // process results.
                    // Files are now sequentially processed using the asynchronous code.
                }
            });
}

In short what's happening:

  1. Turn dataArray into a list of Observables
  2. Each Observable performs the asynchronous operation at the time of subscription and feeds the data to onNext after the async operation finishes
  3. Use Observable.concat() on the list of Observables to ensure sequential execution
  4. Use .toList() to combine the results of the Observables into one List

While this code does what I require, I'm not entirely satisfied with this solution for a few reasons:

  1. I'm not sure whether executing the asynchronous code inside the subscribe() method passed to Observable.create() is the right way to use Observables.
  2. I read that Observable.create should be used with caution (ref docs).
  3. I'm not sure the behaviour I'm trying to achieve requires creating a new Observable for each item in dataArray. It seems more natural to me to have one Observable that is capable of releasing data sequentially, but again, that may just be my old style of thinking.
  4. I've read that concatMap should do the trick, but I could never figure out how to apply it to this example, and found that concat does the trick. Would anyone care to explain the difference between the two?

I also tried using the zip function on the list of Observables before, and while I managed to get the List of results at the end, the async operations were executed in parallel, not sequentially.
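
On the concat versus concatMap question: concatMap(f) behaves like mapping each item to an Observable with f and then concatenating the results, so each inner Observable is subscribed to only after the previous one completes. Below is a minimal sketch of that variant, assuming RxJava 2 and Java 7 style anonymous classes. The class ConcatMapSketch and the method processFile are hypothetical stand-ins for the real asynchronous call, not part of any library.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

class ConcatMapSketch {

    // Hypothetical stand-in for the asynchronous work: it runs on an io
    // thread and emits exactly one result for the given file path.
    static Observable<String> processFile(final String filePath) {
        return Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() {
                return "processed:" + filePath;
            }
        }).subscribeOn(Schedulers.io());
    }

    // Maps every path to its own Observable; concatMap subscribes to them
    // one at a time and in order, and toList() collects the results.
    static List<String> processSequentially(List<String> filePaths) {
        return Observable.fromIterable(filePaths)
                .concatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String filePath) {
                        return processFile(filePath);
                    }
                })
                .toList()
                .blockingGet(); // blocking only to keep the sketch synchronous
    }

    public static void main(String[] args) {
        System.out.println(processSequentially(Arrays.asList("file1", "file2", "file3")));
    }
}
```

toList() returns a Single that emits once every item has been processed, so it can serve as the completion listener. In application code, and particularly on Android's main thread, blockingGet() would normally be replaced by subscribe().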

0

One very simple and easy way to perform asynchronous operations (calculations etc.) is to use the RxJava Async Utils library (not written by me, just used in our code).

  • The easiest way to run a method on a background thread is to use Async.start(this::methodToCall, Schedulers.io()), which returns an Observable<T> that produces the return value of the passed method after the method call completes.
  • There are many alternative methods that allow you to, for example, report intermediate results via an Observer<T>.

If you want to use Observable.from() and concatMap() (or similar ways), remember to first move the processing to a background scheduler with .observeOn(Schedulers.io()) or any other scheduler you want.

2 Comments

concatMap() led me down the right path, thanks! As a side note: I've seen RxJavaAsyncUtils before but couldn't find any documentation on it, so I decided to avoid using it for now. Also, this::methodToCall wouldn't work, since I need to use Java 7. Thanks for the suggestion either way!
You can use the old way of using an anonymous class implementation of Func0<T>, you don't have to use the lambda syntax. Or even better - you can use Retrolambda which converts Java 8 lambda syntax to Java 7 classes during compile time. I use that myself for Android apps. Async Utils also has a Javadoc which explains the interface, it's just not linked in the project page, which they should fix.
0

To call a function over an array of objects sequentially, you do not need any async facilities. Just write a for-loop over the array. If you need that loop to execute asynchronously, then please describe what asynchrony you need: either you want to start the loop after the array is asynchronously filled, or you want to react asynchronously to the result of the computation, or both. In either case, the CompletableFuture class may help.
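
As a rough illustration of the CompletableFuture route, the sketch below chains one asynchronous step per item so that each step starts only after the previous one has completed, and the final future acts as the completion listener. It assumes Java 8 or later (CompletableFuture and lambdas are not available on Java 7). The class SequentialFutures and the method someAsyncOperation are hypothetical placeholders for the real asynchronous call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SequentialFutures {

    // Hypothetical stand-in for an asynchronous operation such as a
    // database or network call; it completes on a thread of the executor.
    static CompletableFuture<String> someAsyncOperation(String filePath, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> "processed:" + filePath, executor);
    }

    // Builds a chain in which the operation for an item is started only
    // after the future for the previous item has completed.
    static CompletableFuture<List<String>> processSequentially(List<String> paths, ExecutorService executor) {
        CompletableFuture<List<String>> chain =
                CompletableFuture.<List<String>>completedFuture(new ArrayList<String>());
        for (String path : paths) {
            chain = chain.thenCompose(results ->
                    someAsyncOperation(path, executor).thenApply(result -> {
                        results.add(result);
                        return results;
                    }));
        }
        return chain;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> results =
                    processSequentially(Arrays.asList("file1", "file2", "file3"), executor).join();
            System.out.println(results); // [processed:file1, processed:file2, processed:file3]
        } finally {
            executor.shutdown();
        }
    }
}
```

Even though the executor has four threads, only one operation is in flight at a time, because each thenCompose stage waits for the previous stage before calling someAsyncOperation.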

1 Comment

Correct, calling a function over an array sequentially could be done with a simple for-loop, but the code executed within the loop is asynchronous, and after the loop I need to process the updated objects from the array. I've needed this functionality in various places, but the one I'm working on right now is a function that, given a list of Post objects, fetches the appropriate images from the server with Glide without causing the timeouts that parallel fetching causes with a higher number of simultaneous requests. I don't mind losing speed for the sake of robustness here.
