最近在学习 Spring-AI 框架,发现其流式响应接口使用的是 reactor 的 Flux,于是准备深入学习一番

简单样例

    @GetMapping("/mono")
	public Mono<String> mono() {
		return Mono.just("hello mono");
	}

中文乱码

在上面的样例中 mono 接口返回了一个字符串 “hello mono”,但是如果返回中文字符则会乱码,需要添加响应头,设置字符集

    @GetMapping("/mono-zh")
	public Mono<String> monoChinese(HttpServletResponse response) {
		response.setCharacterEncoding("UTF-8");
		return Mono.just("你好 mono");
	}

异步返回

使用 reactor 的场景基本都是耗时较长的场景,需要异步返回

    @GetMapping("/mono-async")
	public Mono<String> monoAsync(HttpServletResponse response) {
		response.setCharacterEncoding("UTF-8");
		Mono<String> mono = Mono.fromFuture(
				CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "你好 mono";
				})
		);
		return mono;
	}

在上述代码中,启动异步任务后并不会等待异步任务执行完毕,fromFuture 方法会将执行中的异步任务包装为 Mono 对象并立即返回

多条返回

Mono 对象只能返回一条消息,在使用大模型时往往需要返回多条消息,此时需要切换为 Flux

    @GetMapping("/flux-async")
	public Flux<String> fluxAsync(HttpServletResponse response) {
		response.setCharacterEncoding("UTF-8");

		return Flux.create(sink -> {
			// 订阅时异步执行
			CompletableFuture.runAsync(() -> {
				try {
					// 模拟逐步发送多条消息
					Thread.sleep(3000);
					sink.next("第一条消息");
					Thread.sleep(1000);
					sink.next("第二条消息");
					Thread.sleep(1000);
					sink.next("第三条消息");
					sink.complete();
				} catch (Exception e) {
					sink.error(e);
				}
			});
		});
	}

注意:多条返回是指多次返回不同消息,并不能像ai工具一样打字式的逐字显示,但可以通过每次仅返回一个字符来实现类似的效果

Sinks

也可以使用 reactor 的 Sinks 实现类似的功能

    @GetMapping("/sink")
	public Flux<String> sink(HttpServletResponse response) {
		response.setCharacterEncoding("UTF-8");
		Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

		CompletableFuture.runAsync(() -> {
			try {
				// 模拟逐步发送多条消息
				for (int i = 0; i < 10; i++) {
					Thread.sleep(6000);
					sink.tryEmitNext("你好 sink " + i);
				}
			} catch (Exception e) {
				sink.tryEmitError(e);
			}
		});
		return sink.asFlux();
	}

Sinks 提供了 many 和 one 两个方法,分别支持发送多条消息和一条消息,可以通过 asFlux 和 asMono 方法转换为 Flux 和 Mono 对象。

同时提供 unicast 和 multicast 来实现单播和广播

最后 onBackpressureBuffer 是背压策略,表示当生产速度快于消费速度时,会将数据缓存起来以避免丢失

返回的 Sinks.Many 对象,支持下列方法

  • tryEmitNext:发送一条消息
  • emitNext:发送一条消息,发送失败会抛出异常
  • tryEmitError:发送错误消息
  • emitError:发送错误消息,发送失败会抛出异常
  • tryEmitComplete:发送完成信号
  • emitComplete:发送完成信号,发送失败会抛出异常
  • asFlux:转换为 Flux 对象
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐