Mastering RxJS Operators: A Developer's Guide to Reactive Programming
Learning RxJS operators using Arithmetic operations (+ - x / %)
Introduction 🤠
Hi, I'm Sahil K., MTS1 at Fyle! As a developer, I’ve often found myself tangled in the complexities of asynchronous programming. Async JS is the only reason why I fell in love with JavaScript. Back in my second year of college, I stumbled upon this JS conference video by Philip Roberts titled “What the heck is the event loop anyway?” and was blown away by the concept of asynchronous programming.
That video opened up a whole new world for me. I explored promise chaining, fought through callback hell, and got excited about the endless possibilities. But when I joined Fyle, something even more mind-boggling entered my life—RxJS.
When I started working with the Fyle-mobile-app codebase, I initially struggled with it. It uses RxJS heavily, which was an entirely new way of writing code for me. A few months ago, I was discussing how to learn RxJS in a 1x1 call with Dimple, and she came up with the idea of learning it through a chain of simple mathematical operations like addition, multiplication, and squaring. I started learning RxJS this way, without any YouTube tutorial or Udemy course, and it worked. Now I at least know how each operator works, when to use which operator, and how to write more manageable code. So I decided to write this blog and share what I have learned in the past few months. Hopefully, you’ll learn something new from it!
So, what exactly is RxJS? 🤔
RxJS isn't just another tool for handling async code. It introduces a new paradigm of reactive programming, allowing you to work with events and data streams in ways that traditional async patterns can’t match. In this post, I'll walk you through how I learned RxJS using something simple and relatable: arithmetic operations.
At its core, RxJS (Reactive Extensions for JavaScript) is a library that helps you handle asynchronous operations in a structured way. You might already be familiar with JavaScript’s native ways of dealing with async code, like using promises or async/await. RxJS takes this concept a step further by providing tools that allow you to work with data streams: continuous sequences of data that might come from a variety of sources (like user inputs, API responses, or even time-based events). Instead of getting buried under a mountain of callbacks or then() and catch(), RxJS lets you handle all that chaos in a cleaner, more organized way.
Why care about RxJS? 🤷♂️
A lot of JavaScript is about waiting for things, like data loading or user actions. RxJS turns those "waiting times" into something you can work with smoothly.
Asynchronous code can get messy fast. Imagine you're dealing with:
Multiple simultaneous HTTP requests.
A real-time data feed, like the Visa Real-Time Feed.
Multiple user input events, like typing and button clicks.
The best part is that you can use RxJS anywhere in JavaScript land! Be it Node.js, Angular, React, Vue.js, or even good old vanilla JS, it’s there to handle your asynchronous tasks like a pro.
What makes RxJS different? 🤨
RxJS introduces something called Observables. Now, think of an Observable as a way to keep track of how data arrives over time. It's like setting up a live stream, where data flows continuously, and you can react to it in real-time. This is different from Promises, which just deal with a single future value (like a one-time response from a server).
To make this clearer, let’s take an example we all know. Imagine you're planning a movie night. You have two options: buying a DVD or subscribing to a streaming service like Netflix. These two scenarios perfectly illustrate the difference between Promises and Observables in programming. 🍿
Promises: The DVD Purchase
Promises are like ordering a DVD online. Here's how it works:
You place the order: This is creating a Promise in code.
The wait begins: The DVD is being processed and shipped. This represents the asynchronous operation.
Two possible outcomes:
The DVD arrives (Promise resolved) ✔️
You get a notification that it's out of stock (Promise rejected) ❌
It's a one-time deal: Once you get the DVD or the out-of-stock notice, that's it. The transaction is complete.
It represents a single, complete operation (getting the entire movie at once). You get one final result: either the DVD or a reason why you didn't get it. Once you have the result, the process is over. You can't "reorder" using the same order, you have to place a new order.
In the programming world, Promises work similarly. They're great for one-off operations where you're waiting for a single result, like fetching a user's profile.
Observables: Streaming like Netflix
On the other hand, Observables are perfect for scenarios like Netflix, where data keeps coming in over time. Think about how you watch a movie:
You sign up: This is creating an Observable.
You hit play: This is when you subscribe to the Observable.
The movie streams: You receive the movie in chunks (data packets) over time.
Continuous flow: As long as you're watching, new bits of the movie keep coming.
You're in control: You can pause, resume, or stop watching altogether.
It delivers multiple pieces of data over time (movie frames, audio packets). You have control: pause, resume, or cancel at any time. If you stop watching and come back later, you can start a new stream.
In programming, Observables are perfect for scenarios involving multiple values over time, like monitoring mouse movements or handling WebSocket messages.
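The contrast between the two analogies can be sketched in plain JavaScript, with no library installed. The `createStream` helper and the `pushChunk` variable below are hypothetical names made up for this illustration; they are a stand-in for the idea of an Observable, not how RxJS implements it. The sketch only shows the three behaviours described above: one value versus many, nothing happening before you subscribe, and being able to stop.

```javascript
// A Promise settles once: any later resolve() call is ignored.
const dvdOrder = new Promise((resolve) => {
  resolve("DVD delivered");
  resolve("a second DVD"); // ignored, the Promise is already settled
});
dvdOrder.then((value) => console.log("Promise:", value)); // Promise: DVD delivered

// Hypothetical stand-in for an Observable (NOT the RxJS implementation):
// values can arrive many times, and unsubscribe() stops delivery.
function createStream(producer) {
  return {
    subscribe(onNext) {
      let active = true;
      producer((value) => {
        if (active) onNext(value);
      });
      return {
        unsubscribe() {
          active = false;
        },
      };
    },
  };
}

let pushChunk; // lets us play the role of the streaming server
const movie = createStream((next) => {
  pushChunk = next;
});

const received = [];
const subscription = movie.subscribe((chunk) => received.push(chunk)); // "hit play"
pushChunk("scene 1");
pushChunk("scene 2");
subscription.unsubscribe(); // "stop watching"
pushChunk("scene 3"); // no longer delivered

console.log("Stream:", received); // received holds "scene 1" and "scene 2" only
```

The Promise hands over a single result and is done, while the stream keeps delivering chunks until the subscriber walks away.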
Key Concepts in RxJS 🔑
Now that we’ve got a handle on the basics of Observables, Data Streams, a little knowledge of Subscriptions, and how RxJS differs from promises, let's move on to the really fun part—CODING!
In RxJS, operators are functions that allow you to transform, filter, combine, or manipulate streams of data. If we go back to the Netflix analogy, you can think of operators as the settings on your TV that let you adjust the picture and sound, or even switch between different shows. They help you shape how your data (stream) behaves before it reaches you.
Note: For this blog, I’ve used Replit to code and test the RxJS examples. Feel free to use it or any other coding environment you prefer to try out the examples on your own.
Install RxJS
First, make sure you have RxJS installed in your environment. If you're using Node.js:
npm install rxjs
Creating Observables and Subscribing
Here's a simple example to demonstrate the creation and subscription of an Observable in RxJS:
CODE:
import { Observable } from "rxjs";
// Create a new observable to emit data
const observable = new Observable((subscriber) => {
  // Emit the data
  subscriber.next(10);
  subscriber.next(20);
  subscriber.next(30);
  // Complete the observable
  subscriber.complete();
});
// Observer to handle emitted values
const observer = {
  next: (value) => {
    console.log("Observer received value:", value);
  },
  error: (err) => {
    console.log("ERROR:", err);
  },
  complete: () => {
    console.log("COMPLETE status");
  },
};
// Connect observable with observer
observable.subscribe(observer);
Output:
When you run the above code, the output will be:
Observer received value: 10
Observer received value: 20
Observer received value: 30
COMPLETE status
Explanation:
Create Observable: Using the Observable constructor, we create an observable that emits values.
Subscriber: Inside the observable, we use subscriber.next() to emit values.
Observer: We define an observer object with next, error, and complete methods to handle the emitted values, errors, and completion notifications respectively.
Subscribe: We connect the observable with the observer by calling observable.subscribe(observer), which starts the data emission.
Understanding RxJS pipe
The pipe method in RxJS is used to compose operators to transform, filter, and manipulate streams of data emitted by an Observable.
Key Concepts of pipe:
Composition: You can chain multiple operators together to create a data processing pipeline.
Immutability: Each operator returns a new Observable, leaving the original Observable unchanged.
Readability: Using pipe enhances the readability of your code by clearly showing the sequence of operations applied to the data stream.
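To get a feel for composition and immutability before touching Observables, here is a rough plain-JavaScript sketch that applies the same idea to arrays. The helpers `pipeFns`, `mapValues`, and `filterValues` are hypothetical names invented for this example. RxJS's real pipe works on Observables rather than arrays, so treat this as an analogy for the left-to-right chaining only.

```javascript
// Hypothetical helper: compose functions left to right, like a pipeline.
const pipeFns = (...fns) => (input) => fns.reduce((acc, fn) => fn(acc), input);

// Array-based stand-ins for operators: each one returns a NEW array.
const mapValues = (project) => (values) => values.map(project);
const filterValues = (predicate) => (values) => values.filter(predicate);

const source = [1, 2, 3, 4, 5, 6];

// Square every number, then keep only the even squares.
const squareThenKeepEven = pipeFns(
  mapValues((n) => n * n),
  filterValues((n) => n % 2 === 0),
);

const processed = squareThenKeepEven(source);
console.log(processed); // [4, 16, 36]
console.log(source); // [1, 2, 3, 4, 5, 6], the original is untouched
```

Each step receives the output of the previous one, and the source is never modified, which is the same contract the three points above describe.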
Understanding the Basic Syntax of pipe
import { Observable } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// Create an observable
const observable = new Observable((subscriber) => {
  // Emit values
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
// Use pipe to compose operators
const processedObservable = observable.pipe(
  map((value) => {
    // Transformation logic (here: multiply by 10)
    return value * 10;
  }),
  filter((value) => {
    // Filtering logic (here: keep values greater than 10)
    return value > 10;
  }),
  // More operators can be added here
);
// Subscribe to the processed observable
processedObservable.subscribe({
  next: (value) => {
    // Handle emitted values (logs 20, then 30)
    console.log(value);
  },
  complete: () => {
    // Handle completion
    console.log("Done");
  },
});
Diving into RxJS Operators with Arithmetic Operations 🎯
Understanding RxJS Operators
Operators allow us to manipulate the data emitted by Observables, making it easier to process and respond to various events.
Each operator serves a specific purpose and can significantly improve the readability and efficiency of our code. We will start with the tap operator, a fundamental tool in the RxJS toolbox.
Note: For this blog, I have selected a few operators that we usually use in our day-to-day coding from the Fyle codebase. Each operator will be explained with simple examples, focusing on how they can be applied in real-world scenarios. Let's get started!
Operator 1: tap for the side effects 🚰
The tap operator is used to perform side effects for notifications from the source Observable without changing the emitted values.
When to Use?
⇒ Use tap for logging, debugging, or any side-effect actions without modifying the data stream.
CODE:
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';
// Observable emitting numbers
const numbers$ = of(1, 2, 3);
// Using tap to log values before processing
numbers$
  .pipe(
    tap((num) => console.log("Data to log:", num)), // Log the emitted data
  )
  .subscribe((result) => console.log("Emitted data:", result)); // Log the emitted result
OUTPUT:
// Output:
Data to log: 1
Emitted data: 1
Data to log: 2
Emitted data: 2
Data to log: 3
Emitted data: 3
Operator 2: map to transform emitted values 🗺️
The map operator is used to transform the items emitted by an Observable by applying a specified function to each item.
When to Use?
⇒ Use map when you want to modify or transform the data that an Observable emits without changing the order of emissions.
CODE:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
// Observable emitting numbers
const numbers$ = of(1, 2, 3, 4, 5);
// Using map to square each number
const squares$ = numbers$.pipe(map((num) => num * num));
squares$.subscribe((result) => console.log("Squared:", result));
OUTPUT:
// Output:
Squared: 1
Squared: 4
Squared: 9
Squared: 16
Squared: 25
Operator 3: filter to include only certain items 🫥
The filter operator is used to emit only those items from an Observable that meet a specified condition.
When to Use?
⇒ Use the filter operator when you want to include only certain items based on a condition.
CODE:
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
// Observable emitting numbers
const numbers$ = of(1, 2, 3, 4, 5, 6);
// Using filter to get even numbers
const evenNumbers$ = numbers$.pipe(
  filter(num => num % 2 === 0)
);
evenNumbers$.subscribe(result => console.log('Even:', result));
OUTPUT:
// Output:
Even: 2
Even: 4
Even: 6
Operator 4: zip to combine each value in a pairwise fashion 🫂
The zip operator is used to combine the values from multiple Observables into a single emitted value.
When to Use?
The zip operator is best suited for scenarios where you want to combine the emitted values from multiple Observables such that each emitted value is paired with the corresponding emitted value from the other Observables.
Example Overview:
We have two observables:
observableA$: Emits the values [1, 2, 3] with specific delays.
observableB$: Emits the values [10, 20, 30] with specific delays.
The zip operator is used to combine the values emitted by observableA$ and observableB$. It waits for each observable to emit a value, then pairs them together and performs an operation (in this case, summing the two values).
CODE:
import { concat, of, zip } from "rxjs";
import { delay, map } from "rxjs/operators";
// Two observables emitting numbers.
// concat runs each piece in turn, so every delay counts from the previous emission.
// observableA$: [1 (1st emit), 2 (3rd emit), 3 (4th emit)]
const observableA$ = concat(
  of(1).pipe(delay(0)), // Emit 1 immediately
  of(2).pipe(delay(200)), // Emit 2, 0.2 seconds after 1
  of(3).pipe(delay(300)), // Emit 3, 0.3 seconds after 2
);
// observableB$: [10 (2nd emit), 20 (5th emit), 30 (6th emit)]
const observableB$ = concat(
  of(10).pipe(delay(100)), // Emit 10 after 0.1 seconds
  of(20).pipe(delay(400)), // Emit 20, 0.4 seconds after 10
  of(30).pipe(delay(500)), // Emit 30, 0.5 seconds after 20
);
// Using zip to add the values from both observables
zip([observableA$, observableB$])
  .pipe(map(([a, b]) => a + b))
  .subscribe((result) => console.log("Zipped sum:", result));
OUTPUT:
// Output:
Zipped sum: 11
Zipped sum: 22
Zipped sum: 33
Emission Sequence:
Here is the order in which the values are emitted:
Time 0ms:
observableA$ emits 1 immediately.
zip waits for observableB$ to emit.
Time 100ms:
observableB$ emits 10. Now zip has a pair of values.
It combines 1 from observableA$ and 10 from observableB$, sums them (1 + 10), and emits 11.
Time 200ms:
observableA$ emits 2.
zip waits for the next value from observableB$.
Time 500ms:
observableA$ emits 3 (300ms delay after the previous emission).
Almost simultaneously, observableB$ emits 20 (400ms delay after its previous emission).
zip now has the second pair of values. It combines 2 from observableA$ (the value that was waiting) and 20 from observableB$, sums them (2 + 20), and emits 22.
Time 1000ms:
observableB$ emits 30 (500ms delay after its previous emission).
zip already has 3 waiting from observableA$, so it immediately pairs these values, sums them (3 + 30), and emits 33.
Operator 5: combineLatest to combine the latest value of each stream ⛙
The combineLatest operator combines the latest values from multiple Observables into a single Observable. It waits for all input Observables to emit at least one value, then emits an array containing the latest value from each.
When to Use?
Use combineLatest when you need to combine the most recent values from multiple Observables, and you want an update whenever any of the source Observables emits a new value.
Let’s understand combineLatest with the same example as above:
CODE:
import { combineLatest, concat, of } from "rxjs";
import { delay, map } from "rxjs/operators";
// Two observables emitting numbers.
// concat runs each piece in turn, so every delay counts from the previous emission.
// observableA$: [1 (1st emit), 2 (3rd emit), 3 (4th emit)]
const observableA$ = concat(
  of(1).pipe(delay(0)), // Emit 1 immediately
  of(2).pipe(delay(200)), // Emit 2, 0.2 seconds after 1
  of(3).pipe(delay(300)), // Emit 3, 0.3 seconds after 2
);
// observableB$: [10 (2nd emit), 20 (5th emit), 30 (6th emit)]
const observableB$ = concat(
  of(10).pipe(delay(100)), // Emit 10 after 0.1 seconds
  of(20).pipe(delay(400)), // Emit 20, 0.4 seconds after 10
  of(30).pipe(delay(500)), // Emit 30, 0.5 seconds after 20
);
// Using combineLatest to combine values from both observables
combineLatest([observableA$, observableB$])
  .pipe(map(([a, b]) => a + b))
  .subscribe((result) => console.log("Combined sum:", result));
OUTPUT:
// Output:
Combined sum: 11
Combined sum: 12
Combined sum: 13
Combined sum: 23
Combined sum: 33
Emission Sequence:
Here’s the order in which the values are emitted:
Time 0ms:
observableA$ emits 1.
combineLatest doesn't emit yet because it's waiting for observableB$ to emit its first value.
Time 100ms:
observableB$ emits 10.
Now that both observables have emitted at least once, combineLatest emits its first value: 1 + 10 = 11.
Time 200ms:
observableA$ emits 2.
combineLatest immediately emits a new value using the latest from each: 2 + 10 = 12.
Time 500ms:
observableA$ emits 3 (300ms after its previous emission).
combineLatest emits: 3 + 10 = 13.
Almost immediately after, observableB$ emits 20 (400ms after its previous emission).
combineLatest emits again with the latest values: 3 + 20 = 23.
Both timers are due at roughly 500ms, so the order of these two emissions is not strictly guaranteed. If 20 happened to arrive first, the third output would be 2 + 20 = 22 instead of 13.
Time 1000ms:
observableB$ emits 30.
combineLatest emits using the latest from each: 3 + 30 = 33.
Operator 6: forkJoin to combine the last value of each stream 🍴
forkJoin waits for all provided Observables to complete, then emits a single array containing the last emitted value from each Observable. It is useful when you want to wait for multiple Observables to finish before processing their final results.
When to Use?
Use forkJoin when you need to act only after all input Observables have completed, and you're only interested in the final emitted value of each Observable. This is typically used for tasks like making multiple API requests and processing their responses once all requests have finished.
Let’s understand forkJoin with the same example as before:
CODE:
import { forkJoin, concat, of } from "rxjs";
import { delay, map } from "rxjs/operators";
// Two observables emitting numbers.
// concat runs each piece in turn, so every delay counts from the previous emission.
// observableA$: [1 (1st emit), 2 (3rd emit), 3 (4th emit)]
const observableA$ = concat(
  of(1).pipe(delay(0)), // Emit 1 immediately
  of(2).pipe(delay(200)), // Emit 2, 0.2 seconds after 1
  of(3).pipe(delay(300)) // Emit 3, 0.3 seconds after 2
);
// observableB$: [10 (2nd emit), 20 (5th emit), 30 (6th emit)]
const observableB$ = concat(
  of(10).pipe(delay(100)), // Emit 10 after 0.1 seconds
  of(20).pipe(delay(400)), // Emit 20, 0.4 seconds after 10
  of(30).pipe(delay(500)) // Emit 30, 0.5 seconds after 20
);
// Using forkJoin to combine the final values from both observables
forkJoin([observableA$, observableB$])
  .pipe(map(([a, b]) => a + b))
  .subscribe((result) => console.log("ForkJoin sum:", result));
OUTPUT:
// Output:
ForkJoin sum: 33
Emission Sequence:
Here’s the order in which the values are emitted:
Time 0ms:
observableA$ emits 1.
forkJoin doesn't emit yet because it's waiting for both observables to complete.
Time 100ms:
observableB$ emits 10.
forkJoin still doesn't emit.
Time 200ms:
observableA$ emits 2.
forkJoin still doesn't emit.
Time 500ms:
observableA$ emits 3 (300ms after its previous emission) and completes.
observableB$ emits 20 (400ms after its previous emission).
forkJoin still doesn't emit because observableB$ hasn't completed.
Time 1000ms:
observableB$ emits 30 and completes.
Now that both observables have completed, forkJoin emits the combination of the last values from each observable: 3 + 30 = 33.
Fig. This explains how exactly zip, combineLatest, and forkJoin work.
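The three emission sequences can also be replayed by hand. The sketch below is plain JavaScript, not RxJS: the `timeline` array and the three `simulate...` functions are hypothetical names written only for this comparison, and they assume the six emissions arrive in exactly the order described above (with 3 arriving just before 20 at 500ms). It walks the same list of events three times, once per combination rule.

```javascript
// The six emissions from observableA$ and observableB$, in arrival order.
const timeline = [
  { time: 0, source: "A", value: 1 },
  { time: 100, source: "B", value: 10 },
  { time: 200, source: "A", value: 2 },
  { time: 500, source: "A", value: 3 },
  { time: 500, source: "B", value: 20 },
  { time: 1000, source: "B", value: 30 },
];

// zip rule: pair the 1st with the 1st, the 2nd with the 2nd, and so on.
function simulateZip(events) {
  const queues = { A: [], B: [] };
  const output = [];
  for (const { source, value } of events) {
    queues[source].push(value);
    if (queues.A.length > 0 && queues.B.length > 0) {
      output.push(queues.A.shift() + queues.B.shift());
    }
  }
  return output;
}

// combineLatest rule: once both have emitted, emit on every new value.
function simulateCombineLatest(events) {
  const latest = {};
  const output = [];
  for (const { source, value } of events) {
    latest[source] = value;
    if ("A" in latest && "B" in latest) {
      output.push(latest.A + latest.B);
    }
  }
  return output;
}

// forkJoin rule: wait until everything is done, then use the last values.
function simulateForkJoin(events) {
  const last = {};
  for (const { source, value } of events) {
    last[source] = value;
  }
  return [last.A + last.B];
}

console.log("zip:", simulateZip(timeline)); // 11, 22, 33
console.log("combineLatest:", simulateCombineLatest(timeline)); // 11, 12, 13, 23, 33
console.log("forkJoin:", simulateForkJoin(timeline)); // 33
```

Same inputs, three different rules: zip pairs by position, combineLatest reacts to every change, and forkJoin only looks at the final state.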
Operator 7: switchMap to switch to a New Observable 🔀
The switchMap operator maps each value to an Observable, then flattens all of these inner Observables, emitting values from the most recently mapped Observable.
When to use?
Use switchMap when you want to create a new Observable based on the value emitted by the source Observable, but you only care about the emissions from the most recent inner Observable.
CODE:
import { interval, of } from "rxjs";
import { delay, switchMap, take } from "rxjs/operators";
function calculateFactorial(n) {
  if (n <= 1) {
    return 1;
  }
  return n * calculateFactorial(n - 1);
}
export function milestoneSwitchMap() {
  const source$ = interval(1000).pipe(
    take(5), // This will make the interval stop after 5 emissions
    switchMap((count) => {
      const n = count + 1; // Emit values 1, 2, 3, 4, 5
      console.log(`Calculating factorial of ${n}`);
      return of(calculateFactorial(n)).pipe(delay(500)); // Simulating a delay
    }),
  );
  source$.subscribe(
    (result) => console.log(`Factorial result: ${result}`),
  );
}
// Call the function to start the stream
milestoneSwitchMap();
OUTPUT:
// Output
Calculating factorial of 1
Factorial result: 1
Calculating factorial of 2
Factorial result: 2
Calculating factorial of 3
Factorial result: 6
Calculating factorial of 4
Factorial result: 24
Calculating factorial of 5
Factorial result: 120
Why use switchMap here?
Handling new emissions: switchMap is ideal when you want to switch to a new inner observable every time the source observable emits, canceling any ongoing operations from previous emissions.
Preventing overlap: If a new interval emission occurs before the previous factorial result has been delivered, switchMap will cancel the previous inner observable and start a new one. This prevents overlapping calculations and ensures we're always working with the most recent value. In this example each result arrives after 500ms, well before the next interval emission at 1000ms, so nothing is actually canceled.
Resource management: If the factorial calculations were more resource-intensive or represented cancelable operations (like HTTP requests), switchMap would help manage these resources efficiently by canceling obsolete operations.
Emission Sequence:
Time 0ms:
The interval starts.
Time 1000ms:
Interval emits 0.
switchMap starts calculating factorial(1).
Time 1500ms:
Factorial(1) calculation completes and emits 1.
Time 2000ms:
Interval emits 1.
Previous calculation (if still ongoing) would be canceled.
switchMap starts calculating factorial(2).
Time 2500ms:
Factorial(2) calculation completes and emits 2.
Time 3000ms:
Interval emits 2.
switchMap starts calculating factorial(3).
Time 3500ms:
Factorial(3) calculation completes and emits 6.
Time 4000ms:
Interval emits 3.
switchMap starts calculating factorial(4).
Time 4500ms:
Factorial(4) calculation completes and emits 24.
Time 5000ms:
Interval emits 4 (last emission due to take(5)).
switchMap starts calculating factorial(5).
Time 5500ms:
Factorial(5) calculation completes and emits 120.
The observable completes as there are no more emissions from the source interval.
Operator 8: concatMap for Mapping and Concatenating in Order 📏
The concatMap operator maps each value to an Observable, then flattens all of these inner Observables in order, waiting for each one to complete before moving to the next.
When to use?
Use concatMap when you need to perform operations in sequence, ensuring that each operation completes before the next one starts.
Let's create an example where we use concatMap to perform sequential calculations:
CODE:
import { of } from "rxjs";
import { concatMap, delay } from "rxjs/operators";
const source$ = of(1, 2, 3);
source$
  .pipe(
    concatMap((n) => {
      console.log(`Starting calculation for ${n}`);
      return of(`Result: ${n * 2}`).pipe(delay(2000));
    }),
  )
  .subscribe((result) => console.log(result));
OUTPUT:
// Output
Starting calculation for 1
(2 seconds later)
Result: 2
Starting calculation for 2
(2 seconds later)
Result: 4
Starting calculation for 3
(2 seconds later)
Result: 6
switchMap vs concatMap
Both concatMap and switchMap are operators in RxJS that help transform and flatten Observables, but they have different use cases based on how they handle emissions from the source Observable. Here's a breakdown of when to use each operator:
switchMap
Definition: switchMap maps each value from the source Observable to a new inner Observable. If a new value arrives from the source Observable while the previous inner Observable is still emitting, switchMap will unsubscribe from the previous inner Observable and subscribe to the new one.
When to Use?
Latest Value Only: Use switchMap when you are only interested in values from the most recent inner Observable.
Cancel Previous Requests: This operator is ideal for scenarios like user input (e.g., search boxes) where the user may type quickly, and you want to cancel the previous request and respond only to the latest input.
concatMap
Definition: concatMap also maps each value from the source Observable to a new inner Observable. However, it queues the inner Observables, subscribing to each one sequentially. It waits for each inner Observable to complete before moving on to the next one.
When to Use?
Maintain Order: Use concatMap when the order of emissions is important, and you want to ensure that each inner Observable completes before the next one starts.
Sequential Processing: This operator is suitable for scenarios where you need to perform actions in a sequence, such as a series of API calls that must run one after another.
Avoid Overlapping: Use it in cases where you want to prevent overlapping emissions and ensure that each operation completes before starting the next.
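The difference shows up most clearly when the inner work takes longer than the gap between source emissions. Here is a small plain-JavaScript sketch of the arithmetic, not RxJS itself. The functions `simulateSwitchMap` and `simulateConcatMap` are hypothetical names, and the sketch assumes every inner Observable delivers exactly one value after a fixed delay and then completes (like of(x).pipe(delay(d)) in the examples above), with no exact timing ties.

```javascript
// switchMap rule: a result is delivered only if no newer source value
// arrives before the inner work finishes.
function simulateSwitchMap(sourceTimes, innerDuration) {
  const delivered = [];
  sourceTimes.forEach((start, index) => {
    const finish = start + innerDuration;
    const nextStart = sourceTimes[index + 1];
    if (nextStart === undefined || finish < nextStart) {
      delivered.push({ index, time: finish });
    }
  });
  return delivered;
}

// concatMap rule: each inner job starts only after the previous one is done.
function simulateConcatMap(sourceTimes, innerDuration) {
  const delivered = [];
  let freeAt = 0; // time at which the previous inner job finishes
  sourceTimes.forEach((arrival, index) => {
    const start = Math.max(arrival, freeAt);
    freeAt = start + innerDuration;
    delivered.push({ index, time: freeAt });
  });
  return delivered;
}

// Source emits every 1000ms, but each inner job needs 1500ms.
const sourceTimes = [1000, 2000, 3000];
const slowJob = 1500;

const switched = simulateSwitchMap(sourceTimes, slowJob);
const queued = simulateConcatMap(sourceTimes, slowJob);

console.log("switchMap:", switched); // only index 2, delivered at 4500
console.log("concatMap:", queued); // all three, at 2500, 4000 and 5500
```

With a 1500ms job, switchMap throws away the first two results and delivers only the last one, while concatMap delivers all three but falls further behind the source with each emission. With a 500ms job, as in the factorial example, both rules deliver every result.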
Conclusion: The RxJS Journey Continues 🌈
We've come a long way from our simple arithmetic operations, haven't we? RxJS is a powerful tool that can simplify complex asynchronous operations, making your code more readable, maintainable, and fun to write!
Remember, like any powerful tool, RxJS has its learning curve. But with practice and curiosity, you'll find yourself reaching for it more and more. Whether you're handling user inputs, managing API calls, or processing streams of data, RxJS has got your back. So, the next time you find yourself tangled in a web of callbacks or drowning in a sea of promises, remember: there's probably an RxJS operator for that!
Keep exploring, keep coding, and most importantly, keep having fun with reactive programming! 🚀🎉 Happy coding…!