Angular RxJS Patterns
Master RxJS in Angular for handling async operations, data streams, and reactive programming patterns.
Observable Creation
Basic Observable Creation
import { Observable, of, from, interval, fromEvent } from 'rxjs';
// of - emit the given values in sequence, then complete
const numbers$ = of(1, 2, 3, 4, 5);

// from - convert an array, promise, or iterable into an observable
const fromArray$ = from([1, 2, 3]);
const fromPromise$ = from(fetch('/api/data'));

// interval - emit an incrementing counter at a fixed period
const timer$ = interval(1000); // Every second

// fromEvent - turn DOM events into a stream
const clicks$ = fromEvent(document, 'click');

// Custom observable - push values manually, then signal completion
const custom$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});
HttpClient Observables
import { HttpClient } from '@angular/common/http'; import { Injectable } from '@angular/core'; import { Observable } from 'rxjs';
/**
 * CRUD client for the `/api/data` resource.
 *
 * Every method returns the observable produced by `HttpClient` unchanged, so
 * callers decide when to subscribe and how to handle errors.
 */
@Injectable({ providedIn: 'root' })
export class DataService {
  constructor(private http: HttpClient) {}

  /** Fetches the whole collection. */
  getData(): Observable<Data[]> {
    return this.http.get<Data[]>('/api/data');
  }

  /**
   * Fetches a single item.
   * @param id Identifier appended to the resource path.
   */
  getItem(id: string): Observable<Data> {
    // Template literal: the backticks are required for `${id}` to interpolate.
    return this.http.get<Data>(`/api/data/${id}`);
  }

  /**
   * Creates a new item.
   * @param data Payload sent as the POST body.
   */
  createItem(data: Data): Observable<Data> {
    return this.http.post<Data>('/api/data', data);
  }

  /**
   * Replaces an existing item.
   * @param id Identifier of the item to replace.
   * @param data Payload sent as the PUT body.
   */
  updateItem(id: string, data: Data): Observable<Data> {
    return this.http.put<Data>(`/api/data/${id}`, data);
  }

  /**
   * Deletes an item.
   * @param id Identifier of the item to delete.
   */
  deleteItem(id: string): Observable<void> {
    return this.http.delete<void>(`/api/data/${id}`);
  }
}
Common Operators
Transformation Operators
import { map, pluck, switchMap, mergeMap, concatMap } from 'rxjs/operators'; import { of } from 'rxjs';
// map - transform each value
const numbers$ = of(1, 2, 3).pipe(
  map(n => n * 2) // 2, 4, 6
);

// pluck - extract property (deprecated, use map instead)
const users$ = of(
  { name: 'John', age: 30 },
  { name: 'Jane', age: 25 }
).pipe(
  map(user => user.name) // 'John', 'Jane'
);

// switchMap - cancel the previous inner observable, switch to the new one
searchControl.valueChanges.pipe(
  switchMap(term => this.searchService.search(term))
).subscribe(results => {
  this.results = results;
});

// mergeMap - run inner observables in parallel
const ids$ = of(1, 2, 3);
ids$.pipe(
  mergeMap(id => this.getUser(id)) // All requests in parallel
).subscribe();

// concatMap - run inner observables in sequence
ids$.pipe(
  concatMap(id => this.getUser(id)) // One at a time
).subscribe();
Filtering Operators
import { Subject, interval, of } from 'rxjs';
import { filter, take, takeUntil, takeWhile, distinctUntilChanged } from 'rxjs/operators';

// filter - only emit matching values
of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0) // 2, 4
);

// take - first N values
interval(1000).pipe(
  take(5) // First 5 emissions
);

// takeUntil - mirror the source until another observable emits
// (source$ stands for any observable defined elsewhere)
const destroy$ = new Subject<void>();
source$.pipe(
  takeUntil(destroy$)
).subscribe();

// distinctUntilChanged - skip duplicate consecutive values
of(1, 1, 2, 2, 3, 3).pipe(
  distinctUntilChanged() // 1, 2, 3
);
Combination Operators
import { combineLatest, merge, concat, forkJoin, of, zip } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

// combineLatest - emit the latest value of every source whenever any source emits
combineLatest([
  this.user$,
  this.settings$
]).pipe(
  map(([user, settings]) => ({ user, settings }))
).subscribe();

// merge - emit from any source as values arrive
merge(
  this.clicks$,
  this.hovers$
).subscribe();

// concat - subscribe to sources one after another
concat(
  this.loadUser$,
  this.loadSettings$
).subscribe();

// forkJoin - wait for all sources to complete, then emit their last values once
forkJoin({
  user: this.getUser(),
  posts: this.getPosts(),
  comments: this.getComments()
}).subscribe(({ user, posts, comments }) => {
  // All complete
});

// zip - pair values from sources by index
zip(
  of(1, 2, 3),
  of('a', 'b', 'c')
).pipe(
  // Template literal: backticks are required for the interpolation to work.
  map(([num, letter]) => `${num}${letter}`)
); // '1a', '2b', '3c'
Utility Operators
import { of } from 'rxjs';
import { tap, delay, debounceTime, throttleTime, distinctUntilChanged, map } from 'rxjs/operators';

// tap - side effects (logging, etc.) without altering the stream
source$.pipe(
  tap(value => console.log('Value:', value)),
  map(value => value * 2)
);

// delay - shift emissions later in time
of(1, 2, 3).pipe(
  delay(1000) // Delay 1 second
);

// debounceTime - wait for a pause in emissions
searchControl.valueChanges.pipe(
  debounceTime(300) // Wait 300ms after user stops typing
);

// throttleTime - emit first value, then ignore for the duration
clicks$.pipe(
  throttleTime(1000) // Only once per second
);

// distinctUntilChanged - skip consecutive duplicates
input$.pipe(
  distinctUntilChanged() // Only when value changes
);
Error Handling
catchError - Handle Errors
import { catchError } from 'rxjs/operators'; import { of, EMPTY, throwError } from 'rxjs';
// Return fallback value
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]); // Return empty array
  })
);

// Return empty observable
source$.pipe(
  catchError(() => EMPTY) // Complete without emitting
);

// Re-throw error
source$.pipe(
  catchError(error => {
    console.error('Error:', error);
    return throwError(() => new Error('Custom error'));
  })
);

// Handle different error types
source$.pipe(
  catchError(error => {
    if (error.status === 404) {
      return of(null); // Treat "not found" as an absent value
    }
    return throwError(() => error); // Anything else propagates unchanged
  })
);
retry and retryWhen
import { of, throwError } from 'rxjs';
import { retry, retryWhen, delay, take, mergeMap } from 'rxjs/operators';

// NOTE: retryWhen is deprecated in RxJS 7; retry({ count, delay }) is the
// suggested replacement. The retryWhen forms are kept here for older code.

// Simple retry
this.http.get('/api/data').pipe(
  retry(3) // Retry up to 3 times
);

// Retry with delay
// NOTE: once the notifier completes (here via take(3)), the resulting stream
// completes WITHOUT surfacing the last error. Use the backoff form below when
// the final error must reach the subscriber.
this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      delay(1000), // Wait 1 second
      take(3) // Give up once 3 errors have been seen
    )
  )
);

// Exponential backoff
this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      mergeMap((error, index) => {
        // index is zero-based: retries happen for errors 0, 1 and 2;
        // the 4th failure is re-thrown to the subscriber.
        if (index >= 3) {
          return throwError(() => error);
        }
        const delayMs = Math.pow(2, index) * 1000; // 1s, 2s, 4s
        return of(error).pipe(delay(delayMs));
      })
    )
  )
);
Subscription Management
Manual Subscription Cleanup
import { Component, OnDestroy } from '@angular/core'; import { Subscription } from 'rxjs';
/**
 * Manual cleanup: every subscription is registered on one parent
 * `Subscription`, which is torn down in `ngOnDestroy`.
 *
 * `data$`, `other$`, `data` and `other` are members assumed to be declared
 * elsewhere in the component.
 */
@Component({ selector: 'app-my-component' })
export class MyComponent implements OnDestroy {
  private subscription = new Subscription();

  ngOnInit() {
    // Add subscriptions
    this.subscription.add(
      this.data$.subscribe(data => {
        this.data = data;
      })
    );

    this.subscription.add(
      this.other$.subscribe(other => {
        this.other = other;
      })
    );
  }

  ngOnDestroy() {
    // Unsubscribe from all
    this.subscription.unsubscribe();
  }
}
takeUntil Pattern
import { Component, OnDestroy } from '@angular/core'; import { Subject } from 'rxjs'; import { takeUntil } from 'rxjs/operators';
/**
 * takeUntil cleanup: each stream is piped through `takeUntil(this.destroy$)`,
 * and `ngOnDestroy` emits on and completes `destroy$`.
 *
 * `data$`, `other$`, `data` and `other` are members assumed to be declared
 * elsewhere in the component.
 */
@Component({ selector: 'app-my-component' })
export class MyComponent implements OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    const dataUntilDestroy$ = this.data$.pipe(takeUntil(this.destroy$));
    dataUntilDestroy$.subscribe(data => {
      this.data = data;
    });

    const otherUntilDestroy$ = this.other$.pipe(takeUntil(this.destroy$));
    otherUntilDestroy$.subscribe(other => {
      this.other = other;
    });
  }

  ngOnDestroy() {
    // Emit once to end the piped streams, then complete the notifier itself.
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Async Pipe (No Manual Unsubscribe)
import { Component } from '@angular/core'; import { Observable } from 'rxjs';
/**
 * Renders users, a loading indicator and an error message entirely through
 * the `async` pipe, so the component holds no subscriptions of its own.
 */
@Component({
  selector: 'app-user-list',
  template: ` <div *ngIf="users$ | async as users"> <div *ngFor="let user of users"> {{ user.name }} </div> </div>
<div *ngIf="loading$ | async">Loading...</div>
<div *ngIf="error$ | async as error">Error: {{ error }}</div>
`
})
export class UserListComponent {
  /** Users to render; taken from `userService.getUsers()`. */
  users$: Observable<User[]>;
  /** Loading flag exposed by the service. */
  loading$: Observable<boolean>;
  /** Error text exposed by the service, or `null`. */
  error$: Observable<string | null>;

  constructor(private userService: UserService) {
    // Streams are only wired here; the template's async pipe subscribes.
    this.users$ = this.userService.getUsers();
    this.loading$ = this.userService.loading$;
    this.error$ = this.userService.error$;
  }
}
Subjects
Subject - Multicast
import { Subject } from 'rxjs';
const subject = new Subject<number>();

// Multiple subscribers share the same emissions
subject.subscribe(val => console.log('A:', val));
subject.subscribe(val => console.log('B:', val));

subject.next(1); // A: 1, B: 1
subject.next(2); // A: 2, B: 2
BehaviorSubject - Current Value
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject<number>(0); // Initial value

// A subscriber immediately receives the current value
subject.subscribe(val => console.log('A:', val)); // A: 0

subject.next(1); // A: 1
subject.next(2); // A: 2

// A late subscriber starts from the latest value
subject.subscribe(val => console.log('B:', val)); // B: 2 (latest value)
// Common pattern for state management
// (Injectable, State and initialState are assumed to be in scope.)
@Injectable({ providedIn: 'root' })
export class StateService {
  // The subject stays private so only this service can push new state.
  private stateSubject = new BehaviorSubject<State>(initialState);

  /** Read-only view of the state stream for consumers. */
  state$ = this.stateSubject.asObservable();

  /**
   * Replaces the current state and notifies subscribers.
   * @param newState State to publish.
   */
  updateState(newState: State) {
    this.stateSubject.next(newState);
  }

  /** Synchronous snapshot of the latest state. */
  get currentState(): State {
    return this.stateSubject.value;
  }
}
ReplaySubject - Buffer Values
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject<number>(2); // Buffer last 2 values

// Emit three values before anyone subscribes
for (const value of [1, 2, 3]) {
  subject.next(value);
}

// First subscriber receives the two buffered values
subject.subscribe(val => console.log('A:', val)); // A: 2, A: 3

subject.next(4); // A: 4

// Second subscriber receives the buffer as it stands now
subject.subscribe(val => console.log('B:', val)); // B: 3, B: 4
AsyncSubject - Last Value on Complete
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject<number>();

subject.subscribe(val => console.log('A:', val));

// Nothing is delivered while values are being pushed...
for (const value of [1, 2, 3]) {
  subject.next(value);
}

// ...only completion releases the final value.
subject.complete(); // A: 3 (only last value when complete)
Hot vs Cold Observables
Cold Observable - Unicast
// Each subscription creates a new execution
const cold$ = interval(1000);

cold$.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...

setTimeout(() => {
  // The late subscriber starts its own counter from 0
  cold$.subscribe(val => console.log('B:', val)); // B: 0, 1, 2... (separate execution)
}, 2000);
Hot Observable - Multicast
import { Subject, interval } from 'rxjs'; import { share, shareReplay } from 'rxjs/operators';
// Using Subject: a single source execution is fanned out through the subject
const subject = new Subject<number>();
const source$ = interval(1000);
source$.subscribe(subject);

subject.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...

setTimeout(() => {
  // The late subscriber joins the running execution and misses earlier values
  subject.subscribe(val => console.log('B:', val)); // B: 2, 3, 4... (shared)
}, 2000);

// Using share operator
const shared$ = interval(1000).pipe(share());

shared$.subscribe(val => console.log('A:', val));

setTimeout(() => {
  shared$.subscribe(val => console.log('B:', val)); // Shares source
}, 2000);

// Using shareReplay
const cached$ = this.http.get('/api/data').pipe(
  shareReplay(1) // Cache last 1 value
);

// Multiple subscribers get cached result
cached$.subscribe();
cached$.subscribe(); // No second HTTP request
RxJS in Services
Data Service with State
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject, Observable, of } from 'rxjs';
import { tap, catchError, finalize } from 'rxjs/operators';

/**
 * User data service that keeps the user list, a loading flag and the last
 * error message in BehaviorSubjects and exposes them as read-only streams.
 */
@Injectable({ providedIn: 'root' })
export class UserService {
  private usersSubject = new BehaviorSubject<User[]>([]);
  private loadingSubject = new BehaviorSubject<boolean>(false);
  private errorSubject = new BehaviorSubject<string | null>(null);

  /** Latest loaded users (starts as an empty array). */
  users$ = this.usersSubject.asObservable();
  /** `true` while `loadUsers()` has a request in flight. */
  loading$ = this.loadingSubject.asObservable();
  /** Message of the last failed load, or `null`. */
  error$ = this.errorSubject.asObservable();

  constructor(private http: HttpClient) {}

  /**
   * Requests the user list without touching the service state.
   * @returns The observable produced by `GET /api/users`.
   */
  getUsers(): Observable<User[]> {
    return this.http.get<User[]>('/api/users');
  }

  /**
   * Loads users into `users$`, driving `loading$` and `error$` along the way.
   * Subscribes internally; consumers observe the result through the streams.
   */
  loadUsers(): void {
    this.loadingSubject.next(true);
    this.errorSubject.next(null);

    this.getUsers().pipe(
      tap(users => this.usersSubject.next(users)),
      catchError(error => {
        this.errorSubject.next(error.message);
        // The fallback comes after `tap`, so the previous users are kept on failure.
        return of([]);
      }),
      // Runs on completion and on error, so the flag is always reset.
      finalize(() => this.loadingSubject.next(false))
    ).subscribe();
  }

  /**
   * Fetches a single user.
   * @param id Identifier appended to the resource path.
   */
  getUser(id: string): Observable<User> {
    return this.http.get<User>(`/api/users/${id}`);
  }
}
Search Service with Debounce
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subject, of } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

/**
 * Type-ahead search: terms pushed through `setSearchTerm` are debounced,
 * de-duplicated and turned into search requests exposed on `results$`.
 */
@Injectable({ providedIn: 'root' })
export class SearchService {
  private searchTerms = new Subject<string>();

  /** Results for the most recent term. */
  results$: Observable<SearchResult[]>;

  constructor(private http: HttpClient) {
    this.results$ = this.searchTerms.pipe(
      debounceTime(300), // Wait for a 300ms pause in typing
      distinctUntilChanged(), // Skip a term identical to the previous one
      switchMap(term => this.search(term)) // Drop the in-flight request for a stale term
    );
  }

  /**
   * Runs a search for the given term.
   * @param term Raw user input; a blank term short-circuits to an empty result.
   * @returns Matching results, or an empty array for a blank term.
   */
  search(term: string): Observable<SearchResult[]> {
    if (!term.trim()) {
      return of([]);
    }
    // Encode the term so characters such as `&`, `#` or `+` cannot alter the query string.
    return this.http.get<SearchResult[]>(`/api/search?q=${encodeURIComponent(term)}`);
  }

  /**
   * Feeds a new term into the debounced pipeline.
   * @param term Current content of the search input.
   */
  setSearchTerm(term: string): void {
    this.searchTerms.next(term);
  }
}
Testing RxJS
Testing Observables
import { TestBed } from '@angular/core/testing'; import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';
// Verifies UserService against the HttpClient testing backend.
describe('UserService', () => {
  let service: UserService;
  let httpMock: HttpTestingController;

  beforeEach(() => {
    TestBed.configureTestingModule({
      imports: [HttpClientTestingModule],
      providers: [UserService]
    });

    service = TestBed.inject(UserService);
    httpMock = TestBed.inject(HttpTestingController);
  });

  // Fail the spec if a request was left unmatched or unflushed.
  afterEach(() => {
    httpMock.verify();
  });

  it('should fetch users', () => {
    const expectedUsers = [{ id: 1, name: 'John' }];

    // The assertion runs when the mocked response is flushed below.
    service.getUsers().subscribe(users => {
      expect(users).toEqual(expectedUsers);
    });

    const pendingRequest = httpMock.expectOne('/api/users');
    expect(pendingRequest.request.method).toBe('GET');
    pendingRequest.flush(expectedUsers);
  });
});
Testing with Marble Diagrams
import { debounceTime } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

describe('Marble tests', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    // The scheduler delegates the actual-vs-expected comparison to the test framework.
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('should debounce', () => {
    scheduler.run(({ cold, expectObservable }) => {
      // Inside run(), each marble character is one virtual millisecond:
      // a@1, b@3, c@5, complete@6.
      const source$ = cold('-a-b-c|');
      // No 20ms gap ever elapses, so a and b are dropped. When the source
      // completes at frame 6, debounceTime flushes the pending value and
      // completes in the same frame.
      const expected = '------(c|)';

      const result$ = source$.pipe(debounceTime(20));

      expectObservable(result$).toBe(expected);
    });
  });
});
When to Use This Skill
Use angular-rxjs-patterns when building modern, production-ready applications that require:
- Complex async data flows
- Real-time updates and streaming data
- Efficient HTTP request management
- Form input handling with debouncing
- State management with observables
- Error handling and retry logic
- Combining multiple async sources
- Memory-safe subscription management
RxJS Best Practices in Angular
- Use async pipe - Automatic subscription management
- takeUntil for cleanup - Unsubscribe in ngOnDestroy
- shareReplay for caching - Avoid duplicate HTTP requests
- debounceTime for inputs - Reduce API calls
- switchMap for cancellation - Cancel old requests
- catchError for errors - Always handle errors
- BehaviorSubject for state - Share current state
- Avoid nested subscriptions - Use operators instead
- Use operators over imperative code - More declarative
- Test observables properly - Use marble diagrams
Common RxJS Mistakes
- Not unsubscribing - Memory leaks
- Nested subscriptions - Callback hell
- Not using operators - Imperative instead of declarative
- Subscribing in services - Return observables instead
- Not handling errors - Silent failures
- Using Subject incorrectly - Prefer BehaviorSubject for state
- Not using shareReplay - Duplicate HTTP requests
- Forgetting to complete subjects - Memory leaks
- Using subscribe in templates - Use async pipe
- Not understanding hot vs cold - Unexpected behavior
Resources
- RxJS Official Documentation
- RxJS Operators
- Angular HttpClient
- Reactive Programming with RxJS
- RxMarbles - Visual Operator Reference
- Learn RxJS