2

I have a foreach loop in which I might need an http request to get some info. I tried using a forkjoin inside the foreach loop to 'make the foreach loop wait for the observables' (in this example there's only one observable, but I will have more in reality...). BUT with this code, the foreach each keeps running without waiting for the observables to complete, and I cannot find any solution to prevent this.

So your help is much appreciated!!!

    ...
    let wishes: Wish[] = [];
    response.wishes.forEach((wish) => {
          let newWish : Wish = new Wish( wish._id, wish.title, wish.price );
                   
          let personObservables: Observable<any>[] = [];
          if (wish.reservation){
            personObservables.push(this.peopleService.getPersonById(wish.reservation.reservedBy)); 
          } else {
            personObservables.push(of(null));
          }
          forkJoin(personObservables).subscribe( ([reservedBy]) => {
            if (reservedBy) {
              newWish.reservation = {
                ...wish.reservation,
                reservedBy
              };
            }
            wishes.push(newWish);
          } );
        });
    ...

EDIT: Full blown working solution without foreach loop. It's much easier to use map operator in pipe and map function on array. I learned that it's easier to split this kind of logic over several operators instead of trying to fix it all in 1 map operator...

    public getWishlist ( receiver : Person) : Observable<Wish[]> {
        return this.http$.get<IWishlistResponse[]>(
          environment.apiUrl + 'wishlist/' + receiver.id
        ).pipe(
          
          // Create wish instances from each wish in API response and save reservation for later use
          map( wishlistResponse => 
            wishlistResponse[0].wishes.map(wish => ({
              wish: this.createWishInstanceFromResponse(wish, receiver),
              reservation: wish.reservation
            }))),
          
          // For each wish with reservation: get person info for 'reservedBy' id
          map( wishesAndReservationObjects => wishesAndReservationObjects.map( ({wish, reservation}) => 
            !reservation ? 
            of(wish) : 
            this.peopleService.getPersonById(reservation.reservedBy)
            .pipe(
              map ( reservedBy => {
                if (reservedBy) wish.reservation = { 
                  ...reservation, 
                  reservedBy: new Person(reservedBy.id, reservedBy.firstName, reservedBy.lastName)
                }
                return wish;
              })
            )
           )),
          
          // forkJoin all observables, so the result is an array of all the wishes
          switchMap(reservedByObservables => reservedByObservables.length !== 0 ? forkJoin(reservedByObservables) : of(<Wish[]>[])), //https://stackoverflow.com/questions/41723541/rxjs-switchmap-not-emitting-value-if-the-input-observable-is-empty-array
          
          // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
          map ( wishes => wishes.map( wish => {
            wish.setUserIsFlags(this.userService.currentUser);
            return wish;
          })),
          
          // For each wish: get state via API call
          map ( wishesWithoutState => wishesWithoutState.map( wishWithoutState => 
            this.http$.get<wishStatus>(environment.apiUrl + 'wish/' + wishWithoutState.id + '/state')
            .pipe(
              catchError(() => of(null)),
              map( state => {
                wishWithoutState.status = state;
                return wishWithoutState;
              })
            )
          )),
          
          // Combine all stateObservables into 1 array
          switchMap(stateObservables => stateObservables.length !== 0 ? forkJoin(stateObservables) : of(<Wish[]>[]))
        )
      }
    
      private createWishInstanceFromResponse ( wishResponse : IWishResponse, receiver: Person ) : Wish {
        let wish : Wish = new Wish (
          wishResponse._id,
          wishResponse.title,
          wishResponse.price,
          ...
        );
        return wish;
      }
1
  • Maybe remove the forkjoin from the loop altogether? So first build the array, then run the fork? Commented Dec 22, 2020 at 23:48

3 Answers 3

3

As a general rule, never call subscribe on an observable unless you are the final consumer and you're ready to throw the observable away (no more transformations to data required). This usually only happens when displaying to a user or writing to a database.

What your code is doing (what your various calls return) isn't very clear to me. So this is an approximation of how I would do what you appear to be trying.

Something along these lines should work.

Notice how I never subscribe until I'm ready to log the final list of wishes to the console? That's by design, so the information follows as expected.

return this.http$.get<IWishlistResponse[]>(
  environment.apiUrl + 
  'wishlist/' + 
  receiver.id
).pipe(
  map(wishlist => wishlist.wishes.map(wish => ({
    oldWish: wish,
    newWish: new Wish( wish._id, wish.title, wish.price)
  }))),
  map(wishes => wishes.map(({oldWish, newWish}) => 
    !oldWish.reservation ? 
    of(newWish) :
    this.peopleService.getPersonById(oldWish.reservation.reservedBy).pipe(
      map(([reservedBy]) => reservedBy ?
        {
          ...newWish, 
          reservation: {
            ...oldWish.reservation,
            reservedBy
          }
        } :
        {
          ...newWish,
          reservation: oldWish.reservation
        }
      )
    )
  )),
  switchMap(newWishObservables => forkJoin(newWishObservables))
).subscribe(newWishes => console.log(newWishes));

Update # 1: Cleaning up code

Some things that make code easier to maintain, error-check, etc is to outsource the creation of observables into separate functions and keep pipelines "clean(er)" of transformation logic.

Once you've done that, this pattern:

map(things => things.map(thing => observableThing)),
mergeMap(observableThings => forkJoin(observableThings))

can be combined without creating too much clutter

mergeMap(things => forkJoin(
  things.map(thing => observableThing)
))

Here is how I might de-clutter the pipe in your updated code (This is just done quickly on the fly, not tested).

You'll notice I got rid of your code to handle empty forkJoins. I don't think that's really a problem, you can solve the issue later in the pipeline as the observable completes.

You'll also notice that I did some nieve quick error handling in both functions that create observables. Both times I just substituted a default value (Although in different parts of the pipe, to slightly different effect). In practice, you'll probably want to check the error's name or type and respond accordingly.

I just ignore the errors, but some errors shouldn't be quietly ignored. That's up to you.

public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(

    // Create wish instances from each wish in API response and save reservation for later use
    map( wishlistResponse => 
      wishlistResponse[0].wishes.map(wish => ({
        wish: this.createWishInstanceFromResponse(wish, receiver),
        reservation: wish.reservation
      }))
    ),
    
    // For each wish with reservation: get person info for 'reservedBy' id
    map(wishesAndReservationObjects => 
      wishesAndReservationObjects.map(({wish, reservation}) => 
        this.addReservationToWish(wish, reservation)
      )
    ),
    
    // forkJoin all observables, so the result is an array of all the wishes
    switchMap(reservedByObservables => 
      forkJoin(reservedByObservables)
    ),
    
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map (wishes => wishes.map(wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    
    // For each wish: get state via API call
    map(wishesWithoutState => wishesWithoutState.map(wishWithoutState => 
      this.addStatusToWish(wishWithoutState)
    )),
    
    // Combine all stateObservables into 1 array
    switchMap(stateObservables =>
      forkJoin(stateObservables)
    ),

    // If this.http$.get<IWishlistResponse[]> is empty, this will all
    // complete without emitting anything. So we can just give it a default
    // thing to emit in case that happens.
    defaultIfEmpty(<Wish[]>[])
  );
}

private createWishInstanceFromResponse( 
  wishResponse : IWishResponse, 
  receiver: Person 
): Wish {
  let wish : Wish = new Wish (
    wishResponse._id,
    wishResponse.title,
    wishResponse.price
  );
  return wish;
}

/***
 * Create an observable that tries to add reservation and 
 * reservedBy to the wish
 ***/ 
private addReservationToWish(wish: Wish, reservation): Observable<Wish>{
  return this.peopleService.getPersonById(reservation?.reservedBy).pipe(
    // if peopleService.getPersonById fails (Say, because reservation was 
    // null), convert the error into a null response, the filter will Ignore
    // the failed call and just return wish.
    catchError(_ => of(null)),
    // Since errors become null, this filters errors. Furthermore if peopleService
    // doesn't error and just emits a null/undefined value instead, this will
    // filter that situation as well
    filter(reservedBy => reservedBy != null),
    // Now we know reservedBy isn't null, so map doesn't need to check
    map(reservedBy => { 
      wish.reservation = reservation;
      wish.reservation.reservedBy = new Person(
        reservedBy.id, 
        reservedBy.firstName, 
        reservedBy.lastName,
        ...
      );
      return wish;
    }),
    // If reservedBy was null (due to error or otherwise), then just emit
    // the wish without a reservation
    defaultIfEmpty(wish)
  );
}

/***
 * Create an observable that tries to add status to the wish
 ***/ 
private addStatusToWish(wish: Wish): Observable<Wish>{
  return this.http$.get<WishStatus>(
    environment.apiUrl + 
    'wish/' + 
    wishWithoutState.id + 
    '/state'
  ).pipe(
    map(state => ({
      ...wish,
      status: state
    })),
    // If http$.get<WishStatus> failed, then just return the wish
    // without a status
    catchError(_ => of(wish)),
  );
}

Now that the map to forkJoin(mapped) transitions look clean enough, I'd probably merge them, but that's up to you.

Here's how that would look:

public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(

    // Create wish instances from each wish in API response
    // For each wish with reservation: get person info for 'reservedBy' id
    switchMap(wishlistResponse => forkJoin(
      wishlistResponse[0].wishes.map(wish => 
        this.addReservationToWish(
          this.createWishInstanceFromResponse(wish, receiver), 
          wish.reservation
        )
      )
    )),
    
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map (wishes => wishes.map(wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    
    // For each wish: get state via API call
    switchMap(wishesWithoutState => forkJoin(
      wishesWithoutState.map(wishWithoutState => this.addStatusToWish(wishWithoutState))
    )),

    // If this.http$.get<IWishlistResponse[]> is empty, this will all
    // complete without emitting anything. So we can just give it a default
    // thing to emit in case that happens.
    defaultIfEmpty(<Wish[]>[])
  );
}
Sign up to request clarification or add additional context in comments.

4 Comments

I improved my code based on your answer and it works perfect now. I was also able to clean up my code following your code style. So this was a great learning experience! I added my full solution in my post above. Could you check it and give some tips & tricks for cleaner code?
Could you just tell me where it is good practice to add error handling in this pipe?
I've made an update, error handling is so case-specific that where you add it is hard to give rules of thumb for. In general, I like to do some error handling for common errors each API might return and then do some general handling for unknown/unexpected errors later on.
Wow, thanks a lot for the update! I've gained so much observable knowledge thanks to you. Really appreciate your effort and time to explain it so clearly (and to rewrite my solution into cleaner code :-D). Please follow me so you can answer all my questions ;-)
0

You use from and concatMap to accomplish this.

let wishes: Wish[] = [];
from(response.wishes).pipe(concatMap(wish) => {
      let newWish : Wish = new Wish( wish._id, wish.title, wish.price );
               
      let personObservables: Observable<any>[] = [];
      if (wish.reservation){
        personObservables.push(this.peopleService.getPersonById(wish.reservation.reservedBy)); 
      } else {
        personObservables.push(of(null));
      }
      return forkJoin(personObservables).pipe(tap([reservedBy]) => {
        if (reservedBy) {
          newWish.reservation = {
            ...wish.reservation,
            reservedBy
          };
        }
        wishes.push(newWish);
      }));
    }).subscribe();

3 Comments

Thanks for your answer, @FanCheung . I tried to implement it, but I can't figure out how I can make in work in my full code... This code is part of a pipe of another observable: this.http$.get(...).map(response => code above).switchMap(wishes => ...). If I implement the above, the wishes in switchMap are of type void. So I'm not returning anything from the map function it seems... Full code added in my question...
Need to know the context of the code. seems like you are return all operation from a function. In such case subscribe shouldn't be there.
Yeah I know, I tried without the subscribe, but then I ran into new problems. I managed to get wishes through the whole pipe, but I wasn't able to get them ALL through the pipe. In the subscribe in my component, several wishes 'arrived' but others didn't. I guess that has something to do with all the async calls I'm performing.
0

you can create this function to handle your process:

completeData(items): Observable<any> {
    let $apiCallItems: Observable<any>[] = [];
    const itemsNeedServiceCall: Array<any> = [];
    const itemsCompleted: Array<any> = [];
    items.forEach((item) => {
      if (item.reservation) {
        $apiCallItems.push(this.peopleService.getPersonById(item.reservation.reservedBy));
        itemsNeedServiceCall.push(item);
      } else {
        itemsCompleted.push(item);
      }
    });
    return forkJoin($apiCallItems).pipe(
      map((r) => {
        for (let i = 0; i < r.length; i++) {
          itemsNeedServiceCall[i] = {
            ...itemsNeedServiceCall[i],
            ...r[i],
          };
        }
        return [...itemsCompleted, ...itemsNeedServiceCall];
      })
    );
  }

and then anywhere you want to use you can do like:

this.completeData(response.wishes).subscribe(r => {
    //some code
})

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.