With RxJava, how to sequentially execute asynchronous methods on a list of data?
Using the Node.js Async module it is possible to call a function over an array of objects sequentially in the following fashion:
var dataArray = ['file1','file2','file3']; // array of arguments
var processor = function(filePath, callback) { // function to call on dataArray's arguments
fs.access(filePath, function(err) { // perform an async operation
// process next item in dataArray only after the following line is called
callback(null, !err)
}
};
async.every(dataArray, processor, function(err, result) {
// process results
});
What's nice about this is that code executed within the processor function can be asynchronous and the callback can be run only once the async task is finished. This means that each object in dataArray will be processed one after another, not in parallel.
Now, in RxJava I can call a processing function over dataArray by calling:
String[] dataArray = new String[] {"file1", "file2", "file3"};
Observable.fromArray(dataArray).subscribe(new Consumer<String>() { // in RxJava 1 it's Action1
@Override
public void accept(@NonNull String filePath) throws Exception {
//perform operation on filePath
}
});
However, if I perform an asynchronous operation on filePath, how can I ensure the sequential execution on items of dataArray? What I'd be looking for is something along the lines of:
String[] dataArray = new String[] {"file1", "file2", "file3"};
Observable.fromArray(dataArray).subscribe(new Consumer<String>() {
// process next item in dataArray only after the callback is called
@Override
public void accept(@NonNull String filePath, CallNext callback) throws Exception {
// SomeDatabase will call callback once
// the asynchronous someAsyncOperation finishes
SomeDatabase.someAsyncOperation(filePath, callback);
}
});
And furthermore, how do I call some code once all items of dataArray have been processed? Sort of a 'completion listener'?
Note: I realise I'm probably getting the RX concept wrong. I'm asking since I can't find any guideline on how to implement the Node.js' Async pattern I mentioned above using RxJava. Also, I'm working with Android hence no lambda functions.