RxJS Guide for Beginners ๐
Introduction
I am Suyash Patil, a front-end developer intern at Fyle. I stumbled upon Angular when I started my internship and have been learning it since then. The mobile app uses RxJS heavily, a library for reactive programming. For those who are not familiar with reactive programming, it is a way to do asynchronous programming in Angular. In this guide, I will cover some of the basic concepts of RxJS including Observables, Streams, Pipe, Subscriptions, and various map operators like map, concatMap, mergeMap, exhaustMap, and switchMap. I will also discuss forkJoin, which is used for parallel requests. I will also cover Subjects which is used to create hot observables.
What are Observables and Streams?
Streams and observables are fundamental concepts in reactive programming. Reactive Programming is when an event is emitted by a component, then the changes will be propagated to other components that are subscribed to the event. Streams are values that change over time, and they can be created using functions like setInterval that return an incremental value at specified intervals.
Observables, on the other hand, are a way to work with these streams. They are functions that allow you to operate on the streams, including combining multiple streams, subscribing to them, and more. Observables make it easier to manage the flow of data in real-time applications by providing a consistent and flexible way to handle asynchronous data streams.
To create an observable, you can assign a basic stream to it. For example, here's how you can create an observable using a simple interval stream that emits a value every second:
const observable$ = of(1,2,3,4,5);
// of is a method to define a basic stream. Note the '$' at the end denotes that it is observable.
// You can also define an observable that emits value per second.
const time$ = interval(1000);
Pipe
The pipe method is a powerful feature in RxJS that allows developers to perform multiple operations on an observable. These operations can include mapping, filtering, flattening, and more, and they are performed in a chain.
Here's an example of how the pipe
method is used to apply multiple operators to an observable:
const observable1$ = of(1,2,3,4,5);
observable1$.pipe(
map((value) => value * 2)
).subscribe(console.log);
pipe
is heavily used inside the Fyle mobile application codebase. It is used in add-edit-expense in multiple instances.
One such method that uses this is getActiveCategories()
.
getActiveCategories() {
const allCategories$ = this.categoriesService.getAll();
return allCategories$.pipe(map((categories) => this.categoriesService.filterRequired(categories)));
}
Subscription
Now that we have created an observable, we need to attach a method to it to listen for emitted values. The subscribe
method allows us to do just that. Here's an example:
time$.subscribe(console.log)
/*
* 0
* 1
* 2
* 3
* .. every 1 second
*/
By using the subscribe
method, developers can easily listen to emitted values from an observable and perform actions based on those values. This makes it easier to manage the flow of data in real-time applications and create powerful reactive workflows.
It used in add-edit-expense in the mobile application repository:
setCategoryFromVendor(defaultCategory) {
this.categoriesService.getCategoryByName(defaultCategory)
.subscribe((category) => {
if (category) {
this.trackingService.setCategoryFromVendor(category);
this.fg.controls.category.patchValue(category);
}
});
}
A Problem with Subscriptions - shareReplay
While subscribing to an API multiple times is possible, it can be an expensive operation as it results in multiple calls to the API. Fortunately, there is a more efficient way to handle this situation. By using the shareReplay()
method, we can replay the last values instead of making additional calls to the API.
To clarify, the shareReplay()
method allows us to buffer and replay the last emitted value to new subscribers. For example, if we specify 1
as the argument for shareReplay()
, only the latest emitted value will be buffered and replayed to new subscribers.
This can help reduce the number of API calls we need to make, saving time and resources.
const observable1$ = of(1,2,3,4,5);
const multipleOfTwo$ = observable1$.pipe(
map((value) => value * 2),
shareReplay(1)
);
// On the first time, it will subscribe to all the values
multipleOfTwo$.subscribe(console.log);
/*
* 2
* 4
* 6
* 8
* 10
*/
multipleOfTwo$.subscribe(console.log);
/*
* It will only subscribe to the last value of the stream
* 10
*
* You can specify how many values to subscribe to the arguments of shareReplay
*/
We have used shareReplay on the sign-in page to avoid calling the API multiple times.
async checkIfEmailExists() {
if (this.fg.controls.email.valid) {
this.emailLoading = true;
const checkEmailExists$ = this.routerAuthService.
checkEmailExists(this.fg.controls.email.value).pipe(
shareReplay(1),
finalize(async () => {
this.emailLoading = false;
})
);
const saml$ = checkEmailExists$.
pipe(filter((res) => (res.saml ? true : false)));
const basicSignIn$ = checkEmailExists$.
pipe(filter((res) => (!res.saml ? true : false)));
basicSignIn$.subscribe({
next: () => (this.emailSet = true),
error: (err) => this.handleError(err),
});
saml$.subscribe({
next: (res) => this.handleSamlSignIn(res),
error: (err) => this.handleError(err),
});
} else {
this.fg.controls.email.markAsTouched();
}
}
By using shareReplay()
, we ensure that API is called only once. We can then operate on saml$
and basicSignIn$
ensuring that the API is called only once.
Take
The take operator can be used within the pipe
method to limit the number of values emitted by an observable to a specific count. This allows us to control the output of the observable and avoid unnecessary emissions.
For example, we can use take(5)
to emit only the first 5 values from the observable. This is especially useful when we have a large dataset or a continuous stream of data and want to limit the number of items processed at once.
By using the take
operator, we can reduce the load on the system and improve the overall performance of our application.
const obs$ = of(1,2,3,4,5);
const take2$ = obs$.pipe(take(2));
take2$.subscribe(console.log);
/*
* Only returns first values
* 1
* 2
*/
take
is used in add-edit-expense
component to extract only the required values from an observable.
this.txnFields$
.pipe(
distinctUntilChanged((a, b) => isEqual(a, b)),
switchMap((txnFields) =>
forkJoin({
isConnected: this.isConnected$.pipe(take(1)),
// only the first value is considered
orgSettings: this.orgSettingsService.get(),
costCenters: this.costCenters$,
taxGroups: this.taxGroups$,
isIndividualProjectsEnabled: this.isIndividualProjectsEnabled$,
individualProjectIds: this.individualProjectIds$,
filteredCategories: this.filteredCategories$.pipe(take(1)),
}).pipe(...)
)
)
.subscribe(...);
Map operators
map
The map operator is used to transform each value emitted by an observable into a new value. It applies a projection to each value and emits the transformed value as a new observable.
For example, we can use the map
operator to convert a stream of numeric values into their corresponding string representation or to extract a specific property from a stream of objects.
By using the map
operator, we can modify the values emitted by an observable to match our desired format or structure, enabling us to process the data more efficiently and effectively.
const observable1$ = of(1,2,3,4,5);
const multipleOfTwo$ = observable1$.pipe(
map((value) => value * 2)
shareReplay(2)
);
multipleOfTwo$.subscribe(console.log);
// We can map each value from the stream and multiply it by 2 and emit it
map
is used in new-password
component in multiple instances. One such instance is shown below:
this.lengthValidationDisplay$ = this.fg.controls.password.valueChanges.pipe(
map((password) => password && password.length >= 12 && password.length <= 32)
);
concatMap
The concatMap operator is used to sequentially map each value emitted by an observable. It ensures that the observable values are processed in order, one at a time, before moving on to the next value.
This operator is particularly useful in scenarios where we need to update data on a server in a sequential manner. For instance, if we have a form where user input needs to be saved to a server after each entry, we can use concatMap
to ensure that each entry is processed in sequence.
Consider the example of a form where user input is saved to a server. By using concatMap
, we can ensure that each input is processed and saved to the server in the order it was entered. This helps to avoid issues with data consistency and ensures that the form data is accurately saved on the server.
saveChanges(changes) {
...call API to save data
}
this.form.valueChanges.pipe(
concatMap((changes) => this.saveChanges(changes))
).subscribe(console.log);
For instance, if a user types "Example" in a form, the concatMap
operator will save each character one at a time, starting with 'E', then 'x', and so on until the entire input has been processed. One such use case of concatMap is in transaction service inside the mobile application.
deleteBulk(txnIds: string[]): Observable<Transaction[]> {
const chunkSize = 10;
const count = txnIds.length > chunkSize ?
txnIds.length / chunkSize : 1;
return range(0, count).pipe(
concatMap((page) => {
const filteredtxnIds = txnIds.slice(chunkSize * page,
chunkSize * page + chunkSize);
return this.apiService.post('/transactions/delete/bulk', {
txn_ids: filteredtxnIds,
});
}),
reduce((acc, curr) => acc.concat(curr), [] as Transaction[])
);
}
The concatMap
operator waits for the inner Observable to complete before it emits the next number from the range
Observable. This ensures that the API calls are made sequentially and in order. Once all the requests have been completed, the reduce
operator combines all the emitted values into a single array and returns an Observable of the deleted transactions.
mergeMap
The mergeMap operator is used to map each value emitted by an observable in parallel. This means that all values are processed concurrently, without waiting for the previous value to complete processing.
saveChanges(changes) {
...call API to save data
}
this.form.valueChanges.pipe(
mergeMap((changes) => this.saveChanges(changes))
).subscribe(console.log)
If a user types 'Example' in a form and mergeMap
is used to process the input, API calls will be generated in parallel for each character as soon as the user types it. Unlike concatMap
, which waits for each character to be saved before processing the next one, mergeMap
processes all characters concurrently.
const numbers$ = of(1,2,3,4,5);
const alphabets$ = of('a','b','c','d');
alphabets$.pipe(
mergeMap(x => {
return numbers$.pipe(
map(i => i + x)
)
})
).subscribe(console.log);
/*
1a
2a
3a
4a
5a
1b
2b
...
5d
*/
exhaustMap
The exhaustMap operator is used to ignore projected observables that start before their preceding observable has been completed. This means that if a new observable is emitted before the previous one has been completed, it will be ignored until the previous one finishes processing.
Consider the following scenario: we have a button that calls a server and saves data every time it is clicked. If a user clicks the button multiple times in quick succession, it can result in multiple server calls being made before the previous ones have finished processing. This can lead to data inconsistencies and errors.
To prevent this, we can use exhaustMap
to ignore any new observable emissions that occur before the previous one has completed processing. This ensures that each server call is processed sequentially and prevents data inconsistencies.
switchMap
The switchMap operator is used to emit values only from the most recently projected observable. It is useful when we only care about the latest value from the inner observable and want to cancel any previous inner observables.
Consider the following code example:
const source$ = interval(1000);
source$.pipe(
switchMap(value => of(value * 2))
).subscribe(console.log);
/*
0
2
4
6
...
*/
In this example, switchMap
is used to process only the latest value emitted by the observable. Any previous inner observables that have not completed processing will be canceled, and only the most recent ones will be processed.
switchMap
is useful in scenarios where we need to prioritize the latest data and ensure that only the most recent observable is processed. This can be useful in real-time data processing scenarios, such as chat applications or real-time analytics.
switchMap
is used in fy-select-project-modal
. The filteredOptions$
and recentrecentlyUsedItems$
observables are updated based on the input provided by the user. The switchMap()
operator is used to switch to the new Observable returned by the getProjects()
and getRecentlyUsedItems()
methods, thus canceling any previous subscriptions to the Observable.
ngAfterViewInit() {
this.filteredOptions$ = fromEvent(this.searchBarRef.nativeElement, 'keyup').pipe(
map((event: any) => event.srcElement.value),
startWith(''),
distinctUntilChanged(),
switchMap((searchText) => this.getProjects(searchText)),
map((projects: any[]) =>
projects.map((project) => {
if (isEqual(project.value, this.currentSelection)) {
project.selected = true;
}
return project;
})
)
);
this.recentrecentlyUsedItems$ = fromEvent(this.searchBarRef.nativeElement, 'keyup').pipe(
map((event: any) => event.srcElement.value),
startWith(''),
distinctUntilChanged(),
switchMap((searchText) =>
this.getRecentlyUsedItems().pipe(
// filtering of recently used items wrt searchText is taken care in service method
this.utilityService.searchArrayStream(searchText)
)
)
);
this.cdr.detectChanges();
}
This is useful in scenarios where the user types in multiple search queries in quick succession, as it ensures that the search result for the most recent query is displayed and avoids any race condition issues.
forkJoin
The forkJoin
operator is used to combine multiple observables and output their values only when all input observables have been completed. It accepts an array of input observables and returns an observable that emits an array of values in the same order as the input observables.
forkJoin is particularly useful when we need to make parallel requests and wait for all of them to complete before processing the results. It is commonly used in scenarios where we must retrieve data from multiple sources, such as APIs or databases, and combine them into a single response.
Consider the following example:
const obs1$ = of([1,2,3,4]);
const obs2$ = of([5,6,7,8]);
forkJoin(obs1$, obs2$).subscribe(console.log);
/*
* 0:(4) [1, 2, 3, 4]
* 1:(4) [5, 6, 7, 8]
*/
It is used in device service to get the device information.
getDeviceInfo(): Observable<ExtendedDeviceInfo> {
return forkJoin({
deviceInfo: Device.getInfo(),
deviceId: Device.getId(),
appInfo: this.getAppInfo(),
}).pipe(
map(({ deviceInfo, deviceId, appInfo }) =>
Object.assign(deviceInfo, deviceId, {
appVersion: appInfo.version,
liveUpdateAppVersion: environment.LIVE_UPDATE_APP_VERSION,
})
)
);
}
Zip
While forkJoin
is useful for combining multiple observables and emitting their final values, it only emits the last value of each observable. This can be a problem if we need to operate on the entire stream of data.
To overcome this limitation, we can use the zip
operator. zip works similarly to forkJoin
by accepting an array of input observables and returning an observable that emits an array of values. However, unlike forkJoin
, zip
emits all the values of each observable in the same order.
Consider the following example:
const id$ = of(1,2,3);
const age$ = of(21,34,50);
const name$ = of('Jack', 'John', 'Jill');
zip(id$, age$, name$).subscribe((val) => console.log(val))
/*
* [1, 21, "Jack"]
* [2, 34, "John"]
* [3, 50, "Jill"]
------NOTE-----
* If we have another value 4 in id$, then it doesn't get emitted as values are taken in the order in zip method
*/
Subjects
In previous sections, we learned various methods to create observables such as of
, interval
, and Observable.create
. However, in certain scenarios, we may need to create an observable that can be subscribed to multiple times and emits new values dynamically.
This is where the Subject
comes in. A Subject is a type of observable that allows us to manually emit values to multiple subscribers. It acts as both an observer and an observable, making it easy to multicast values to multiple subscribers.
To create a Subject
, we can use the new Subject()
constructor. We can then emit new values to the subject using the next
method, and subscribers will receive those values via the subscribe
method.
Here's an example:
const subject = new Subject();
subject.subscribe(console.log);
subject.next(5);
subject.subscribe(console.log);
// Prints 5 only once
Observers receive data when it is produced, but they cannot actively push values to an Observable. Subjects, on the other hand, are both observable and observer, and can both emit and receive values. You can use next()
to push new values to a subject and use subscribe()
for a subscription. This method often causes leaky abstraction. To prevent this we have a method - asObservable
.
asObservable()
In RxJS, the asObservable
method is used to convert a Subject
into a read-only observable. This method creates a new observable that can be subscribed to, but it doesn't provide access to the next
method of the original Subject
.
const subject = new Subject();
subject.next(1);
subject.next(2);
const observable$ = subject.asObservable();
observable$.next() -> Throws an error
observable$.subscribe() -> Works fine
asObservable
find its application in tasks service where totalTaskCount is returned as an observable.
getTotalTaskCount() {
return this.totalTaskCount$.asObservable();
}
BehaviorSubject
With BehaviorSubject, we can emit the initial value and its current value whenever it is subscribed to. One of the benefits of using this is that it provides us with a method - getValue
which is used to get the current value of the subject.
const sub = new BehaviorSubject();
sub.next(1);
sub.next(2);
sub.subscribe((val) => console.log('first', val)); // Emits 2 and subsequent values
sub.next(3);
sub.subscribe((val) => console.log('second', val)); // Emits 3 and subs
sub.next(4);
/*
* first 2
* first 3
* second 3
* first 4
* second 4
*/
In the Fyle mobile application, the Tasks service uses BehaviorSubject. This acts as a mini store for storing the application state.
class TaskService {
totalTaskCount$: BehaviorSubject<number> = new BehaviorSubject(0);
// We pass an initial value to the subject
...
getTotalTaskCount() {
return this.totalTaskCount$.asObservable();
}
// We have made the subject observable to prevent leaking of the 'observer side' of the subject
...
}
Then inside the โmy advancesโ page, we use this service to get the total count of tasks.
totalTaskCount: number;
this.tasksService.getTotalTaskCount().subscribe((totalTaskCount) => (this.totalTaskCount = totalTaskCount));
BehaviorSubject
is useful when you need to initialize an observable with a value. By providing an initial value when creating the BehaviorSubject
, you can ensure that new subscribers receive a value immediately, rather than having to wait for the first value to be emitted.
AsyncSubject
When dealing with long-running calculations, it may be necessary to retrieve only the final result while discarding intermediary values. This is where AsyncSubject comes into play.
AsyncSubject is a type of Subject in RxJS that only emits the final value of an observable sequence, and only when the sequence has been completed. This means that by using AsyncSubject, developers can obtain the final result of a calculation without having to receive and process all intermediary values, thereby improving application performance.
const subject = new AsyncSubject();
subject.subscribe(console.log);
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
// Only 3 get printed out
// If we remove the subject.complete() nothing will be printed as execution is in progress
All the other subscriptions also emit the last values even when the subject execution is completed.
const subject = new AsyncSubject();
subject.subscribe(console.log);
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
subject.next(4);
subject.subscribe(console.log);
// Console -
// 3
// 3
ReplaySubject
ReplaySubject is used to replay previous values to new subscribers. Letโs understand it with the following code
const subject = new ReplaySubject();
subject.subscribe((val) => console.log('first', val));
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe((val) => console.log('second', val));
subject.next(4)
/*
* first 1
* first 2
* first 3
* second 1
* second 2
* second 3
* first 4
* second 4
*/
As you can see, the second subscription can access previously sent values.
In conclusion, RxJS is a powerful library for reactive programming in Angular. It provides a way to operate on streams of data like combining multiple streams, how subscribing to them, and many more. In this guide, we covered some of the basic concepts of RxJS including Observables, Streams, Pipe, Subscriptions, and various map operators like map, concatMap, mergeMap, exhaustMap, and switchMap. We also discussed various Subjects. By using RxJS, you can create more efficient and responsive applications in Angular.