I am trying to iterate over an array of Maps and do some asynchronous actions. I have tried a few things using the RxJava library, but everything I've tried seems to be synchronous. I am trying to avoid creating new threads manually and want to let RxJava handle it. This is what I've tried so far.

    Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.computation())
            .forEach(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.computation())
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

When I run unit tests with the code above I see the following output.

1
2
1
2
1
2
...

What I want to see is

1
1
1
... 
2
2
2

How do I iterate asynchronously over a Map array using RxJava?

  • Asynchronous does not mean out of order. When you apply Schedulers, the operations are executed on the Scheduler you specify, but the result is still a stream that has an order. Commented Apr 19, 2019 at 7:27
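
To make the comment's point concrete, here is a minimal sketch that uses only `java.util.concurrent` rather than RxJava. It is an analogy, not how RxJava is implemented: a pool with one worker stands in for a single Scheduler worker, and a pool with one worker per item stands in for true parallelism. The class name `OrderingDemo`, the `run` helper, and the 200 ms sleep are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderingDemo {

    // Hypothetical helper: submits `items` tasks to a pool with `workers`
    // threads and returns the "1"/"2" markers in the order they were recorded.
    static List<String> run(int workers, int items) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < items; i++) {
            pool.execute(() -> {
                log.add("1");
                try {
                    Thread.sleep(200); // stand-in for the slow work in the question
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                log.add("2");
            });
        }
        pool.shutdown();                             // accept no new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS); // wait for the submitted ones
        return new ArrayList<>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        // One worker: runs off the calling thread, but one item at a time.
        System.out.println(run(1, 3));
        // One worker per item: the sleeps overlap, so the "1"s usually come first.
        System.out.println(run(3, 3));
    }
}
```

With one worker the markers always alternate (1, 2, 1, 2, ...), which matches what the question observes: the work has moved to another thread, yet items are still handled one after another. Only when several workers run at once can the "1"s get ahead of the "2"s, and even then the exact interleaving depends on thread timing.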

2 Answers

You can achieve this by switching from Observable to Flowable and using its parallel operator:

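        Flowable.fromArray(array) // use fromIterable(...) instead if the source is a List
                .parallel(3) // number of rails, i.e. items processed concurrently
                .runOn(Schedulers.newThread()) // the scheduler each rail runs on
                .map(item -> {
                    try {
                        System.out.println(1);
                        Thread.sleep(3000);
                        System.out.println(2);
                    } catch (InterruptedException e) {
                        // restore the interrupt flag instead of swallowing it
                        Thread.currentThread().interrupt();
                    }

                    return item; // pass the item through unchanged
                })
                .sequential() // merge the rails back into a single Flowable
                .subscribe();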

1 Comment

That did the trick. I'm using doOnNext, doOnError, and doOnComplete to do the actions and logging.

If you're stuck on RxJava 1.x, you won't have access to the Flowable class. That wasn't my case, but something like the code below runs the actions in parallel. There is more nesting, but it works.

    final ExecutorService executor = Executors.newFixedThreadPool(2);
    List<String> iterableList = new ArrayList<>();
    iterableList.add("one");
    iterableList.add("two");
    iterableList.add("three");
    iterableList.add("4");
    iterableList.add("5");
    iterableList.add("6");
    iterableList.add("7");
    iterableList.add("8");
    iterableList.add("9");
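    Observable.from(iterableList)
            // wrap each item in its own Observable so it can be subscribed independently
            .flatMap(val -> Observable.just(val)
                    // each inner Observable runs on the pool, so up to 2 items run at once
                    .subscribeOn(Schedulers.from(executor))
                    .doOnNext(numString -> {
                        try {
                            System.out.println(1);
                            Thread.sleep(500);
                            System.out.println(2);
                        } catch (InterruptedException ex) {
                            // restore the interrupt flag instead of swallowing it
                            Thread.currentThread().interrupt();
                        }
                    })
            )
            .subscribe();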
