RxJS for Beginners: Mastering Observables, Subjects, and Operators for Reactive Programming

14 May 2024

If learning RxJS feels frustrating and hard, you are not alone. At the beginning of my career, I felt the same. In this article, I will explain some important RxJS concepts and point you toward further learning.

If you develop Angular applications, one of the first things you hear about is the RxJS library. And it's true: I haven't seen an Angular application without RxJS, and it's hard to imagine one unless it's just markup. So the question is: why do you need RxJS for Angular applications? Why is it so important? Let's dig into it.

RxJS is a JavaScript library designed for reactive programming. It is used for managing asynchronous events and transforming data. RxJS has a huge community, and the library is constantly improving. It introduces Observables, Subjects, and a large set of operators to manipulate and manage data. In web development, we deal with a lot of asynchronous events, so it's crucial to manage them properly to provide a smooth user experience. This is where RxJS comes in handy.

Pillars of RxJS: Observables, Subjects & Operators

What is an Observable?

If you're familiar with the Observer pattern, then you're on the right track. An Observable is essentially an implementation of this pattern. It's a data stream: you can manipulate it with operators, or receive its data by subscribing to it. To understand it, let's look at an example from real life.

Imagine a news website. Users can click the 'Subscribe' button, and whenever new content is posted, they receive it by email. This feature, common on many websites today, is a good illustration of the Observer pattern. In this case, the user is an Observer (a person who keeps watching the news feed), and the news content is the data stream. The user subscribes to the news, and it arrives in their inbox. If at any point the user no longer wishes to receive news, they can simply unsubscribe from the stream. In essence, this is how Observables work.
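To make the newsletter analogy concrete, here is a minimal hand-written sketch of the Observer pattern in TypeScript. It is a simplified model, not how RxJS is actually implemented, and the `NewsFeed` class and its `publish` method are hypothetical names chosen only for illustration. It shows the three moves from the example: subscribing, receiving data, and unsubscribing.

```typescript
// A simplified model of the Observer pattern (not RxJS's real implementation).
type Observer<T> = (value: T) => void;

// Hypothetical class used only for illustration.
class NewsFeed<T> {
  private observers = new Set<Observer<T>>();

  // Register an observer and return a function that removes it again.
  subscribe(observer: Observer<T>): () => void {
    this.observers.add(observer);
    return () => {
      this.observers.delete(observer);
    };
  }

  // Push a new value to every observer that is currently subscribed.
  publish(value: T): void {
    this.observers.forEach(observer => observer(value));
  }
}

const feed = new NewsFeed<string>();
const inbox: string[] = [];

const unsubscribe = feed.subscribe(article => inbox.push(article));
feed.publish('Article 1'); // delivered to the inbox
unsubscribe();
feed.publish('Article 2'); // not delivered: the user has unsubscribed

console.log(inbox); // ['Article 1']
```

In RxJS the same roles are played by the `subscribe()` method of an Observable and the `unsubscribe()` method of the Subscription it returns.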

To create an Observable, you need data. This data can come from different sources, which gives two types of Observables: hot and cold. If the data is produced by the Observable itself, it's a cold Observable. If the data is produced outside the Observable, it's a hot one.

Now, back to our sample news website to illustrate this point further.

Consider the process of sending out a daily newsletter to your subscribers. If the newsletter is generated each time a user subscribes, and each subscriber receives a unique copy, this would be an example of a cold Observable. The data (in this case, the newsletter) is produced by the Observable itself upon each new subscription.

On the other hand, suppose there's a live news update feature on your website. When an important news event occurs, the update is instantly pushed to all current users on the website, regardless of when they started observing. This would be a hot Observable because the data is produced outside the Observable (the news event happens regardless of whether anyone is subscribed) and it is shared among all subscribers.
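The difference can be sketched in a few lines, assuming RxJS 7. The names `newsletter$`, `liveNews$`, and the `producerRuns` counter are illustrative only. The hot stream is modeled here with a Subject (covered right after this), which stands in for any producer that lives outside the Observable.

```typescript
import { Observable, Subject } from 'rxjs';

// Cold: the producer lives inside the Observable, so it runs once per subscriber.
let producerRuns = 0; // illustrative counter, not part of RxJS
const newsletter$ = new Observable<string>(subscriber => {
  producerRuns += 1;
  subscriber.next(`Newsletter copy #${producerRuns}`);
  subscriber.complete();
});

const coldA: string[] = [];
const coldB: string[] = [];
newsletter$.subscribe(copy => coldA.push(copy));
newsletter$.subscribe(copy => coldB.push(copy));
console.log(coldA); // ['Newsletter copy #1']
console.log(coldB); // ['Newsletter copy #2']

// Hot: values are produced outside and shared; late subscribers miss earlier ones.
const liveNews$ = new Subject<string>();
const early: string[] = [];
const late: string[] = [];

liveNews$.subscribe(update => early.push(update));
liveNews$.next('Update 1');
liveNews$.subscribe(update => late.push(update));
liveNews$.next('Update 2');
console.log(early); // ['Update 1', 'Update 2']
console.log(late);  // ['Update 2']
```

Each cold subscriber triggered its own run of the producer, while the hot subscribers shared a single run and only saw what was emitted after they joined.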

A Subject is a special type of Observable that is also an Observer: you can push values into it with next(), and it multicasts them to all of its subscribers.

There are four types of Subjects:

  • Subject. The base type, which multicasts to multiple observers. It holds neither an initial value nor a current value, so new subscribers only receive values emitted after they subscribe:

    import { Subject } from 'rxjs';
    
    const subject = new Subject();
    subject.subscribe(data => console.log(data));
    subject.next('Hello');
    // Output => Hello
    

  • BehaviorSubject. BehaviorSubject requires an initial value when it is created and always holds a current value. A new subscriber immediately receives the current value (the initial value if nothing else has been emitted yet). When new values are emitted, they are pushed to all current subscribers:

    import { BehaviorSubject } from 'rxjs';
    
    const subject = new BehaviorSubject('First');
    subject.subscribe(data => console.log(data));
    // Output => First
    subject.next('Second');
    // Output => Second
    

  • ReplaySubject. It lets you specify a buffer size, which determines how many of the most recent values are replayed to new subscribers:

    import { ReplaySubject } from 'rxjs';
    
    const subject = new ReplaySubject(2);
    subject.next('First');
    subject.next('Second');
    subject.next('Third');
    subject.subscribe(data => console.log(data));
    // Output => Second, Third
    

  • AsyncSubject. AsyncSubject delivers only the last value it received, and only after it completes. Until complete() is called, it emits nothing to its subscribers:

    import { AsyncSubject } from 'rxjs';
    
    const subject = new AsyncSubject();
    subject.next('First');
    subject.next('Second');
    subject.next('Third');
    subject.complete();
    subject.subscribe(data => console.log(data));
    // Output => Third
    

In total, we have 4 Subjects (Subject, BehaviorSubject, ReplaySubject, and AsyncSubject). Understanding the differences between these types of Subjects is crucial for determining which one to use in different scenarios. For instance, a BehaviorSubject might be used when you want to have a value always available (like user authentication status), while an AsyncSubject could be used when you need the final result of a series of computations or a network request, but not the intermediate results.
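One way to see the differences side by side is to feed the same values into each type and check what a late subscriber receives. This is a small sketch assuming RxJS 7; the helper `lateSubscriberSees` is a hypothetical function written for this comparison, not part of the library.

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

// Hypothetical helper: push 'A' and 'B', subscribe late, push 'C', then complete.
function lateSubscriberSees(subject: Subject<string>): string[] {
  const seen: string[] = [];
  subject.next('A');
  subject.next('B');
  subject.subscribe(value => seen.push(value)); // subscribes after A and B
  subject.next('C');
  subject.complete();
  return seen;
}

console.log(lateSubscriberSees(new Subject<string>()));               // ['C']
console.log(lateSubscriberSees(new BehaviorSubject<string>('init'))); // ['B', 'C']
console.log(lateSubscriberSees(new ReplaySubject<string>(2)));        // ['A', 'B', 'C']
console.log(lateSubscriberSees(new AsyncSubject<string>()));          // ['C']
```

Subject and AsyncSubject produce the same list here for different reasons: the Subject delivered 'C' as soon as it was pushed, while the AsyncSubject held it back until complete() was called.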

Operators are an integral part of RxJS, allowing us to transform, combine, manipulate, and work with our Observables in various ways. They are simply functions that take an input Observable, transform it in some way, and return a new Observable. You can chain operators together using the pipe() method on an Observable. The order in which you place your operators in the pipe matters, because each operator receives the output of the previous one.
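Here is a short sketch of why operator order matters, assuming RxJS 7.2 or later (where operators can be imported directly from 'rxjs'). It uses map and filter, which behave like their array counterparts, and applies the same two operators to the same source in both orders.

```typescript
import { filter, map, of } from 'rxjs';

const source$ = of(1, 2, 3, 4);

// filter first, then map: keep the even numbers, then add 1 to each.
const filterThenMap: number[] = [];
source$
  .pipe(filter(n => n % 2 === 0), map(n => n + 1))
  .subscribe(value => filterThenMap.push(value));
console.log(filterThenMap); // [3, 5]

// map first, then filter: add 1 to each, then keep the even results.
const mapThenFilter: number[] = [];
source$
  .pipe(map(n => n + 1), filter(n => n % 2 === 0))
  .subscribe(value => mapThenFilter.push(value));
console.log(mapThenFilter); // [2, 4]
```

The operators and the input are identical in both pipes; only the order differs, and so does the result.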

Let's explore some operators with examples.

  • map. Like the array method of the same name in JavaScript, it transforms each value in the stream with a given projection function:

    import { of, map } from 'rxjs'; 
    
    const source = of(1, 2, 3);
    const example = source.pipe(map(val => val + 10));
    example.subscribe(val => console.log(val)); 
    // Output: 11, 12, 13
    

  • filter. Just like its array counterpart, filter only emits values from the source Observable that satisfy a specified condition:

    import { of, filter } from 'rxjs'; 
    
    const source = of(1, 2, 3, 4, 5);
    const example = source.pipe(filter(num => num % 2 === 0));
    example.subscribe(val => console.log(val)); 
    // Output: 2, 4
    

  • catchError. It catches an error raised by the source Observable and lets you return a replacement Observable (or rethrow the error):

    import { of, throwError, catchError } from 'rxjs';
    
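    // throwError takes a factory function; passing the error directly is deprecated in RxJS 7
    const source = throwError(() => new Error('This is an error!'));
    // Use err.message so the output does not include the 'Error: ' prefix
    const example = source.pipe(catchError(err => of(`I caught: ${err.message}`)));
    example.subscribe(val => console.log(val));
    // Output: I caught: This is an error!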
    

  • of. This creation operator converts its arguments to an Observable sequence:

    import { of } from 'rxjs'; 
    
    const source = of(1, 2, 3);
    source.subscribe(val => console.log(val)); 
    // Output: 1, 2, 3
    

  • from. This creation operator turns an array, promise, or iterable into an Observable:

    import { from } from 'rxjs'; 
    
    const array = [1, 2, 3, 4, 5];
    const source = from(array);
    source.subscribe(val => console.log(val)); 
    // Output: 1, 2, 3, 4, 5
    

  • take. This operator emits only the first N values from the source Observable (N is the count you pass in), then completes:

    import { of, take } from 'rxjs'; 
    
    const source = of(1, 2, 3, 4, 5);
    const example = source.pipe(take(3));
    example.subscribe(val => console.log(val)); 
    // Output: 1, 2, 3
    

Caution! Failing to unsubscribe from Observables can lead to memory leaks. This usually happens when an Observable continues to produce values that are no longer needed but are still being consumed due to an active subscription. To prevent this, it's important to unsubscribe when the data is no longer needed. Operators like take and takeUntil can be used to automatically complete an Observable and unsubscribe from it once a certain condition is met. However, remember that not all Observables require manual unsubscribing, particularly those that complete or emit a finite number of values.
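The takeUntil approach can be sketched as follows, assuming RxJS 7.2 or later. The names `ticks$` and `destroy$` are illustrative: `ticks$` stands in for any long-lived source, and `destroy$` is a notifier that you would typically trigger from a teardown hook such as ngOnDestroy in an Angular component.

```typescript
import { Subject, takeUntil } from 'rxjs';

const ticks$ = new Subject<number>(); // stands in for a long-lived source
const destroy$ = new Subject<void>(); // emits when the consumer is done

const received: number[] = [];
let completed = false;

ticks$.pipe(takeUntil(destroy$)).subscribe({
  next: value => received.push(value),
  complete: () => {
    completed = true;
  },
});

ticks$.next(1);
ticks$.next(2);

// Signal teardown: takeUntil completes the piped stream and unsubscribes from ticks$.
destroy$.next();
destroy$.complete();

ticks$.next(3); // ignored: the subscription has already ended
console.log(received);  // [1, 2]
console.log(completed); // true
```

The alternative is to keep the Subscription returned by subscribe() and call its unsubscribe() method yourself; takeUntil is convenient when several subscriptions should end on the same signal.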

Usage examples in Angular

As was mentioned at the start of the article, RxJS is vital for Angular applications, and it's used in several key areas of the framework. Here are some real-world use cases:

  • HTTP Requests. Angular's HttpClient returns Observables, making it easy to handle asynchronous HTTP requests. You can use RxJS operators to transform and manipulate the response data:

    import { map } from 'rxjs';
    
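    // Assumes the endpoint responds with an object shaped like { results: [...] }
    this.http.get<{ results: unknown[] }>('https://api.example.com/data')
      .pipe(map(({ results }) => results))
      .subscribe(data => console.log(data));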
    

  • Manipulating form values. This comes in handy in many cases: performing logic based on a form value, transforming the value, debouncing user input, and many other scenarios:

    import { debounceTime } from 'rxjs';
    
    this.form.get('name').valueChanges.subscribe(newValue => console.log(newValue));
    // logs the value of the 'name' control on every change
    
    this.form.get('query').valueChanges
      .pipe(debounceTime(500))
      .subscribe(query => this.performSearch(query));
    // invokes 'performSearch' once the user has stopped typing for 500 ms
    

  • Sharing data between components. We can create a BehaviorSubject in a shared service; one component pushes values into it, and another component subscribes to it and receives them:

    import { BehaviorSubject } from 'rxjs';
    
    // In a shared service
    private dataSubject = new BehaviorSubject(null);
    public data$ = this.dataSubject.asObservable();
    
    public updateData(newData) {
      this.dataSubject.next(newData);
    }
    
    // In component A
    this.sharedService.updateData('Hello from Component A');
    
    // In component B
    this.sharedService.data$.subscribe(data => console.log(data)); // 'Hello from Component A'
    

This article should give you an initial picture of RxJS and its usage, but it's only a starting point. Becoming proficient in any technology requires time, effort, and consistency.

These links can help you master the RxJS library:

  • RxJS Official Documentation: The official documentation is always a great place to start. It provides a comprehensive guide to all the concepts, operators, and usage scenarios;
  • Learn RxJS: This is an excellent resource with clear examples and explanations of different operators and concepts;
  • RxJS Marbles: An interactive way to learn about different RxJS operators using marble diagrams.