spring-reactive

Build reactive applications - WebFlux, Mono/Flux, R2DBC, backpressure, reactive streams

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "spring-reactive" with this command: npx skills add pluginagentmarketplace/custom-plugin-spring-boot/pluginagentmarketplace-custom-plugin-spring-boot-spring-reactive

Spring Reactive Skill

Master reactive programming with Spring WebFlux, Project Reactor, R2DBC, and reactive streams patterns.

Overview

This skill covers building non-blocking, reactive applications with Spring WebFlux and Project Reactor.

Parameters

NameTypeRequiredDefaultValidation
reactive_dbenumr2dbc-postgresqlr2dbc-postgresql | r2dbc-mysql | mongodb
streamingenum-sse | websocket | rsocket
backpressureenumbufferbuffer | drop | latest

Topics Covered

Core (Must Know)

  • WebFlux: @RestController with Mono<T> and Flux<T>
  • Project Reactor: Core operators (map, flatMap, filter)
  • R2DBC: Reactive database access

Intermediate

  • Error Handling: onErrorResume, onErrorReturn
  • Backpressure: Handling fast producers
  • SSE: Server-Sent Events

Advanced

  • WebSocket: Reactive WebSocket handlers
  • RSocket: Bi-directional reactive streams
  • Context Propagation: MDC in reactive chains

Code Examples

Reactive Controller

@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {

    private final UserService userService;

    @GetMapping
    public Flux<UserResponse> findAll() {
        return userService.findAll().map(UserResponse::from);
    }

    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserResponse>> findById(@PathVariable Long id) {
        return userService.findById(id)
            .map(UserResponse::from)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserResponse> create(@Valid @RequestBody Mono<CreateUserRequest> request) {
        return request.flatMap(userService::create).map(UserResponse::from);
    }
}

Reactive Repository (R2DBC)

public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    Mono<User> findByEmail(String email);

    Flux<User> findByActiveTrue();

    @Query("SELECT * FROM users WHERE created_at > :since")
    Flux<User> findRecentUsers(@Param("since") LocalDateTime since);
}

Reactive Service with Error Handling

@Service
@RequiredArgsConstructor
@Transactional
public class UserService {

    private final UserRepository userRepository;

    public Mono<User> create(CreateUserRequest request) {
        return userRepository.findByEmail(request.email())
            .flatMap(existing -> Mono.<User>error(new DuplicateEmailException()))
            .switchIfEmpty(Mono.defer(() -> {
                User user = new User(request.email(), request.name());
                return userRepository.save(user);
            }));
    }

    public Flux<User> findAll() {
        return userRepository.findAll()
            .timeout(Duration.ofSeconds(5))
            .onErrorResume(TimeoutException.class, e -> Flux.empty());
    }
}

Server-Sent Events

@RestController
@RequestMapping("/api/events")
public class EventController {

    private final Sinks.Many<Event> eventSink = Sinks.many()
        .multicast().onBackpressureBuffer();

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Event>> stream() {
        return eventSink.asFlux()
            .map(e -> ServerSentEvent.<Event>builder()
                .id(e.id())
                .event(e.type())
                .data(e)
                .build());
    }

    @PostMapping
    public Mono<Void> publish(@RequestBody Event event) {
        return Mono.fromRunnable(() -> eventSink.tryEmitNext(event));
    }
}

Operator Quick Reference

Transformation: map(), flatMap(), flatMapMany()
Filtering:      filter(), take(), skip(), distinct()
Combination:    merge(), concat(), zip()
Error:          onErrorResume(), onErrorReturn(), retry(), timeout()
Side Effects:   doOnNext(), doOnError(), doFinally(), log()

Troubleshooting

Failure Modes

IssueDiagnosisFix
Nothing happensNot subscribedReturn Mono/Flux from controller
Blocking errorBlocking in reactiveUse subscribeOn(Schedulers.boundedElastic())
Memory issuesUnbounded bufferAdd backpressure strategy

Debug Checklist

□ Verify Mono/Flux is returned (not subscribed manually)
□ Check for blocking calls (JDBC, Thread.sleep)
□ Review backpressure strategy
□ Enable Reactor debug: Hooks.onOperatorDebug()
□ Use .log() operator for debugging

Unit Test Template

@WebFluxTest(UserController.class)
class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    @Test
    void shouldReturnUsers() {
        when(userService.findAll()).thenReturn(Flux.just(
            new User(1L, "john@test.com", "John")));

        webTestClient.get().uri("/api/users")
            .exchange()
            .expectStatus().isOk()
            .expectBodyList(UserResponse.class)
            .hasSize(1);
    }

    @Test
    void shouldReturn404WhenNotFound() {
        when(userService.findById(1L)).thenReturn(Mono.empty());

        webTestClient.get().uri("/api/users/1")
            .exchange()
            .expectStatus().isNotFound();
    }
}

Usage

Skill("spring-reactive")

Version History

VersionDateChanges
2.0.02024-12-30R2DBC, SSE, WebTestClient patterns
1.0.02024-01-01Initial release

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

java-spring-boot

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

java-testing

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

react-native-animations

No summary provided by upstream source.

Repository SourceNeeds Review