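In Spring Boot 2, the usual (non-reactive) way to implement SSE is to return an SseEmitter from the controller and then call its send method to push events.
In Spring Boot 3 with WebFlux, SSE is implemented by returning a Flux<ServerSentEvent<?>>. How to push an SSE event to the client manually, though, is something I searched all over the web for without finding a single clear explanation. The examples online usually look like this: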
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(sequence -> {
                ServerSentEvent<String> serverSentEvent = ServerSentEvent.<String> builder()
                        .id(String.valueOf(sequence))
                        .event("periodic-event")
                        .data("SSE - " + LocalTime.now().toString())
                        .build();
                log.info("stream-sse: " + serverSentEvent);
                return serverSentEvent;
            })
            .doOnCancel(() -> log.warn("stream-sse canceled"))
            .doOnError(e -> log.error("stream-sse error", e));
}
After half a day of digging, I finally found the solution: events are sent manually through the tryEmitNext method of Sinks.Many<ServerSentEvent<?>>!
Here is a code example:
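// Sinks.Many<ServerSentEvent<String>> plays the role that SseEmitter plays in non-reactive code
@GetMapping("/stream-sse-sink")
public Flux<ServerSentEvent<String>> streamSseMvc() {
    // Unicast sink: only one subscriber (this request); it signals an error
    // rather than buffering if the subscriber cannot keep up
    Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureError();
    Flux<ServerSentEvent<String>> flux = sink.asFlux();
    // The task below blocks in Thread.sleep, so it runs on the boundedElastic scheduler
    Scheduler scheduler = Schedulers.boundedElastic();
    scheduler.schedule(() -> {
        try {
            for (int i = 0; i < 50; i++) {
                ServerSentEvent<String> serverSentEvent = ServerSentEvent.<String> builder()
                        .id(String.valueOf(i))
                        .event("periodic-event")
                        .data("SSE - " + LocalTime.now().toString())
                        .build();
                log.info("stream-sse-sink: " + serverSentEvent);
                // Manually push one event; stop if the sink rejects it
                // (for example because the client has disconnected)
                if (sink.tryEmitNext(serverSentEvent).isFailure()) {
                    log.error("sink.tryEmitNext isFailure");
                    break;
                }
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            sink.tryEmitError(ex);
        } finally {
            // Has no effect if the sink was already terminated by tryEmitError
            sink.tryEmitComplete();
        }
    }, 3, TimeUnit.SECONDS); // start emitting after a 3-second delay
    return flux;
}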
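When testing either endpoint with a tool such as curl, it helps to know roughly what each event looks like on the wire. Below is a minimal sketch in plain Java of the text/event-stream framing for the id, event and data fields used in the examples above. SseFrame and its format method are hypothetical names made up for illustration, not part of Spring or Reactor, and Spring's own encoder may lay out the fields slightly differently (for example without the space after the colon), which clients treat the same way.

```java
// Hypothetical helper for illustration only (not a Spring or Reactor class):
// renders one event in the text/event-stream layout.
public final class SseFrame {

    private SseFrame() {}

    // id and event may be null, in which case that field is left out.
    public static String format(String id, String event, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (event != null) {
            sb.append("event: ").append(event).append('\n');
        }
        // A multi-line payload needs one "data:" line per line of text.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line marks the end of the event.
        sb.append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        // Same id and event name as the examples above, with a fixed timestamp.
        System.out.print(format("0", "periodic-event", "SSE - 12:00:00"));
    }
}
```

The blank line at the end of each frame is what tells the client that one event is complete, so a stream of events is just a sequence of such frames.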