I currently have something like this:

onValueChange() {
  let someObj = //dynamically created
  this.processUsers(someObj);
}

processUsers(someObj) {
  from(Object.keys(someObj))
    .pipe(
      concatMap(key => this.getUsers(key))
    )
    .subscribe(res => ...)
}

onValueChange is called whenever something in a form changes. It builds an object and passes it to processUsers, which uses that object to run some async tasks.

This works fine for a single trigger, but when processUsers is called several times in a row with different objects, the async tasks get mixed up.

So, how can I use RxJS to make processUsers finish with the current object before it takes the next one? My first thought is to push each someObj into an array and have processUsers subscribe to that array, but I'm not sure how to approach this.

1 Answer

It sounds like you want to handle all your invocations of onValueChange in a single stream. In this case, you could use a Subject to push your objects through. This allows you to subscribe to a single observable to process your items:

private obj$ = new Subject();

onValueChange() {
  let someObj = ...
  this.obj$.next(someObj);
}

private processUsersSub = this.obj$.pipe(
  switchMap(obj => this.processUsers(obj)) // depending on desired behavior you could
).subscribe();                             // use mergeMap, exhaustMap, concatMap

private processUsers(someObj) {
  return from(Object.keys(someObj)).pipe(
    concatMap(key => this.getUsers(key))
  );
}

Depending on the behavior you want when obj$ emits again before the previous object has finished processing, you can choose the appropriate operator:

  • switchMap - switch to new source and discard any remaining processing
  • exhaustMap - discard any new emissions that occur while processing is in progress
  • concatMap - continue processing old items until complete, then process new items
  • mergeMap - process all items old and new, all at the same time (can specify concurrency limit)
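To make the difference between the four operators concrete, here is a minimal sketch that feeds the same sequence of events through each one and records what the subscriber sees. Everything in it is hypothetical scaffolding rather than part of the code above: the trace helper, the keys 'a' and 'b', and the use of ReplaySubject to stand in for the per-object processing so that results produced early are still delivered to a late subscriber.

```typescript
import { Observable, OperatorFunction, ReplaySubject, Subject } from 'rxjs';
import { concatMap, exhaustMap, mergeMap, switchMap } from 'rxjs/operators';

type Key = 'a' | 'b';
type Project = (key: Key) => Observable<string>;

// Pushes one fixed sequence of events through the given flattening operator
// and returns the results in the order the subscriber saw them.
function trace(flatten: (project: Project) => OperatorFunction<Key, string>): string[] {
  const obj$ = new Subject<Key>();
  // Assumption: ReplaySubject models processing whose results are buffered
  // for a subscriber that arrives late (relevant for concatMap below).
  const work: Record<Key, ReplaySubject<string>> = {
    a: new ReplaySubject<string>(),
    b: new ReplaySubject<string>(),
  };
  const seen: string[] = [];
  const sub = obj$.pipe(flatten(key => work[key])).subscribe(value => seen.push(value));

  obj$.next('a');      // first object arrives
  work.a.next('a1');   // its processing produces a result
  obj$.next('b');      // second object arrives while 'a' is still in progress
  work.b.next('b1');
  work.a.next('a2');
  work.a.complete();   // processing of 'a' finishes
  work.b.next('b2');
  work.b.complete();

  sub.unsubscribe();
  return seen;
}

const results = {
  switchMap: trace(project => switchMap(project)),   // ['a1', 'b1', 'b2']
  exhaustMap: trace(project => exhaustMap(project)), // ['a1', 'a2']
  concatMap: trace(project => concatMap(project)),   // ['a1', 'a2', 'b1', 'b2']
  mergeMap: trace(project => mergeMap(project)),     // ['a1', 'b1', 'a2', 'b2']
};
console.log(results);
```

For the question as asked (finish the current object before taking the next), the concatMap row is the one that matches: nothing from 'b' is seen until 'a' has completed, and nothing is dropped.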

1 Comment

Thank you! This works for me, but I was wondering why the subscription is assigned to processUsersSub here.
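The answer does not say why, but one likely reason is that subscribe() returns a Subscription, and keeping a reference to it lets you unsubscribe later so the pipeline stops reacting to new objects. The sketch below illustrates that under some assumptions: UserForm is a plain class rather than an Angular component, destroy() plays the role that ngOnDestroy() would play in Angular, and getUsers is a hypothetical synchronous stand-in for the real lookup.

```typescript
import { Observable, Subject, Subscription, from, of } from 'rxjs';
import { concatMap, switchMap } from 'rxjs/operators';

class UserForm {
  // Collected results, exposed only so the effect of unsubscribing is visible.
  readonly processed: string[] = [];

  private readonly obj$ = new Subject<Record<string, unknown>>();

  // Keeping the Subscription is what makes the teardown in destroy() possible.
  private readonly processUsersSub: Subscription = this.obj$
    .pipe(switchMap(obj => this.processUsers(obj)))
    .subscribe(user => this.processed.push(user));

  onValueChange(someObj: Record<string, unknown>): void {
    this.obj$.next(someObj);
  }

  // In an Angular component this logic would typically live in ngOnDestroy().
  destroy(): void {
    this.processUsersSub.unsubscribe();
  }

  private processUsers(someObj: Record<string, unknown>): Observable<string> {
    return from(Object.keys(someObj)).pipe(concatMap(key => this.getUsers(key)));
  }

  // Hypothetical stand-in for the real asynchronous getUsers call.
  private getUsers(key: string): Observable<string> {
    return of(`user:${key}`);
  }
}

const form = new UserForm();
form.onValueChange({ a: 1, b: 2 }); // processed while the subscription is live
form.destroy();
form.onValueChange({ c: 3 });       // ignored: the pipeline was torn down
console.log(form.processed);        // ['user:a', 'user:b']
```

If the subscription is never torn down, the variable is not strictly needed, and calling subscribe() without storing the result would behave the same while the object is alive.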
