3

I am developing multiple parallel file upload feature with ability to delete/cancel ongoing http calls. Once all call gets completed/cancelled, I notify the consumer about it.

For this I am combining individual http observables using forkJoin. But in case user clicks on cancel button, I should not be waiting for the completion of actual http response.

takeUntill won't handle it elegantly as it will act only after receiving the next value from underlying source http stream.

this.uploadFiles().subscribe(data => {
  console.warn("Upload completed now!!!!!", data);
});

uploadFiles() {
  return forkJoin(
    this.files.map(file => // returns array of observable
        this.uploadFile(file).pipe( catchError(() => of("Error re-emitted as success to prevent early exit"))))
  ).pipe(map(() => {
       // logic for some user friendly statistic
      return data;
    }));
}
0

3 Answers 3

2

Use takeUntil with a Subject as notifier to complete Observables. You can pass a file id to the Subject and use filter in takeUntil to only cancel the file upload of a file with a given id.

Use defaultIfEmpty to provide a value that indicates a cancelled request. This also prevents the outer forkJoin from completing immediately when an inner empty request completes.

private cancelUpload$ = new Subject<number>();

uploadFiles() {
  let errorCount = 0, cancelledCount = 0, successCount = 0;
  return forkJoin(this.dummyFiles.map(file =>
    this.uploadFile(file).pipe(
      // react to CANCEL event
      map(response => response == 'CANCEL' ? ++cancelledCount : ++successCount),
      catchError(() => of(++errorCount))
    )
  )).pipe(map(() => ({ errorCount, successCount, cancelledCount })));
}

uploadFile(file: any) {
  http$.pipe(
    ...
    takeUntil(this.cancelUpload$.pipe(filter(id => id == file.id))), // cancel
    defaultIfEmpty('CANCEL'), // provide value when cancelled
  )
}

cancelUpload(file: any) {
  file.uploadStatus = "cancelled";
  this.cancelUpload$.next(file.id) // cancel action
}

https://stackblitz.com/edit/angular-zteeql-e1zacp

Sign up to request clarification or add additional context in comments.

4 Comments

Take until is for ignoring the source observable and will wait for source observable(http.post in this case) and thus we cant aggregate the cancel status immediately.
In my stackblitz, forkJoin always complete with success since I emit the individual observable error as success.
@Amitesh takeUntil completes the source observable immediately and does not wait for the http request to emit something. If takeUntil completes the source defaultIfEmpty immediately emits a default value indicating a cancel event. You get immeditally notified about cancel events and can act uppon them here this.uploadFile(file).pipe(tap(response => response == 'CANCEL' ? /* react to CANCEL event */ )). forkJoin will of course still only emit after all inner file uploads completed.
you are right about takeUntil. I was under misconception without verifying :P
1

Assign the subscription to the variable and cancel it with button:

$http: Subscription;
this.$http = this.http.post(this.uploadUrl, formData, {
      reportProgress: false
      // observe: 'events',
});

And in the cancel function:

cancelUpload(file: any) {
  file.uploadStatus = "cancelled"; 
  this.$http.unsubscribe();
}

Comments

0

Apart from takeUntil we can also use merge or race operator to handle this scenario. However it doesn't change the underlying logic.

Step 1: Create a Subject for individual upload

file.cancelUpload$ = new Subject();

Step 2: Merge this subject with actual http call

merge will complete the stream if any of the observable emits the error. i.e when we emit error from cancelUpload$ subject, http request will get cancelled automatically (look into network tab).

return merge(file.cancelUpload$, $http.pipe(...

Step 3: Actual Cancellation Code

cancelUpload(file: any) {
  file.uploadStatus = "cancelled";
  file.cancelUpload$.error(file.uploadStatus);// implicitly subject gets completed
}

Step 4: Complete the cancelUpload$ Subject in case of upload error/success

It will ensure that merge operation will gets completed as both stream is now complete. And therefore forkJoin will receive the response.

Refer https://stackblitz.com/edit/angular-zteeql?file=src%2Fapp%2Fhttp-example.ts

  uploadFiles() {
    let errorCount = 0,cancelledCount = 0, successCount = 0;
    return forkJoin(
      this.dummyFiles
        .map(file =>
          this.uploadFile(file).pipe(
            catchError(() => of("Error re-emitted as success")) // value doesn't matter
          )
        )
    ).pipe(
      map(() => { // map would receive array of files in the order it was subscribed
        this.dummyFiles.forEach(file => {
          switch (file.uploadStatus) {
            case "success": successCount++; break;
            case "cancelled": cancelledCount++; break;
            case "error": errorCount++; break;
          }
        });
        return { errorCount, successCount, cancelledCount };
      })
    );
  }

  uploadFile(file: any) {
    const formData = new FormData();
    const binaryContent = new Blob([Array(1000).join("some random text")], {
      type: "text/plain"
    }); // dummy data to upload
    formData.append("file", binaryContent);
    const $http = this.http.post(this.uploadUrl, formData, {
      reportProgress: false
      // observe: 'events',
      // withCredentials: true
    });
    file.cancelUpload$ = new Subject();
    file.uploadStatus = "inProgress";
    return merge(
      file.cancelUpload$,
      $http.pipe(
        tap(data => {
          file.uploadStatus = "uploaded";
          file.cancelUpload$.complete();
        }),
        catchError(event => {
          file.uploadStatus = "error";
          file.cancelUpload$.complete();
          return throwError("error");
        })
      )
    );
  }

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.