如何通过reactor实现流式响应接口
本文介绍了Spring框架中使用Reactor库实现响应式编程的几种方式。通过示例代码演示了如何返回Mono单条数据、处理中文乱码问题,以及实现异步响应。重点讲解了Flux流式返回多条消息的方法,包括使用Flux.create()和Sinks.Many两种实现方式,并比较了它们的特性差异。文章还详细说明了异步任务封装、消息发送控制、错误处理以及背压策略等关键技术点,为开发响应式应用提供了实用参考。
最近在学习 Spring-AI 框架,发现其流式响应接口使用的是 reactor 的 Flux,于是准备深入学习一番
简单样例
@GetMapping("/mono")
public Mono<String> mono() {
return Mono.just("hello mono");
}
中文乱码
在上面的样例中 mono 接口返回了一个字符串 “hello mono”,但是如果返回中文字符则会乱码,需要添加响应头,设置字符集
@GetMapping("/mono-zh")
public Mono<String> monoChinese(HttpServletResponse response) {
response.setCharacterEncoding("UTF-8");
return Mono.just("你好 mono");
}
异步返回
使用 reactor 的场景基本都是耗时较长的场景,需要异步返回
@GetMapping("/mono-async")
public Mono<String> monoAsync(HttpServletResponse response) {
response.setCharacterEncoding("UTF-8");
Mono<String> mono = Mono.fromFuture(
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "你好 mono";
})
);
return mono;
}
在上述代码中,启动异步任务后并不会等待异步任务执行完毕,fromFuture 方法会将执行中的异步任务包装为 Mono 对象并立即返回
多条返回
Mono 对象只能返回一条消息,在使用大模型时往往需要返回多条消息,此时需要切换为 Flux
@GetMapping("/flux-async")
public Flux<String> fluxAsync(HttpServletResponse response) {
response.setCharacterEncoding("UTF-8");
return Flux.create(sink -> {
// 订阅时异步执行
CompletableFuture.runAsync(() -> {
try {
// 模拟逐步发送多条消息
Thread.sleep(3000);
sink.next("第一条消息");
Thread.sleep(1000);
sink.next("第二条消息");
Thread.sleep(1000);
sink.next("第三条消息");
sink.complete();
} catch (Exception e) {
sink.error(e);
}
});
});
}
注意:多条返回是指多次返回不同消息,并不能像ai工具一样打字式的逐字显示,但可以通过每次仅返回一个字符来实现类似的效果
Sinks
也可以使用 reactor 的 Sinks 实现类似的功能
@GetMapping("/sink")
public Flux<String> sink(HttpServletResponse response) {
response.setCharacterEncoding("UTF-8");
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
CompletableFuture.runAsync(() -> {
try {
// 模拟逐步发送多条消息
for (int i = 0; i < 10; i++) {
Thread.sleep(6000);
sink.tryEmitNext("你好 sink " + i);
}
} catch (Exception e) {
sink.tryEmitError(e);
}
});
return sink.asFlux();
}
Sinks 提供了 many 和 one 两个方法,分别支持发送多条消息和一条消息,可以通过 asFlux 和 asMono 方法转换为 Flux 和 Mono 对象。
同时提供 unicast 和 multicast 来实现单播和广播
最后 onBackpressureBuffer 是背压策略,表示当生产速度快于消费速度时,会将数据缓存起来以避免丢失
返回的 Sinks.Many 对象,支持下列方法
- tryEmitNext:发送一条消息
- emitNext:发送一条消息,发送失败会抛出异常
- tryEmitError:发送错误消息
- emitError:发送错误消息,发送失败会抛出异常
- tryEmitComplete:发送完成信号
- emitComplete:发送完成信号,发送失败会抛出异常
- asFlux:转换为 Flux 对象
更多推荐
所有评论(0)