iJS CONFERENCE Blog

Observables: Reactive with RxJS

Angular Tutorial: Part 4

Jul 12, 2022

Reactive programming is a paradigm that’s increasingly found its way into the world of front-end development in recent years. Essentially, it’s about processing data streams and reacting to processing. The Reactive Extensions for JavaScript (RxJS) library takes this idea and implements it. Since Angular relies on RxJS in many places, we need to take a closer look at the framework and its underlying principles.

Blog Series Overview

  1. Angular for beginners
  2. The building blocks of an Angular app
  3. Angular’s Data Layer
  4. Observables: Reactive with RxJS
  5. No Angular SPA without routing

 

In the previous part of our tutorial, we looked at the data layer of an Angular application. We understood what an Angular service is and how to create it. Important concepts such as dependency injection and inversion of control came into play, which are essential for registering services in Angular. A major role in Angular is which module you offer a service because the local singleton of the service class is created there. If a component requires this service, it’s passed to the respective component via constructor injection. In our application, we implemented this principle by making the book list available to our BookComponent via the BookApiService. Finally, we got to know the HttpClientModule, which also provides a service—the HttpClient—which helps us load the book list of our bookmonkey-api backend.

In order to understand the HttpClient’s functionality, we already familiarized ourselves with observables. An observable is an object that provides a stream of data from a source. Anyone interested in this data stream can subscribe to it and receive the data. Of course, the important thing is unsubscribing from the data stream.

Observables are a powerful instrument that Angular gets a lot of use out of. They are provided by the RxJS library [1] and are a main part of responsive programming for an Angular application. We encounter observables in all Angular modules, some of which we’ll get to know in future articles. But RxJS offers much more functionality than data streams you can subscribe to and unsubscribe from. For example, we can use RxJS operators to transform, filter, or even mix the data sent over the data stream with data from another data stream. A typical use case for this is the requirement to not only receive data from a single REST API, but to send multiple HTTP requests to different backends at the same time. As a result, we have to wait for the HTTP response of all requests in order to combine the data with each other to a larger data model.  RxJS also offers objects with which we can manage the entire state of our application—subjects.

This article will now take a closer look at all of this. But before that, I’d like to take another look at the exact functionality of an Observable and show further possibilities for subscribing to a data stream and unsubscribing from it.

A data stream

As previously mentioned, the basic principle of reactive programming is based on data streams. Let’s look at a simple example from the real world: We order a book from a trusted bookstore. A few days later, the doorbell rings and the mailman wants to give us the book, but we’re not home. A day later, he tries again. In this example, we subscribed to a stream of recurring events and are modifying it with actions.

A phone call with a good friend is another example. Depending on the transmitted message’s content, we’ll take further actions. For instance, if our friend on the phone gave us a good tip for a Christmas present, then we’ll go out shopping.

Every day we’re surrounded by events we can—or must—react to. However, there are also events that we ignore completely because they aren’t relevant to us. But regardless of the fact that certain events are irrelevant, they still happen.

Our Angular application is also shaped by a variety of events and influences: The user clicks a button, a timer expires, or we load data from a backend. We want to react appropriately to all events in our application and if needed, trigger further actions. This is exactly where observables support us by making all occurring events available as data streams. We decide if and when we react to these events by subscribing to a data stream or unsubscribing from it.

iJS Newsletter

Keep up with JavaScript’s latest news!

Basic observable principles

An observable is an object that returns the data stream from a source. This source can be an HTTP request, an event, or a user interaction with our application. Additionally, RxJS brings a well-defined framework for an observable and follows some rules:

  • A data stream can be called with three different functions: next(), complete(), and error().
  • The data stream is finished as soon as error() or complete() has been called. After that, no further elements can be sent over this data stream.
  • Data streams can be subscribed to with the help of the subscribe() function. These can be unsubscribed at any time.
  • Each observable also has a pipe() function that can be used to pass operators for processing the elements.

Creating observables yourself

In our application, we already subscribed to an existing observable and were able to receive the book list from the backend. When we work with Angular, we come into contact with existing observables in many places. Still, there are situations where we need to create observables ourselves, for instance, when testing our BookApiService. There, we want to emulate the behavior of an HTTP request during a test. But instead of returning an http.get(), we want to create our own observable that we’ll make available to the component.

The RxJS library offers various creation functions that can help us create observables from various data types:

import { of } from 'rxjs';
const obs$ = of(11,12,13)

The of function creates an observable that outputs the input values one at a time and finally terminates the data stream. The $ character at the end of a variable is also a common notation by which it is directly evident that this variable carries an observable, so you must subscribe to it in order to obtain values.

We usually don’t create observables ourselves, except in our unit tests. However, with the help of the Creation Functions, already existing Promises and Events can be converted into an Observable:

import { from } from 'rxjs';
const obs$ = from(11,12,13)

Promises are also a common design pattern in JavaScript for asynchronous tasks. Therefore, in the Angular world, you’ll often encounter libraries whose API returns a Promise. But a Promise only fires a value once, rather than multiple values over a data stream. You also can’t cancel a Promise, and implementing failed actions is very tedious. Therefore, RxJS provides the from function which converts a Promise into an Observable. This open data stream will only send a value, but we can now react much more easily to any error messages and simply repeat the call in case of an error. Here, implementation is done with RxJS operators.

Operators: Manipulate data streams

The RxJS library offers us the possibility to subscribe and unsubscribe to data streams. Operators are the core of the library, which can help us manipulate values within a data stream. We distinguish between two kinds of operators in RxJS:

  • Pipeable Operators
  • Creation Operators

We already learned about the latter. For example, of() and from() are typical creation operators. A Pipeable Operator is a function that receives an Observable and returns another, new Observable: Accordingly, the previous observable is not manipulated; this is also referred to as a pure operation. For this, each observable has a function called pipe(); within it we call the operators in succession:

obs$.pipe(operator1(), operator2(), operator3(), 
operator4()).subscribe()

Since each operator returns a new observable, we can continue subscribing to the stream unchanged at the end of the pipe chain.

RxJS offers over 150 operators [2]. For example there is the map operator

of(1,2,3).pipe(map(x => x * 10)).subscribe()

Similar to the Array.prototype.map, each input value is transformed by a function you implement yourself. RxJS documents all operators as marble diagrams, which allow us to see exactly what the input values look like after an operator’s function call (Fig. 1).

Fig. 1: Marble diagrams: Documentation of RxJS operators

The map operator is a transformation operator. In addition to transforming, we can also filter values that are sent to us via the data stream:

of(0,1,2,3,4).pipe(filter(x => x % 2 === 1)).subscribe()

The filter operator provides us with the ability to filter values based on a condition. Just as with the Array.prototype.filter function, only the values that match the predicate are forwarded. We can also apply this directly in our BookComponent (Listing 1).

Listing 1

import { Subscription } from 'rxjs';

@Component({..})

export class BookComponent implements OnInit {

  books: Book[];

  subscription: Subscription;

  // ...

  ngOnInit(): void {

    this.subscription = this.bookApiService.getAll().pipe(

      filter((books: Book[]) => books.length > 0)

    ).subscribe(

      (books: Book[]) => this.books = books

    );

  }

}

With the help of the filter operator, we make sure an empty array is never assigned to our local class variable books. Besides the filter operator, there are other filtering operators worth mentioning that I’d like to briefly describe:

  • take(count: number): Forwards count values and then terminates the observable. This can also be useful for canceling an observable. In many observables, you already know the number of values it will fire. So you can already use the take operator to initially determine how many values you want to take before closing the stream.
  • distinctUntilChanged(): If identical values are emitted in direct succession, the second value is not forwarded. It will be forwarded again only if the next emitted value is different.
  • debounceTime(dueTime: number): Once a value is emitted, further emitted values are ignored for the duration specified in dueTime. Once the time has expired, a value is forwarded again (Fig. 2).

Before we get into the question of a good error handling strategy, I want to talk about useful utility operators. We’re often given an array over the data stream, but then we only want to transform the elements within the array. We can easily implement this behavior as follows:

this.subscription = this.bookApiService.getAll().subscribe(
  (books: Book[]) => this.bookTitles = this.books.map(book => book.title)

);

However, we’ll also learn about other ways to subscribe to a stream without explicitly calling the subscribe function. We can achieve the same result using two pipeable operators (Listing 2).

Listing 2 

import { concatall. toArray, map } from 'rxjs/operators';

@Component({..})

export class BookComponent implements OnInit {

  bookTitles: string[];

  subscription: Subscription;

  // ...

  ngOnInit(): void {

    this.subscription = this.bookApiService.getAll().pipe(

      concatAll(),

      map((book: Book) => book.title),

      toArray()

    ).subscribe(

      (bookTitles: string[]) => this.bookTitles = bookTitles

    );

  }

}


Fig. 2: debounceTime

With concatAll, we make sure that all values of the array are passed on individually as a new observable. Then we can then transform it with the help of the map operator, which we’re already familiar with. Finally, we use the toArray operator again to return all the values already emitted as a whole array within the data stream

Another very useful operator is the tap utility operator. It is mostly used to trigger side effects as soon as a value is emitted via the data stream.

of(0,1,2,3,4).pipe(
  tap((n: number) => this.showDialog(n))
).subscribe()

For example, a small dialog box could open, a server request could be executed, or data could be saved to browser storage once a value crosses the data stream.

EVERYTHING AROUND ANGULAR

The iJS Angular track

Error handling for a data stream

Now let’s look at our application again. What happens when we stop our backend server and our REST API is no longer available? Presumably, we receive a (failed)net::ERR_CONNECTION_REFUSED in our BookApiService instead of the books. This leads to our data stream getting immediately destroyed and the web page remains empty. The best reaction would be to analyze the error and apply appropriate error handling. For instance, you can resend the request to our backend or leave a small message for the user stating that the books couldn’t be loaded.

In the previous article, we already learned that the subscribe function also provides an error callback, which helps us react to errors.

subscribe(
  next?: (value: T) => void, 
  error?: (error: any) => void, 
  complete?: () => void): Subscription;

The problem here is that we can only use this callback to the extent that we call the subscribe function. Besides that, we have to take care of rebuilding and subscribing to the data stream within the callback by ourselves.

To easier solve this problem, RxJS provides us with two operators: catchError and retry. With help from the retry operator, we can execute the data stream subscription repeatedly in case of an error. The number of attempts can be defined specifically for users:

this.bookApiService.getAll().pipe(retry(11)).subscribe()

Now, if our HTTP request returns an error, this request is executed up to eleven times before the data stream is finally destroyed. But since we want to avoid destroying the data stream in almost all cases, using the catchError operator is considered the best practice (Listing 3).

Listing 3

import { NEVER } from 'rxjs';

import { catchError } from 'rxjs/operators';

@Component({..})

export class BookComponent implements OnInit {

  bookTitles: string[];

  subscription: Subscription;

  // ...

  ngOnInit(): void {

    this.subscription = this.bookApiService.getAll().pipe(

      filter((books: Book[]) => books.length > 0),

      catchError(error => {

        // implement custom error handling

        return NEVER;

      })

    ).subscribe(

      (bookTitles: string[]) => this.bookTitles = bookTitles

    );

  }

}

All errors that occur when subscribing to the data stream or when executing the filter operator are directly caught in the catchError operator. This of course also means that if other operators are executed after the catchError operator, they will not be caught in the event of an error. The order of operators in the pipe function plays a role. Another peculiarity is that we need to forcibly return an observable because it is a pipeable operator. We have several possibilities to achieve this:

  • The creation of an observable using the of operator. For this, we also need data, such as a book list cached in the browser storage.
  • We use the NEVER observable included in RxJS (as demonstrated in Listing 3). This observable emits nothing, throws no error messages, and does not terminate. This allows us to handle the error without the user receiving a response.
  • Besides NEVER, we can also give the RxJS observable EMPTY. This also emits nothing, but it immediately terminates the entire data stream. This is useful if you find out during runtime error analysis that there’s a fatal runtime error and you want to destroy the data stream (but in this case, it’s autonomous).

By introducing the catchError operator in our application, data streams are now no longer automatically destroyed in case of an error. This makes it even more important that we make sure that we log out of a data stream at the right time, in the right place.  A pipe built into Angular (not to be confused with the Pipeable Operators) is considered to be the best practice here: the AsyncPipe.

Subscribe in the template: The AsyncPipe

As previously mentioned, there are other ways to subscribe and unsubscribe to a stream. In addition to the subscribe function, which we use to subscribe to a data stream within the component class, we can also subscribe to it in the template—that is, in the HTML. To do so, first we assign the observable to a class variable (Listing 4).

Listing 4
@Component({..})

export class BookComponent implements OnInit {

  books$: Observable<Book[]>

  // ...

  ngOnInit(): void {

    this.books$ = this.bookApiService.getAll().pipe(...);

  }

}

We bind this class variable to the template using the AsyncPipe and automatically subscribe to the data stream.

<app-book-card *ngFor="let book of books$ | async | filterBook:
 bookSearchTerm" [bookEntry]="book"></app-book-card>

Each AsyncPipe opens its own data stream. If you want to access the book data several times within the template, this can lead to unwanted behavior. For example, to find out if it’s already been loaded:

<div *ngIf="books$ | async">

  <app-book-card *ngFor="let book of books$ | async | filterBook:
 bookSearchTerm" [bookEntry]="book"></app-book-card>

</div>

Unfortunately, this leads to the HTTP request being fired twice to our backend, since we already subscribe to the data stream twice using the AsyncPipe. We can solve this problem with the help of a share operator [3], or by creating a template variable as an alias:

<div *ngIf="books$ | async as books">

  <app-book-card *ngFor="let book of books | filterBook:
 bookSearchTerm" [bookEntry]="book"></app-book-card>

</div>

With the help of the template variable, the value emitted by the data stream is stored and used in HTML nested nodes below. The AsyncPipe is considered the best practice because subscribing to the stream is very easy and unsubscribing happens automatically. That means that from now on, we don’t need to store the subscription for all data streams we subscribe to using AsyncPipe and we also no longer need to manually unsubscribe in the ngOnDestroy LifeCycleHook.

Managing the Application State with the help of RxJS Subjects

Often in a typical Angular application, we have the use case that we want to store data in a state for a long time and beyond that, every component should have unhindered access to it. For instance, in our application the book list could be kept in one state—namely in the BookComponent. As soon as the user wants to delete a book from the list, the corresponding BookCardComponent, its book.isbn, must be passed to its ParentComponent. Then the BookComponent would delete the book from its state. We implemented this similarly at the beginning using the input and output decorators.

But how can we send change of state to components that are not direct parent or child components? An implementation using input and output decorators would likely end up a huge mess, the larger the application and the more nested we make our component tree (Fig. 3).

Fig. 3: State Management with @ Input() and @ Output()

A common pattern here is state management with RxJS subjects [4]. A component can subscribe to the data stream of a subject; additionally, a subject has three observer methods: next, error, and complete. We can use this to pass data to the subject from the outside. Technically, a subject is a combination of observable and observer:

class Subject<T> extends Observable {
  next(value?: T) // Observer
  error(error: any)
  complete()
  subscribe(/*..*/) // Observable
  pipe(/*..*/)
}

Subjects that we manage the state of an application with are best created in a globally available ApplicationStateService (Fig. 4).

Fig. 4: ApplicationStateService with a subject

One interesting small application example is implementing a HeaderComponent. As the name suggests, this should display a corresponding title of the application at any time. For instance, if we’re on a book’s detail view, then the book title should also be visible within the HeaderComponent. To do so, first we create an ApplicationStateService with a Subject (Listing 5).

Listing 5

import { Injectable } from '@angular/core';

import { Observable, Subject } from 'rxjs';
 
@Injectable({
  providedIn: 'root'

})

export class ApplicationStateService {

  private headerTitle: Subject<string> = new Subject<string>();

  headerTitle$: Observable<string> = this.headerTitle.asObservable();

   setTitle(title: string) {

    this.headerTitle.next(title);

  }

}

Here it’s important to always declare the subject as a private class variable, otherwise any component that injects our ApplicationStateService via the constructor can call the observer function next directly, emitting uncontrolled values.

Of course, our HeaderComponent still needs access to the Observable so that it can subscribe to the data stream and the title—once the Subject emits a value—can be set. Using the asObservable function on a subject, we only get the observable without direct access to the observer methods. Finally, we implement a setter function for controlled setting of a title. After that we can use everything inside the HeaderComponent (Listing 6).

Listing 6

@Component({

  selector: 'app-header',

  templateUrl: './header.component.html',

  styleUrls: ['./header.component.scss']

})

export class HeaderComponent implements OnInit {

  headerTitle$: Observable<string>;
 
  constructor(private readonly applicationStateService: ApplicationStateService) { }
 
  ngOnInit(): void {

    this.headerTitle$ = this.applicationStateService.headerTitle$;

  }

}

We bind the headerTitle$ class variable to the corresponding template again using the AsyncPipe. Then components—regardless of where they are located in the component tree—can use the ApplicationStateService via Dependency Injection and the currently displayed web page’s title using the setTitle function.

Finally, I’d like to point out the other kinds of Subjects. The way we’ve used subject so far has some dangers. If a component subscribes to the data stream only after a value has been emitted via the subject, this value is lost for the component. As a solution, RxJS provides two more Subjects:

  • A BehaviourSubject persistently stores the last emitted value.
  • A ReplaySubject stores a user-defined number of last emitted values.

If a value is emitted via a BehaviourSubject, this value is stored internally. Any component that subscribes to the data stream of the BehaviourSubject with a time delay will still receive the last emitted value. A typical application example is a user’s access token that will persist for one browser session after the user has successfully logged in and received the token.

Outlook

In this article, we were able to get an insight into the powerful library RxJS. We learned more ways to subscribe and unsubscribe to data streams. The AsyncPipe is considered a best practice. We can manipulate a data stream at will with the help of pipeable operators. RxJS offers a variety of operators to transform or even filter data. Operators also make handling errors easier. With retry, we can subscribe to a data stream over and over again for a user-defined number of attempts in case of an error.

The catchError operator catches any kind of error occurring before a retry would even have to be executed. There we implement our error handling and—because it’s a pipeable operator—we also have to pass on our own observable. Finally, we learned about a variant that implements state management with the help of RxJS subjects.

As mentioned earlier, many Angular APIs offer their data as observables. RxJS offers more than what we could present in this article, by far. In the next parts of this series, we’ll frequently encounter different observables and learn more operators. But for now, we leave reactive programming and turn to the topic of routing an Angular application in the next issue.

Links & Literature 

[1] https://rxjs.dev 

[2] https://rxjs.dev/guide/operators 

[3] https://rxjs.dev/api/operators/share 

[4] https://rxjs.dev/guide/subject 

Sign up for the iJS newsletter and stay tuned to the latest JavaScript news!

 

BEHIND THE TRACKS OF iJS

JavaScript Practices & Tools

DevOps, Testing, Performance, Toolchain & SEO

Angular

Best-Practises with Angular

General Web Development

Broader web development topics

Node.js

All about Node.js

React

From Basic concepts to unidirectional data flows