I have several dependent calls that I would like to chain with RxJava:

  1. Get all entities that have a file that is not yet uploaded
  2. Upload the file to the server
  3. Update the entity with the received url
  4. Upload the entity to the server

I tried different approaches but couldn't make it work. I can't get my head around how to wait until the file upload has finished. Here is my current code:

Observable.fromIterable(repository.getFileNotUploaded()) // Returns a list of all entities that should be uploaded to the server
    .flatMap(entity ->
            restService.uploadFile(new File(directory.getPath(), entity.getLocalPath()))
                    .subscribe(fileUrl -> {
                        entity.setFileUrl(fileUrl);
                        repository.update(entity);
                    }));
// TODO: Wait until all files have been uploaded and the entities have been stored
// locally. Then upload the list of all entities. 

Rest call:

public Observable<String> uploadFile(File file) {

    return Observable.create(emitter -> {
        AsyncTask.execute(new Runnable() {
            @Override
            public void run() {
                commClient.sendFileRequest(URL, file,
                        response -> {
                            // ...
                            if (success) {
                                emitter.onNext(new String(response.data));
                            }

                            emitter.onComplete();
                        });
            }
        });
    });
}

Also, I read that calling subscribe inside a flatMap is an anti-pattern. How can I cascade my method calls? Should I use the range method and return the current position of my iteration?


Edit - Working solution thanks to Emanuel S

The solution from Emanuel S works. I also had to change my REST service: it must return an Observable<Entity> instead of an Observable<String>. Also note that one should not mix AsyncTask and Rx.

public Observable<Entity> uploadFile(File file, Entity entity) {
//...
entity.setFileUrl(fileUrl);
emitter.onNext(entity);
//...
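
For readers who want to see the whole changed service, here is a minimal sketch, assuming RxJava 2 (`io.reactivex`) is on the classpath. `Entity`, the `CommClient` interface with its `Callback`, and the placeholder `URL` are hypothetical stand-ins for the real classes, which are not shown in the question. The upload is moved off the calling thread with `subscribeOn` instead of `AsyncTask`. Signalling a failed upload through `onError` is a choice made for this sketch; the original code simply completed without emitting.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical entity, reduced to the fields used in this question.
class Entity {
    private final String localPath;
    private String fileUrl;

    Entity(String localPath) { this.localPath = localPath; }
    String getLocalPath() { return localPath; }
    String getFileUrl() { return fileUrl; }
    void setFileUrl(String fileUrl) { this.fileUrl = fileUrl; }
}

// Hypothetical callback-based client standing in for commClient.
interface CommClient {
    interface Callback { void onResponse(boolean success, byte[] data); }
    void sendFileRequest(String url, File file, Callback callback);
}

class RestService {
    private static final String URL = "https://example.invalid/upload"; // placeholder
    private final CommClient commClient;

    RestService(CommClient commClient) { this.commClient = commClient; }

    // Emits the entity (with its file URL set) once the upload has succeeded.
    Observable<Entity> uploadFile(File file, Entity entity) {
        return Observable.<Entity>create(emitter ->
                commClient.sendFileRequest(URL, file, (success, data) -> {
                    if (success) {
                        entity.setFileUrl(new String(data, StandardCharsets.UTF_8));
                        emitter.onNext(entity);
                        emitter.onComplete();
                    } else {
                        emitter.onError(new IOException("Upload failed: " + file));
                    }
                }))
                .subscribeOn(Schedulers.io()); // takes the place of AsyncTask.execute(...)
    }
}
```

Because the entity itself is emitted, the next operator in the chain receives an `Entity` rather than a `String`.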

1 Answer

If repository.getFileNotUploaded() returns an ArrayList/Collection, you can create your Observable with just and flatten it afterwards with flatMapIterable.

This may work (untested, written without an IDE) and uploads all your entities in one batch.

As akarnokd wrote, you don't need just and flatMapIterable here, since you can simply use fromIterable.

Observable.just(repository.getFileNotUploaded()).flatMapIterable(files -> files)
// OR
Observable.fromIterable(repository.getFileNotUploaded())

.flatMap(entity -> rs.uploadFile(new File(yourPath, entity.getLocalPath()))) // for each file not uploaded run another observable and return it
.map(entity -> { entity.setFileUrl(fileUrl); return entity; }) // modify the entity and return it
.toList() // get it back to a List<Entity> with your entities
.flatMap(entityList -> uploadEntityObservable(entityList))
.subscribe(
    success -> Log.d("Success", "All entities and files uploaded"),
    error -> Log.e("Error", "error happened somewhere")
);
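
A self-contained sketch of the same chain, assuming RxJava 2 (`io.reactivex`) on the classpath, where `toList()` returns a `Single`. `Entity`, `UploadPipeline`, `uploadFile`, `uploadEntities` and the `localStore` list are hypothetical stand-ins for the asker's entity class, REST service and repository; the fake upload only builds a URL from the local path.

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.List;

// Hypothetical entity, reduced to the fields used in this question.
class Entity {
    private final String localPath;
    private String fileUrl;

    Entity(String localPath) { this.localPath = localPath; }
    String getLocalPath() { return localPath; }
    String getFileUrl() { return fileUrl; }
    void setFileUrl(String fileUrl) { this.fileUrl = fileUrl; }
}

class UploadPipeline {
    // Stand-in for the file upload: sets the URL and emits the entity.
    static Observable<Entity> uploadFile(Entity entity) {
        return Observable.fromCallable(() -> {
            entity.setFileUrl("https://example.invalid/files/" + entity.getLocalPath());
            return entity;
        });
    }

    // Stand-in for the final REST call; emits the number of uploaded entities.
    static Single<Integer> uploadEntities(List<Entity> entities) {
        return Single.just(entities.size());
    }

    static Single<Integer> uploadAll(List<Entity> notUploaded, List<Entity> localStore) {
        return Observable.fromIterable(notUploaded)
                .flatMap(UploadPipeline::uploadFile)       // upload each file, get the entity back
                .doOnNext(localStore::add)                 // stand-in for repository.update(entity)
                .toList()                                  // emits once, after every upload has completed
                .flatMap(UploadPipeline::uploadEntities);  // upload the whole list in one call
    }
}
```

Nothing runs until the returned `Single` is subscribed to. `toList()` is what provides the "wait until all files have been uploaded" step, since it only emits after the upstream Observable completes.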

If you want to upload each modified entity with its own call, you may want to replace

.toList() // get it back to a List<Entity> with your entities
.flatMap(entityList -> uploadEntityObservable(entityList))

with

.flatMap(singleEntity -> uploadSingleEntity(singleEntity))

And don't mix AsyncTask with RxJava. You don't need AsyncTask if you have RxJava.

Take care: if your repository itself returns an Observable that emits the data as a stream, you need to use

repository.getFileNotUploaded() // Observable<Whatever> that emits the data as a stream
.flatMapIterable(files -> files) ...
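
To illustrate that last case, here is a small sketch, assuming RxJava 2 (`io.reactivex`) and a hypothetical repository method that emits the whole list as a single item. `FlattenExample` and the file names are made up for the example.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

class FlattenExample {
    // Hypothetical repository call: emits the complete list as ONE item.
    static Observable<List<String>> getFileNotUploaded() {
        return Observable.just(Arrays.asList("a.jpg", "b.jpg", "c.jpg"));
    }

    // flatMapIterable turns each emitted list into one emission per element.
    static Observable<String> paths() {
        return getFileNotUploaded().flatMapIterable(files -> files);
    }
}
```

After `flatMapIterable`, downstream operators such as `flatMap` and `map` see one element at a time instead of the whole list.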
9 Comments

repository.getFileNotUploaded returns a List<Entity>. If I use Observable.just I get the whole list (entities) and not just one entity inside the flatMap and map.
You're right, tested and updated. Usually I stream my data (Kotlin). Anyway, it should work now.
It's up to him. I've learned: first create an Observable with the values, then flatten it. Can't remember why, but I'm sure there's a difference. Oh, and it's cool with Kotlin: list.toObservable() :P. Ah, I remember: because I usually split Observables for readability and/or stream them.
Thanks for your awesome answer! I have a problem with the .map part. The entity here is a String and not an object. Do I have a problem in my REST request because I return a String there?
The return value of your uploadFile must be an Entity. You map the return value and transform it to another value/type.