Sorry for the title, I couldn't think of a better one.
I've got this piece of code, which basically:

  1. filters for valid (non-null) arrays of cron expressions
  2. maps each cron expression to a service call

this.formGroup.valueChanges.pipe(
    op.filter(v => !!v.cronExpressions),
    op.map((v): string[] => v.cronExpressions),
    op.map((v: string[]) => v.map(cron =>
            this.cronService.getReadableForm(cron).pipe(
                op.map(this.toDescription),
                op.map((description): CronExpressionModel => ({ cron, description }))
            )
        )
    ),
    // What now?
).subscribe((cronExpressions: CronExpressionModel[]) => ...) // Expected result

I'd like to get, on subscribe(), the array of CronExpressionModel returned from all the service calls.

I can't wrap my head around this.


Current solution, as per Martin's answer:

filter(v => !!v.cronExpressions),
map(v => v.cronExpressions),
map(cronExprs => cronExprs.map(c => this.invokeCronService(c))),
mergeMap(serviceCalls => forkJoin(serviceCalls).pipe(defaultIfEmpty([])))
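
For anyone who wants to try this outside Angular, here is a minimal sketch of the same pipeline. A `Subject` stands in for `formGroup.valueChanges`, and `fakeGetReadableForm` is a hypothetical stub for `cronService.getReadableForm`; neither comes from the original code. It also shows what `defaultIfEmpty([])` is for: `forkJoin` over an empty array completes without emitting.

```typescript
import { Observable, Subject, forkJoin, of } from 'rxjs';
import { defaultIfEmpty, filter, map, mergeMap } from 'rxjs/operators';

interface CronExpressionModel {
  cron: string;
  description: string;
}

interface FormValue {
  cronExpressions: string[] | null;
}

// Hypothetical stub for cronService.getReadableForm: emits once, then completes.
function fakeGetReadableForm(cron: string): Observable<string> {
  return of(`readable form of "${cron}"`);
}

// Stand-in for formGroup.valueChanges, which never completes on its own.
const valueChanges = new Subject<FormValue>();

const received: CronExpressionModel[][] = [];

valueChanges.pipe(
  filter(v => !!v.cronExpressions),
  map(v => v.cronExpressions as string[]),
  // One observable per cron expression, each emitting a single model.
  map(crons => crons.map(cron =>
    fakeGetReadableForm(cron).pipe(
      map((description): CronExpressionModel => ({ cron, description }))
    )
  )),
  // forkJoin emits one array once every call has completed.
  // With an empty input it only completes, so defaultIfEmpty supplies [].
  mergeMap(calls => forkJoin(calls).pipe(defaultIfEmpty([] as CronExpressionModel[])))
).subscribe(models => received.push(models));

valueChanges.next({ cronExpressions: ['0 0 * * *', '*/5 * * * *'] });
valueChanges.next({ cronExpressions: null }); // filtered out
valueChanges.next({ cronExpressions: [] });   // yields an empty array
```

After the three `next` calls, `received` holds two entries: an array with two models, then an empty array.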

2 Answers

To transform a stream into an array, you can use the toArray operator.

Here's a suggestion:

this.formGroup.valueChanges.pipe(
    filter(v => !!v.cronExpressions),
    // transform [item1, item2...] into a stream ----item1----item2---->
    concatMap((v): Observable<CronExpressionModel[]> => from(v.cronExpressions as string[]).pipe(
        // for each of the items, make a request and wait for it to respond
        concatMap((cron: string) => this.cronService.getReadableForm(cron).pipe(
            map(this.toDescription),
            // cron is still in scope here, so it can be paired with its description
            map((description): CronExpressionModel => ({ cron, description }))
        )),
        // from() completes after its last item, so toArray() emits once per
        // form value, with all the responses collected into a single array
        toArray()
    ))
).subscribe((cronExpressions: CronExpressionModel[]) => ...) // Expected result

Note:

You can use mergeMap instead of the inner concatMap to run the requests in parallel. Be aware that the results then arrive in completion order, which is not necessarily the order of the input array ;)
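
To illustrate the ordering caveat, here is a small sketch. The two `Subject`s are hypothetical stand-ins for pending `getReadableForm` calls and are completed by hand to simulate the second response arriving first. `forkJoin` is included only as a point of comparison: it also runs the calls in parallel but keeps the input order.

```typescript
import { Subject, forkJoin, from } from 'rxjs';
import { mergeMap, toArray } from 'rxjs/operators';

// Hypothetical pending requests; each stands in for one service call.
const firstRequest = new Subject<string>();
const secondRequest = new Subject<string>();
const requests = [firstRequest, secondRequest];

let merged: string[] = [];
let joined: string[] = [];

// mergeMap subscribes to every request at once and forwards
// results in the order they arrive.
from(requests).pipe(
  mergeMap(request => request),
  toArray()
).subscribe(result => (merged = result));

// forkJoin waits for all requests and emits their last values in input order.
forkJoin(requests).subscribe(result => (joined = result));

// Simulate the second request answering before the first.
secondRequest.next('second done');
secondRequest.complete();
firstRequest.next('first done');
firstRequest.complete();
```

Here `merged` ends up as `['second done', 'first done']`, while `joined` is `['first done', 'second done']`.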


10 Comments

Uhh, that's starting to become complicated hahaha. Could you clarify why you're transforming the string array into an Observable with the first concatMap?
If you use ...map((sArray: string[]) => sArray.map(v => anObservable)) you'll get something like [Obs1, Obs2, Obs3...] as output. You don't want to create N streams, I guess. With that implementation, you will have to use forkJoin, or subscribe to each element of your array of observables. I think it's more elegant to rely on one single stream, ----RawItem1---RawItem2---> that you transform into ---RefinedItem1---RefinedItem2---> etc., and wait for the stream to end. The idea of using the first concatMap is to always keep one single stream.
Ok, give me a couple of minutes to process what you just wrote
So, I was trying out your solution, but apparently toArray never emits, because valueChanges never completes
@LppEdd I didn't know that your original Observable doesn't complete. I just updated my answer, I think now it's working.
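
The completion issue raised in these comments can be reproduced with a short sketch. A `Subject` stands in for `valueChanges`, and `toUpperCase` is just a placeholder for the service call; both are assumptions for illustration. `toArray` only emits when its source completes, so it has to sit inside the inner pipe, where `from()` completes after the last item.

```typescript
import { Subject, from } from 'rxjs';
import { concatMap, map, toArray } from 'rxjs/operators';

// Stand-in for formGroup.valueChanges: a source that stays open.
const valueChanges = new Subject<string[]>();

const outer: string[][] = [];
const inner: string[][] = [];

// toArray on the outer stream waits for a completion that never comes.
valueChanges.pipe(
  concatMap(crons => from(crons)),
  map(cron => cron.toUpperCase()), // placeholder for the service call
  toArray()
).subscribe(result => outer.push(result));

// toArray inside concatMap: from() completes after its last item,
// so one array is emitted per form value.
valueChanges.pipe(
  concatMap(crons => from(crons).pipe(
    map(cron => cron.toUpperCase()),
    toArray()
  ))
).subscribe(result => inner.push(result));

valueChanges.next(['a', 'b']);
valueChanges.next(['c']);
```

With the source still open, `outer` stays empty while `inner` contains `['A', 'B']` and `['C']`.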

You can just add forkJoin if you don't mind running all requests in parallel:

switchMap(observables => forkJoin(...observables))

Or if you want to run all of them in sequence:

switchMap(observables => concat(...observables).pipe(toArray()))

Instead of switchMap you might want to use concatMap or mergeMap depending on what behavior you want.
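
A rough sketch of the difference, under the assumption that a new form value arrives while the previous batch of requests is still pending. The `Subject`s below are hypothetical stand-ins for `valueChanges` and for the pending service calls, completed by hand. `switchMap` unsubscribes from the previous batch, whereas `mergeMap` keeps every batch alive.

```typescript
import { Observable, Subject, forkJoin } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Hypothetical pending service calls, one per form value.
const responses: Subject<string>[] = [new Subject<string>(), new Subject<string>()];

// Stand-in for valueChanges; each emitted number picks one pending call.
const formValues = new Subject<number>();
const callsFor = (i: number): Observable<string[]> => forkJoin([responses[i]]);

const switched: string[][] = [];
const mergedAll: string[][] = [];

formValues.pipe(switchMap(callsFor)).subscribe(r => switched.push(r));
formValues.pipe(mergeMap(callsFor)).subscribe(r => mergedAll.push(r));

formValues.next(0); // first batch starts
formValues.next(1); // second batch starts; switchMap drops the first one

responses[1].next('second');
responses[1].complete();
responses[0].next('first');
responses[0].complete();
```

Here `switched` only receives `['second']`, because the first batch was cancelled, while `mergedAll` receives `['second']` and then `['first']`. For a form, dropping stale batches is often what you want, but that depends on the use case.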
