0

I need that createUser function returns Observable<UserEntity> but in this function I also have to make 2 queries to DB and check if this user exists. The code below uses async/await and looks pretty good and clean. But the problem is that I use rxjs everywhere in this project and would like to write it somehow using rxjs. Can it be as clean as now but with Observables?

async create(user: CreateUserDTO): Promise<UserEntity> {
    const userByEmail = await this.getUserByEmail(); 

    const userByLogin = await this.getUserByLogin(); 
    if (userByLogin || userByEmail)
        // retrun error here

    return await this.createUser(user);
}
1
  • You should probably rewrite getUserByEmail, getUserByLogin and createUser functions to Observables too, but it's hard to help with that unless we have the code for them. Commented Mar 23, 2022 at 20:07

2 Answers 2

2

I am using RxJs 6.5

forkJoin will emit result when both async functions getUserByEmail & getUserByLogin complete their execution

If getUserByEmail & getUserByLogin returns Promise,for that using from to convert a promise into an observable

mergeMap to subscribe the inner observable.In our case createUser returns observable

create(user: CreateUserDTO): Observable < UserEntity > {

    //If getUserByEmail & getUserByLogin returs Promise

    const getUserByEmail$ = from(this.getUserByEmail());
    const getUserByLogin$ = from(this.getUserByLogin());

    //If Both returns Observable
    //const getUserByEmail$ = this.getUserByEmail();
    //const getUserByLogin$ = this.getUserByLogin();


    return forkJoin({
        userByEmail: this.getUserByEmail(),
        userByLogin: this.getUserByLogin(),
    }).pipe(
        tap((res) => {
            if (res.userByEmail || res.userByLogin) {
                throw 'User exists!';
            }
        }),
        mergeMap(() => {
            return from(this.createUser(user));
            //If createUser returns Observable,then
            //return this.createUser(user);
        })
    );
}
Sign up to request clarification or add additional context in comments.

Comments

2

Assuming that this.getUserByEmail(), this.getUserByLogin() and this.createUser(user) return Promises, the code could look like this

create(user: CreateUserDTO): Observable<UserEntity> {
    // with the rxjs from function we turn a Promise into an Observable
    const userByEmail$ = from(this.getUserByEmail()); 
    const userByLogin$ = from(this.getUserByLogin()); 

    // with forkjoin we create an Observable which notifies when all the 
    // Observables which have been passed in as parameters notify
    return forkJoin([userByEmail$, userByLogin$]).pipe(
       // with concatMap you wait for the upstream Observable (i.e. the 
       // Observable created by forkJoin) to notify and complete, and then
       // you return the next Observable in the chain, which is, in this case,
       // the Observable which (when subscribed) creates the user
       concatMap(([userByLogin, userByEmail]) => 
          if (userByLogin || userByEmail) {
              // throw error here
          }
          return from(this.createUser(user))
       })
    )
}

Otherwise, if this.getUserByEmail(), this.getUserByLogin() and this.createUser(user) return Observables you do not need to use the from rxjs function and the code would be slightly simpler, like this

create(user: CreateUserDTO): Observable<UserEntity> {
    return forkJoin([this.getUserByEmail(), this.getUserByLogin()]).pipe(
       concatMap(([userByLogin, userByEmail]) => 
          if (userByLogin || userByEmail) {
              // throw error here
          }
          return from(this.createUser(user))
       })
    )
}

3 Comments

Why you used concatMap operator in this case?
concatMap is the go-to operator any time you have standard sequential async calls to be made. When I say "standard" I mean that you do not have the requirement to kill the request on fly if a new notification comes from upstream (e.g. if you run a query when you click a button and you want to kill that query, if still on fly, if the button is clicked again - in this case the operator to use is switchMap). What may dangerous is mergeMap which keeps all the request and can end up returning not the freshest result. Anyways, it depends on the requirements.
You may find interesting this article which talks about typical patterns of use of rxjs operators with http.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.