
I am having trouble getting the sum (or any reduction) of an inner number property of an Observable within another Observable.

I have an Observable array of "Account" objects (Observable<AppAccount[]>).

export interface AppAccount {
    _id?: string;
    name: string;
}

And an Observable array of "Balance" objects, each with an accountId. Many balances can be associated with one account (they are sorted/filtered by date, but that piece is removed for brevity).

export interface AccountBalance {
    _id?: string;
    accountId: string;
    amount: number;
}

I have a helper method which returns just the last Balance object's Amount for a given Account.

getLastAmount(account: AppAccount): Observable<number> {
    return this.balanceService.balances$.pipe(
      map(balances => {
        let last = balances.filter(balance => {
          return balance.accountId === account._id;
        }).sort().pop();
        //console.log(last)
        return last ? last.amount : 0;
      }),
      tap(amount => console.log(`getLastAmount() => ${amount}`)),
    );
  }
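A note on the `.sort()` call above: with no comparator, `Array.prototype.sort` compares the string form of each element, and every plain object stringifies to "[object Object]", so it does not order the balances by date. The sketch below shows one way the "last balance" lookup might look with an explicit comparator. The `date` field, the `DatedBalance` type and the `lastAmountFor` helper are assumptions made for illustration; the question's AccountBalance interface does not show a date property.

```typescript
// Hypothetical shape: the original AccountBalance has no date field.
interface DatedBalance {
  accountId: string;
  amount: number;
  date: string; // assumed ISO 8601 date string, e.g. '2020-07-21'
}

// Returns the amount of the most recent balance for the account, or 0 if there is none.
function lastAmountFor(accountId: string, balances: DatedBalance[]): number {
  const sorted = balances
    .filter(balance => balance.accountId === accountId)
    // filter() returns a new array, so sorting here does not mutate the input.
    // ISO 8601 date strings order chronologically when compared as text.
    .sort((a, b) => a.date.localeCompare(b.date));
  const last = sorted[sorted.length - 1];
  return last ? last.amount : 0;
}

const sampleBalances: DatedBalance[] = [
  { accountId: 'a1', amount: 30, date: '2020-07-20' },
  { accountId: 'a1', amount: 10, date: '2020-07-01' },
  { accountId: 'a2', amount: 5, date: '2020-07-10' },
];

console.log(lastAmountFor('a1', sampleBalances)); // 30
```
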

Now I am trying to write a method which will loop through the Accounts, call getLastAmount() for each, and then sum them all and return an Observable. This is what I have managed so far:

getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
    return accounts$.pipe(
      map(accounts => from(accounts)),
      mergeAll(),
      mergeMap(account => this.getLastAmount(account)),
      reduce((sum, current) => {
        console.log(`${sum} + ${current}`);
        return sum + current;
      }, 0)
    );
  }

But this seems to never return, and gets stuck in an infinite loop??

With just one account and one associated balance whose 'amount' is '10', I get '0 + 10' in my console log over and over, and the network log also confirms that getBalances() is being called continuously.

Am I on the right track? Is there a better way? Why does this RXJS pipe get stuck in a loop?

EDIT: I've made some changes based on picci's suggestions:

getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
    return accounts$.pipe(
      map(accounts => accounts.map(account => this.getLastAmount(account))),
      concatMap(balances$ => { console.log('balances$', balances$); return forkJoin(balances$); }),
      tap(balances => console.log('balances', balances)),
      map(balances => balances.reduce(
        (amountSum, amount) => {
          console.log(`${amountSum} + ${amount}`)
          amountSum = amountSum + amount;
          return amountSum
        }, 0))
    );
  }

But this is still not returning, or the pipe is not completing? I've made a stackblitz here: https://stackblitz.com/edit/angular-rxjs-nested-obsv If you check the console output, it seems to not get any further than the forkJoin call...

Not sure about the infinite loop, but I think it does not return because you're using reduce, which emits the reduced value only when the source (accounts$ in this case) completes. If you want to receive a value at each reduce iteration, you might want to use scan. Commented Jul 21, 2020 at 18:13
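To make the difference between reduce and scan concrete, here is a minimal sketch. It assumes RxJS 6 or 7 (where operators can be imported from 'rxjs/operators') and uses a plain Subject named `amounts$` as a stand-in for a source that stays open; none of these names come from the question.

```typescript
import { Subject } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

// Stand-in for a long-lived source that has not completed yet.
const amounts$ = new Subject<number>();

const reduced: number[] = [];
const scanned: number[] = [];

// reduce holds its accumulated value until the source completes.
amounts$
  .pipe(reduce((sum, current) => sum + current, 0))
  .subscribe(total => reduced.push(total));

// scan emits the running total after every source value.
amounts$
  .pipe(scan((sum, current) => sum + current, 0))
  .subscribe(total => scanned.push(total));

amounts$.next(10);
amounts$.next(20);

// Snapshot taken while the source is still open.
const reducedBeforeComplete = reduced.slice();

amounts$.complete();

console.log(scanned);               // [10, 30]
console.log(reducedBeforeComplete); // []
console.log(reduced);               // [30]
```
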

2 Answers


If I understand right, you could proceed like this

// somehow you start with the observable which returns the array of accounts
const accounts$: Observable<AppAccount[]> = getAccounts$()
// you also set the date you are interested in
const myDate: Moment = getDate()

// now you build the Observable<number> which will emit the sum of the last balance amounts
const amountSum$: Observable<number> = accounts$.pipe(
  // transform the array of accounts into an array of Observable<number>, one per account,
  // each emitting the last Balance amount of that account
  map((accounts: AppAccount[]) => {
    // use the getLastAmount function you have coded
    return accounts.map(account => getLastAmount(account, myDate))
  }),
  // concatMap subscribes to the Observable returned by forkJoin and waits for it to complete
  // before handling the next array emitted by the source
  // forkJoin subscribes to all the inner Observables in parallel and emits an array with
  // the last value of each one, but only once ALL of them have completed
  concatMap(accounts$ => forkJoin(accounts$)),
  // now that we have an array of amounts, we sum them using the Array reduce method
  map(balances => balances.reduce((amountSum, amount) => amountSum + amount, 0))
)

// eventually you subscribe to the amountSum$ Observable to get the result
amountSum$.subscribe({
  next: amountSum => console.log(`The sum of the last balances is: ${amountSum}`),
  error: err => console.error(err),
  complete: () => console.log("I am done")
})

There may be other combinations that lead to the same result, but this seems to work and can be checked in this stackblitz.

If you are interested in some frequent patterns of RxJS with http calls, you may want to read this blog.


3 Comments

The line concatMap(accounts$ => forkJoin(accounts$)), is a useful trick, but the pipe does not seem to proceed past this RxJS function. I've created a stackblitz here: stackblitz.com/edit/angular-rxjs-nested-obsv
You simulate http with a BehaviorSubject and not with the of function of RxJS. This causes the issue. Here is the explanation: forkJoin emits only when all its input Observables complete. The Observable returned by the http client emits one single value and then completes, and so does the Observable created by of. A Subject does not complete unless complete() is called on it. So, if you pass a Subject that never completes into forkJoin, the result will never emit and the pipe will not proceed. Substitute get balances$() { return this._balances$.asObservable(); } with get balances$() { return of(this.balances); } in AccountBalanceService and it works.
Aha! The BehaviorSubjects never completing was exactly the issue. I didn't want to change the structure of my service, but from that hint I found that just using combineLatest instead of forkJoin works great!
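The behaviour described in these comments can be reproduced in a few lines. This is only a sketch, assuming RxJS 6.5 or later; `open$` and `other$` are hypothetical BehaviorSubjects standing in for the service's balance streams.

```typescript
import { BehaviorSubject, combineLatest, forkJoin, of } from 'rxjs';

// Two sources that stay open, like the BehaviorSubjects in the question's service.
const open$ = new BehaviorSubject<number>(10);
const other$ = new BehaviorSubject<number>(5);

const forkJoinResults: number[][] = [];
const combineLatestResults: number[][] = [];
const forkJoinWithOfResults: number[][] = [];

// forkJoin waits for every input to complete, which never happens here.
forkJoin([open$, other$]).subscribe(values => forkJoinResults.push(values));

// combineLatest emits as soon as every input has produced at least one value,
// and again whenever any input emits.
combineLatest([open$, other$]).subscribe(values => combineLatestResults.push(values));

// of() emits its value and completes, so forkJoin can emit.
forkJoin([of(10), of(5)]).subscribe(values => forkJoinWithOfResults.push(values));

open$.next(20);

console.log(forkJoinResults);       // []
console.log(combineLatestResults);  // [[10, 5], [20, 5]]
console.log(forkJoinWithOfResults); // [[10, 5]]
```

One design point to keep in mind when swapping forkJoin for combineLatest inside the pipe: the combined Observable also never completes, so an outer concatMap would never move on to a later accounts array. switchMap is the usual choice in that situation.
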
1

Well, first, I don't think you should be using observables like that.

If you only need the total balance, you could use something like this:

  private appAcount$ = from<AppAccount[]>([
    { _id: '1', name: 'user-1' },
    { _id: '2', name: 'user-2' },
    { _id: '3', name: 'user-3' },
  ]);

  // this would be your http call
  public getBalances(accountId: string): Observable<AccountBalance[]> {
    const ab = [
      { _id: '1', accountId: '1', amount: 100 },
      { _id: '2', accountId: '2', amount: 200 },
      { _id: '3', accountId: '2', amount: 300 },
      { _id: '4', accountId: '3', amount: 400 },
      { _id: '5', accountId: '3', amount: 500 },
      { _id: '6', accountId: '3', amount: 600 },
    ];

    return of(ab.filter(x => x.accountId === accountId));
  }

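  lastAmounts$: Observable<AccountBalance[]> = this.appAcount$
    .pipe(
      // concatMap runs one request per account, in order; with a real http call,
      // switchMap would cancel a pending request as soon as the next account arrives
      concatMap(appAccount => 
        this.getBalances(appAccount._id)
          .pipe(
            // replace this with your date filter
            // slice(-1) yields an empty array when the account has no balances
            map(balances => balances.slice(-1)) 
          )
      ),
      // accumulates the last balances seen so far, emitting once per account
      scan((acc, c) => [ ...acc, ...c ])
    );

  // emits a running total, the last emission covers all accounts
  totalBalance$ = this.lastAmounts$
    .pipe(
      map(x => x.reduce((p, c) => p + c.amount, 0))
    )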

If you only need the total balance, then you could just subscribe to the totalBalance$ observable.

Let me say, though, that I wouldn't recommend doing an HTTP call for each appAccount if you can do a batch fetch of all the AccountBalances for all the appAccounts you have. That way you can just use combineLatest on both appAccounts$ and balances$.
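The batch approach could look roughly like the sketch below. It assumes RxJS 6.5 or later and that balances for an account arrive ordered oldest to newest; the `totalOfLastBalances` function and the sample data are made up for illustration and are not taken from the question's service.

```typescript
import { BehaviorSubject, combineLatest, Observable } from 'rxjs';
import { map } from 'rxjs/operators';

interface AppAccount { _id?: string; name: string; }
interface AccountBalance { _id?: string; accountId: string; amount: number; }

// Hypothetical helper: sums the last balance amount of every account.
function totalOfLastBalances(
  accounts$: Observable<AppAccount[]>,
  balances$: Observable<AccountBalance[]>
): Observable<number> {
  return combineLatest([accounts$, balances$]).pipe(
    map(([accounts, balances]) =>
      accounts.reduce((sum, account) => {
        const own = balances.filter(balance => balance.accountId === account._id);
        // Assumption: the last element is the most recent balance.
        const last = own[own.length - 1];
        return sum + (last ? last.amount : 0);
      }, 0)
    )
  );
}

const accounts$ = new BehaviorSubject<AppAccount[]>([
  { _id: 'a1', name: 'Checking' },
  { _id: 'a2', name: 'Savings' },
]);
const balances$ = new BehaviorSubject<AccountBalance[]>([
  { _id: 'b1', accountId: 'a1', amount: 100 },
  { _id: 'b2', accountId: 'a1', amount: 150 },
  { _id: 'b3', accountId: 'a2', amount: 40 },
]);

const totals: number[] = [];
totalOfLastBalances(accounts$, balances$).subscribe(total => totals.push(total));

// A new balance for a2 recomputes the total without any completion being needed.
balances$.next(balances$.value.concat([{ _id: 'b4', accountId: 'a2', amount: 60 }]));

console.log(totals); // [190, 210]
```

Because combineLatest does not wait for completion, this works with long-lived BehaviorSubjects and re-emits the total whenever either the accounts or the balances change.
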

1 Comment

I ended up following the advice of batch fetching the balances, and using combineLatest in a tricky way ended up doing the trick. Thanks!
