1

Im trying to create to create a custom rxjs operator. I've already created a few custom operator (e.g. MonoTypeOperatorFunction or just regular Observable, that can take in input as a string, number etc.) and they work fine. My problem is that i want to create an operator that takes in a anonymous function. Like x => x.prop or a predicate.

In this example i want to create an operator that can flatten the elements in an object.

interface B {
  bid: number;
}

interface A {
  aid: number;
  bs: B[];
}

const b1: B = { bid: 1 };
const b2: B = { bid: 2 };

const a1: A = { aid: 1, bs: [b1, b2] };

const a1$ = of(a1);

// I want to combine map and concatMap into a single operator
const result = a1$.pipe(
  map(x => x.bs),
  concatMap(x => x)
).subscribe(x => console.log(x))
// OUTPUT: {bid:1}, {bid:2}

// what i want
// a1$.pipe(many(x => x.bs))

// How i tried to create an operator
// function many<T>(predicate: (input: T) => boolean) {
//   return function<T1>(source: Observable<T1>) {
//     return source.pipe(map(predicate),concatMap(x => x));
//   };
// }
4
  • What doesn't work in the snippet? It looks good to me, but I might be missing something Commented Feb 21, 2021 at 13:50
  • @AndreiGătej I want to combine map and concatMap into a single operator. In my example, i call it many. I will like to have a generic implementation that works for all objects that have a list property if that makes sense Commented Feb 21, 2021 at 15:31
  • If I uncomment the code it gives me this; Argument of type '(input: any) => T' is not assignable to parameter of type '(value: any, index: number) => ObservableInput<any>'. Type 'T' is not assignable to type 'ObservableInput<any>'. Type 'T' is not assignable to type 'ArrayLike<any>'.(2345) Commented Feb 21, 2021 at 15:32
  • Could you reproduce the error in a StackBlitz app or something similar? It would be easier to find a solution that way. Commented Feb 21, 2021 at 15:36

1 Answer 1

1

There's already an operator that combines map and concatMap into a single operator, it's called concatMap.

pipe(map(somefunc), concatMap(x => x)) is always the same as concatMap(somefunc). Which explains why it's called concatMap ;)


Your function:

The function you wrote can be rewritten as follows:

function many<T>(predicate: (input: T) => boolean) {
  return pipe(map(predicate),concatMap(x => x));
}

which is the same as

function many<T>(predicate: (input: T) => boolean) {
  return concatMap(predicate);
}

When you look at this, you should be able to see that you're transforming your stream of type T into a stream of type boolean. concatMap can't subscribe to a boolean. You'll need a function of type (input: T) => Observable<R>. Which is the same type signature that concatMap already takes.

Sign up to request clarification or add additional context in comments.

2 Comments

Yes, of course, thank you! But should it not be function many<T>(predicate: (input: T) => T[]) ?
Most high-order operators (concatMap, mergeMap, switchMap, defer, concatAll, mergeAll, etc) will subscribe to an array by turning it into a stream for you. So yes, someFunction: (input: T) => R[] will work! :)

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.