Angular RxJS Patterns

Master RxJS in Angular for handling async operations, data streams, and reactive programming patterns.

Observable Creation

Basic Observable Creation

import { Observable, of, from, interval, fromEvent } from 'rxjs';

// of - emit values in sequence
const numbers$ = of(1, 2, 3, 4, 5);

// from - convert array, promise, or iterable
const fromArray$ = from([1, 2, 3]);
const fromPromise$ = from(fetch('/api/data'));

// interval - emit numbers at intervals
const timer$ = interval(1000); // Every second

// fromEvent - DOM events
const clicks$ = fromEvent(document, 'click');

// Custom observable
const custom$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

HttpClient Observables

import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';

// Data stands in for your own model type
@Injectable({ providedIn: 'root' })
export class DataService {
  constructor(private http: HttpClient) {}

  getData(): Observable<Data[]> {
    return this.http.get<Data[]>('/api/data');
  }

  getItem(id: string): Observable<Data> {
    return this.http.get<Data>(`/api/data/${id}`);
  }

  createItem(data: Data): Observable<Data> {
    return this.http.post<Data>('/api/data', data);
  }

  updateItem(id: string, data: Data): Observable<Data> {
    return this.http.put<Data>(`/api/data/${id}`, data);
  }

  deleteItem(id: string): Observable<void> {
    return this.http.delete<void>(`/api/data/${id}`);
  }
}

Common Operators

Transformation Operators

import { map, switchMap, mergeMap, concatMap } from 'rxjs/operators';
import { of } from 'rxjs';

// map - transform values
const numbers$ = of(1, 2, 3).pipe(
  map(n => n * 2) // 2, 4, 6
);

// pluck - extract property (deprecated, use map)
const users$ = of(
  { name: 'John', age: 30 },
  { name: 'Jane', age: 25 }
).pipe(
  map(user => user.name) // 'John', 'Jane'
);

// The snippets below run inside a component or service
// (searchControl is a FormControl; this.getUser returns an observable)

// switchMap - cancel previous, emit new
searchControl.valueChanges.pipe(
  switchMap(term => this.searchService.search(term))
).subscribe(results => {
  this.results = results;
});

// mergeMap - run in parallel
const ids$ = of(1, 2, 3);
ids$.pipe(
  mergeMap(id => this.getUser(id)) // All requests in parallel
).subscribe();

// concatMap - run in sequence
ids$.pipe(
  concatMap(id => this.getUser(id)) // One at a time
).subscribe();

Filtering Operators

import { of, interval, Subject } from 'rxjs';
import { filter, take, takeUntil, takeWhile, distinctUntilChanged } from 'rxjs/operators';

// filter - only emit matching values
of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0) // 2, 4
);

// take - first N values
interval(1000).pipe(
  take(5) // First 5 emissions
);

// takeUntil - until another observable emits
// (source$ is any existing observable)
const destroy$ = new Subject<void>();
source$.pipe(
  takeUntil(destroy$)
).subscribe();

// distinctUntilChanged - skip duplicate consecutive values
of(1, 1, 2, 2, 3, 3).pipe(
  distinctUntilChanged() // 1, 2, 3
);
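
The import list above includes takeWhile, but none of the snippets use it. As a minimal sketch (the sample values and the variable names `exclusive` and `inclusive` are made up for illustration), takeWhile emits values until a predicate first fails and then completes. Its optional second argument also emits the value that failed the predicate:

```typescript
import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

// Stop as soon as a value is not below 5; later values are never seen
const exclusive: number[] = [];
of(1, 2, 3, 10, 4).pipe(
  takeWhile(n => n < 5)
).subscribe(n => exclusive.push(n));

// inclusive = true also emits the value that ended the stream
const inclusive: number[] = [];
of(1, 2, 3, 10, 4).pipe(
  takeWhile(n => n < 5, true)
).subscribe(n => inclusive.push(n));

console.log(exclusive); // [1, 2, 3]
console.log(inclusive); // [1, 2, 3, 10]
```

Unlike filter, takeWhile completes the stream at the first failing value, so the trailing 4 is dropped in both cases.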

Combination Operators

import { combineLatest, merge, concat, forkJoin, zip, of } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

// combineLatest - emit when any source emits (after each has emitted once)
combineLatest([
  this.user$,
  this.settings$
]).pipe(
  map(([user, settings]) => ({ user, settings }))
).subscribe();

// merge - emit from any source
merge(
  this.clicks$,
  this.hovers$
).subscribe();

// concat - emit in sequence
concat(
  this.loadUser$,
  this.loadSettings$
).subscribe();

// forkJoin - wait for all to complete
forkJoin({
  user: this.getUser(),
  posts: this.getPosts(),
  comments: this.getComments()
}).subscribe(({ user, posts, comments }) => {
  // All complete
});

// zip - pair values from sources
zip(
  of(1, 2, 3),
  of('a', 'b', 'c')
).pipe(
  map(([num, letter]) => `${num}${letter}`)
); // '1a', '2b', '3c'

Utility Operators

import { of } from 'rxjs';
import { tap, map, delay, debounceTime, throttleTime, distinctUntilChanged } from 'rxjs/operators';

// tap - side effects (logging, etc.)
source$.pipe(
  tap(value => console.log('Value:', value)),
  map(value => value * 2)
);

// delay - delay emissions
of(1, 2, 3).pipe(
  delay(1000) // Shift every emission 1 second later
);

// debounceTime - wait for pause in emissions
searchControl.valueChanges.pipe(
  debounceTime(300) // Wait 300ms after user stops typing
);

// throttleTime - emit first value, ignore for duration
clicks$.pipe(
  throttleTime(1000) // Only once per second
);

// distinctUntilChanged - skip duplicates
input$.pipe(
  distinctUntilChanged() // Only when value changes
);

Error Handling

catchError - Handle Errors

import { catchError } from 'rxjs/operators';
import { of, EMPTY, throwError } from 'rxjs';

// Return fallback value
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]); // Return empty array
  })
);

// Return empty observable
source$.pipe(
  catchError(() => EMPTY) // Complete without emitting
);

// Re-throw error
source$.pipe(
  catchError(error => {
    console.error('Error:', error);
    return throwError(() => new Error('Custom error'));
  })
);

// Handle different error types
source$.pipe(
  catchError(error => {
    if (error.status === 404) {
      return of(null);
    }
    return throwError(() => error);
  })
);
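
Where catchError sits in the pipe matters when it is combined with a flattening operator such as switchMap. The sketch below, which uses a hypothetical `search` function and made-up terms, places catchError on the inner observable so that one failed lookup produces a fallback value while the outer stream keeps running:

```typescript
import { Observable, Subject, of, throwError } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Hypothetical lookup that fails for one specific term
function search(term: string): Observable<string[]> {
  return term === 'bad'
    ? throwError(() => new Error('lookup failed'))
    : of([term.toUpperCase()]);
}

const terms = new Subject<string>();
const results: string[][] = [];

terms.pipe(
  switchMap(term =>
    search(term).pipe(
      // Handle the error per request, not on the outer stream
      catchError(() => of([] as string[]))
    )
  )
).subscribe(result => results.push(result));

terms.next('a');
terms.next('bad');
terms.next('b');

console.log(results); // [['A'], [], ['B']]
```

If catchError were placed after switchMap instead, the first error would end the outer subscription and the term 'b' would never be searched.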

retry and retryWhen

import { of, throwError } from 'rxjs';
import { retry, retryWhen, delay, take, mergeMap } from 'rxjs/operators';

// Simple retry
this.http.get('/api/data').pipe(
  retry(3) // Retry up to 3 times
);

// Retry with delay
// Note: retryWhen is deprecated in RxJS 7
this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      delay(1000), // Wait 1 second
      take(3) // Stop after the third error; the stream then completes instead of erroring
    )
  )
);

// Exponential backoff
this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      mergeMap((error, index) => {
        if (index >= 3) {
          return throwError(() => error); // Give up after 3 retries
        }
        const delayMs = Math.pow(2, index) * 1000; // 1s, 2s, 4s
        return of(error).pipe(delay(delayMs));
      })
    )
  )
);
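
The same backoff can be written without retryWhen. This is a minimal sketch, assuming an RxJS 7 release whose retry operator accepts a config object with `count` and `delay`. The helper `retryWithBackoff`, the `flaky$` source, and the 10ms base delay are hypothetical names and values chosen for illustration:

```typescript
import { Observable, defer, lastValueFrom, of, throwError, timer } from 'rxjs';
import { retry } from 'rxjs/operators';

// Retry up to maxRetries times, doubling the wait before each new attempt
function retryWithBackoff<T>(
  source$: Observable<T>,
  maxRetries: number,
  baseDelayMs: number
): Observable<T> {
  return source$.pipe(
    retry({
      count: maxRetries,
      // retryCount starts at 1 for the first retry
      delay: (_error, retryCount) =>
        timer(baseDelayMs * Math.pow(2, retryCount - 1)),
    })
  );
}

// Hypothetical source that fails twice, then succeeds
let attempts = 0;
const flaky$ = defer(() => {
  attempts++;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of('ok');
});

const result = lastValueFrom(retryWithBackoff(flaky$, 3, 10));
result.then(value => console.log(value, 'after', attempts, 'attempts'));
```

Once the retry count is exhausted, the last error is passed downstream, so a catchError placed after this helper still sees the failure.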

Subscription Management

Manual Subscription Cleanup

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';

@Component({ selector: 'app-my-component' })
export class MyComponent implements OnInit, OnDestroy {
  private subscription = new Subscription();

  ngOnInit() {
    // Add subscriptions
    this.subscription.add(
      this.data$.subscribe(data => {
        this.data = data;
      })
    );

    this.subscription.add(
      this.other$.subscribe(other => {
        this.other = other;
      })
    );
  }

  ngOnDestroy() {
    // Unsubscribe from all
    this.subscription.unsubscribe();
  }
}

takeUntil Pattern

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({ selector: 'app-my-component' })
export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.data$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => {
      this.data = data;
    });

    this.other$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(other => {
      this.other = other;
    });
  }

  ngOnDestroy() {
    // Emitting once ends every stream piped through takeUntil(this.destroy$)
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Async Pipe (No Manual Unsubscribe)

import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-user-list',
  template: `
    <div *ngIf="users$ | async as users">
      <div *ngFor="let user of users">
        {{ user.name }}
      </div>
    </div>

    <div *ngIf="loading$ | async">Loading...</div>
    <div *ngIf="error$ | async as error">Error: {{ error }}</div>
  `
})
export class UserListComponent {
  users$: Observable<User[]>;
  loading$: Observable<boolean>;
  error$: Observable<string | null>;

  constructor(private userService: UserService) {
    // UserService (see "Data Service with State") exposes its state as streams
    this.users$ = this.userService.users$;
    this.loading$ = this.userService.loading$;
    this.error$ = this.userService.error$;
    this.userService.loadUsers();
  }
}

Subjects

Subject - Multicast

import { Subject } from 'rxjs';

const subject = new Subject<number>();

// Multiple subscribers
subject.subscribe(val => console.log('A:', val));
subject.subscribe(val => console.log('B:', val));

subject.next(1); // A: 1, B: 1
subject.next(2); // A: 2, B: 2

BehaviorSubject - Current Value

import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject<number>(0); // Initial value

subject.subscribe(val => console.log('A:', val)); // A: 0

subject.next(1); // A: 1
subject.next(2); // A: 2

subject.subscribe(val => console.log('B:', val)); // B: 2 (latest value)

// Common pattern for state management
// (requires Injectable from '@angular/core'; State and initialState are your own definitions)
@Injectable({ providedIn: 'root' })
export class StateService {
  private stateSubject = new BehaviorSubject<State>(initialState);
  state$ = this.stateSubject.asObservable();

  updateState(newState: State) {
    this.stateSubject.next(newState);
  }

  get currentState(): State {
    return this.stateSubject.value;
  }
}
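
Consumers of a BehaviorSubject-backed state often need only one field of it. The following is a minimal sketch of deriving such a slice with map and distinctUntilChanged, so subscribers are notified only when that field changes. The `State` shape, `count$`, and the sample updates are hypothetical:

```typescript
import { BehaviorSubject } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';

// Hypothetical state shape for illustration
interface State {
  count: number;
  label: string;
}

const stateSubject = new BehaviorSubject<State>({ count: 0, label: 'a' });

// Derived stream: emits only when count actually changes
const count$ = stateSubject.pipe(
  map(state => state.count),
  distinctUntilChanged()
);

const counts: number[] = [];
count$.subscribe(count => counts.push(count));

stateSubject.next({ count: 0, label: 'b' }); // label changed, count did not: no emission
stateSubject.next({ count: 1, label: 'b' }); // count changed: emits 1

console.log(counts); // [0, 1]
```

distinctUntilChanged compares with strict equality by default, so for object-valued slices a custom comparator would be needed.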

ReplaySubject - Buffer Values

import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(2); // Buffer last 2 values

subject.next(1); subject.next(2); subject.next(3);

subject.subscribe(val => console.log('A:', val)); // A: 2, A: 3

subject.next(4); // A: 4

subject.subscribe(val => console.log('B:', val)); // B: 3, B: 4

AsyncSubject - Last Value on Complete

import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject<number>();

subject.subscribe(val => console.log('A:', val));

subject.next(1);
subject.next(2);
subject.next(3);
subject.complete(); // A: 3 (only last value when complete)

Hot vs Cold Observables

Cold Observable - Unicast

import { interval } from 'rxjs';

// Each subscription creates new execution
const cold$ = interval(1000);

cold$.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...
setTimeout(() => {
  cold$.subscribe(val => console.log('B:', val)); // B: 0, 1, 2... (separate execution)
}, 2000);

Hot Observable - Multicast

import { Subject, interval } from 'rxjs';
import { share, shareReplay } from 'rxjs/operators';

// Using Subject
const subject = new Subject<number>();
const source$ = interval(1000);
source$.subscribe(subject);

subject.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...
setTimeout(() => {
  subject.subscribe(val => console.log('B:', val)); // B: 2, 3, 4... (shared)
}, 2000);

// Using share operator
const shared$ = interval(1000).pipe(share());

shared$.subscribe(val => console.log('A:', val));
setTimeout(() => {
  shared$.subscribe(val => console.log('B:', val)); // Shares source
}, 2000);

// Using shareReplay
const cached$ = this.http.get('/api/data').pipe(
  shareReplay(1) // Cache last 1 value
);

// Multiple subscribers get cached result
cached$.subscribe();
cached$.subscribe(); // No second HTTP request

RxJS in Services

Data Service with State

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject, Observable, of } from 'rxjs';
import { tap, catchError, finalize } from 'rxjs/operators';

@Injectable({ providedIn: 'root' })
export class UserService {
  private usersSubject = new BehaviorSubject<User[]>([]);
  private loadingSubject = new BehaviorSubject<boolean>(false);
  private errorSubject = new BehaviorSubject<string | null>(null);

  users$ = this.usersSubject.asObservable();
  loading$ = this.loadingSubject.asObservable();
  error$ = this.errorSubject.asObservable();

  constructor(private http: HttpClient) {}

  loadUsers(): void {
    this.loadingSubject.next(true);
    this.errorSubject.next(null);

    this.http.get<User[]>('/api/users').pipe(
      tap(users => this.usersSubject.next(users)),
      catchError(error => {
        this.errorSubject.next(error.message);
        return of([]);
      }),
      finalize(() => this.loadingSubject.next(false))
    ).subscribe();
  }

  getUser(id: string): Observable<User> {
    return this.http.get<User>(`/api/users/${id}`);
  }
}

Search Service with Debounce

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subject, of } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

@Injectable({ providedIn: 'root' })
export class SearchService {
  private searchTerms = new Subject<string>();

  results$: Observable<SearchResult[]>;

  constructor(private http: HttpClient) {
    this.results$ = this.searchTerms.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(term => this.search(term))
    );
  }

  search(term: string): Observable<SearchResult[]> {
    if (!term.trim()) {
      return of([]);
    }
    // Encode the term so characters such as & or # do not break the query string
    return this.http.get<SearchResult[]>(`/api/search?q=${encodeURIComponent(term)}`);
  }

  setSearchTerm(term: string): void {
    this.searchTerms.next(term);
  }
}

Testing RxJS

Testing Observables

import { TestBed } from '@angular/core/testing';
import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';

describe('UserService', () => {
  let service: UserService;
  let httpMock: HttpTestingController;

  beforeEach(() => {
    TestBed.configureTestingModule({
      imports: [HttpClientTestingModule],
      providers: [UserService]
    });

    service = TestBed.inject(UserService);
    httpMock = TestBed.inject(HttpTestingController);
  });

  afterEach(() => {
    httpMock.verify();
  });

  it('should load users into users$', () => {
    const mockUsers = [{ id: 1, name: 'John' }];

    // UserService exposes loadUsers() and users$ rather than a getUsers() method
    service.loadUsers();

    const req = httpMock.expectOne('/api/users');
    expect(req.request.method).toBe('GET');
    req.flush(mockUsers);

    // users$ is backed by a BehaviorSubject, so it replays the latest value
    service.users$.subscribe(users => {
      expect(users).toEqual(mockUsers);
    });
  });
});

Testing with Marble Diagrams

import { TestScheduler } from 'rxjs/testing';
import { debounceTime } from 'rxjs/operators';

describe('Marble tests', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('should debounce', () => {
    scheduler.run(({ cold, expectObservable }) => {
      const source$ = cold('-a-b-c|');
      // Every emission falls inside the 20ms window, so only the last survives;
      // debounceTime flushes it when the source completes (frame 6)
      const expected = '------(c|)';

      const result$ = source$.pipe(debounceTime(20));
      expectObservable(result$).toBe(expected);
    });
  });
});

When to Use This Skill

Use angular-rxjs-patterns when building modern, production-ready applications that require:

  • Complex async data flows

  • Real-time updates and streaming data

  • Efficient HTTP request management

  • Form input handling with debouncing

  • State management with observables

  • Error handling and retry logic

  • Combining multiple async sources

  • Memory-safe subscription management

RxJS Best Practices in Angular

  • Use async pipe - Automatic subscription management

  • takeUntil for cleanup - Unsubscribe in ngOnDestroy

  • shareReplay for caching - Avoid duplicate HTTP requests

  • debounceTime for inputs - Reduce API calls

  • switchMap for cancellation - Cancel old requests

  • catchError for errors - Always handle errors

  • BehaviorSubject for state - Share current state

  • Avoid nested subscriptions - Use operators instead

  • Use operators over imperative code - More declarative

  • Test observables properly - Use marble diagrams
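
The "avoid nested subscriptions" item can be sketched as follows. The `getUser` and `getOrders` functions and their data are hypothetical stand-ins for HTTP calls; the point is that a dependent request is expressed with switchMap inside one pipe instead of a subscribe inside another subscribe:

```typescript
import { Observable, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface User { id: number; name: string; }
interface Order { userId: number; total: number; }

// Hypothetical stand-ins for HTTP calls
function getUser(id: number): Observable<User> {
  return of({ id, name: 'John' });
}
function getOrders(userId: number): Observable<Order[]> {
  return of([{ userId, total: 40 }, { userId, total: 2 }]);
}

// Nested version to avoid:
// getUser(1).subscribe(user => {
//   getOrders(user.id).subscribe(orders => { ... });
// });

// Flattened version: one pipeline, one subscription
const orderTotal$ = getUser(1).pipe(
  switchMap(user => getOrders(user.id)),
  map(orders => orders.reduce((sum, order) => sum + order.total, 0))
);

const totals: number[] = [];
orderTotal$.subscribe(total => totals.push(total));

console.log(totals); // [42]
```

With a single subscription there is only one thing to clean up, and errors from either call reach the same error handler.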

Common RxJS Mistakes

  • Not unsubscribing - Memory leaks

  • Nested subscriptions - Callback hell

  • Not using operators - Imperative instead of declarative

  • Subscribing in services - Return observables instead

  • Not handling errors - Silent failures

  • Using Subject incorrectly - Prefer BehaviorSubject for state

  • Not using shareReplay - Duplicate HTTP requests

  • Forgetting to complete subjects - Memory leaks

  • Using subscribe in templates - Use async pipe

  • Not understanding hot vs cold - Unexpected behavior
