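I. Introduction

RSocket is a binary application protocol for use over byte-stream transports such as TCP, WebSockets, and Aeron. It supports the following interaction models: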

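1. Request-Response: send one message and receive one message in return.

2. Request-Stream: send one message and receive a stream of messages in return.

3. Channel: send streams of messages in both directions.

4. Fire-and-Forget: send a one-way message with no response.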
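
To make "binary protocol" a little more concrete, here is a minimal sketch of the fixed header that starts every RSocket frame, following the layout in the RSocket protocol specification: a 32-bit stream ID, a 6-bit frame type, and 10 bits of flags. Each of the four interaction models above is started by its own request frame type. The class and method names (FrameHeaderSketch, encode, and so on) are made up for illustration and are not part of any RSocket library; the sketch covers only the header, not the remainder of a request frame.

```java
import java.nio.ByteBuffer;

/** Illustrative only: packs and unpacks the 6-byte RSocket frame header. */
final class FrameHeaderSketch {

    // Frame type codes of the four request frames, as listed in the RSocket specification.
    static final int REQUEST_RESPONSE = 0x04;
    static final int REQUEST_FNF = 0x05;      // Fire-and-Forget
    static final int REQUEST_STREAM = 0x06;
    static final int REQUEST_CHANNEL = 0x07;

    /** Encodes stream ID (31 usable bits), frame type (6 bits), and flags (10 bits), big-endian. */
    static byte[] encode(int streamId, int frameType, int flags) {
        if (streamId < 0) {
            throw new IllegalArgumentException("stream ID must fit in 31 bits");
        }
        if (frameType < 0 || frameType > 0x3F) {
            throw new IllegalArgumentException("frame type must fit in 6 bits");
        }
        if (flags < 0 || flags > 0x3FF) {
            throw new IllegalArgumentException("flags must fit in 10 bits");
        }
        ByteBuffer buf = ByteBuffer.allocate(6); // ByteBuffer is big-endian by default
        buf.putInt(streamId);
        buf.putShort((short) ((frameType << 10) | flags));
        return buf.array();
    }

    static int streamId(byte[] header) {
        return ByteBuffer.wrap(header).getInt(0);
    }

    static int frameType(byte[] header) {
        return (ByteBuffer.wrap(header).getShort(4) & 0xFFFF) >>> 10;
    }

    static int flags(byte[] header) {
        return ByteBuffer.wrap(header).getShort(4) & 0x3FF;
    }

    public static void main(String[] args) {
        // Stream 1 is used here as an example of a client-initiated (odd) stream ID.
        byte[] header = encode(1, REQUEST_STREAM, 0);
        StringBuilder hex = new StringBuilder();
        for (byte b : header) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

In the applications below you never build these headers yourself; the RSocket library does it whenever a request is sent.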

II. Server-Side Code

1. Add the dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

2. Add the following to the configuration file (for example application.yml):

spring:
  rsocket:
    server:
      port: 9898
      transport: tcp
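
The transport: tcp setting has a consequence at the wire level. TCP is a plain byte stream with no message boundaries, so, per the RSocket protocol specification, each frame sent over TCP is preceded by a 24-bit big-endian length (the WebSocket transport keeps message boundaries itself and does not need this prefix). The following is a minimal sketch of that framing. LengthPrefixedFraming and its methods are hypothetical names; in a real application the RSocket transport does this for you.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: 24-bit length-prefixed framing as used by RSocket over TCP. */
final class LengthPrefixedFraming {

    // Largest length that fits in the 24-bit prefix.
    static final int MAX_FRAME_LENGTH = 0xFFFFFF;

    /** Prepends the 3-byte big-endian length of the frame (the prefix itself is not counted). */
    static byte[] wrap(byte[] frame) {
        int n = frame.length;
        if (n > MAX_FRAME_LENGTH) {
            throw new IllegalArgumentException("frame too large for a 24-bit length");
        }
        byte[] out = new byte[3 + n];
        out[0] = (byte) (n >>> 16);
        out[1] = (byte) (n >>> 8);
        out[2] = (byte) n;
        System.arraycopy(frame, 0, out, 3, n);
        return out;
    }

    /** Splits a buffer of concatenated length-prefixed frames back into individual frames. */
    static List<byte[]> split(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 3) {
            int n = ((buf.get() & 0xFF) << 16) | ((buf.get() & 0xFF) << 8) | (buf.get() & 0xFF);
            if (buf.remaining() < n) {
                throw new IllegalArgumentException("truncated frame");
            }
            byte[] frame = new byte[n];
            buf.get(frame);
            frames.add(frame);
        }
        if (buf.hasRemaining()) {
            throw new IllegalArgumentException("truncated length prefix");
        }
        return frames;
    }
}
```

For example, wrapping a 3-byte frame yields 6 bytes: the prefix 00 00 03 followed by the frame itself.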

3. Server-side test code

package com.example.rsocketservice.controller;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Random;

// RSocket handlers only need @Controller; @RestController is meant for HTTP endpoints
@Controller
public class SendController {

    // Request-Response: one message in, one message out
    @MessageMapping("message")
    public Mono<String> handleMessage(Mono<String> message) {
        return message
                .doOnNext(msg -> System.out.printf("Received message: %s%n", msg))
                .map(msg -> "The server received your message!");
    }

    // Request-Stream: one request in, a stream of messages out
    // The handler must return a Flux
    @MessageMapping("stream")
    public Flux<String> handleStream() {
        return Flux
                .interval(Duration.ofSeconds(2))
                // Generate a random number on every tick
                .map(i -> String.valueOf(new Random().nextInt(10000000)))
                // Emit only 10 values on this stream, then complete
                .take(10)
                .doOnComplete(() -> System.out.println("completed..."));
    }

    // Channel: a stream of messages in, a stream of messages out
    @MessageMapping("channel")
    public Flux<String> handleChannel(Flux<String> datas) {
        return datas
                .doOnNext(ret -> System.out.printf("[server] %s - received data: %s%n",
                        Thread.currentThread().getName(), ret))
                // Send each element back with a random suffix
                .map(ret -> ret + " - " + new Random().nextInt(1000));
    }

    // Fire-and-Forget: one message in, no response
    @MessageMapping("faf")
    public Mono<Void> handleFireAndForget(Mono<String> data) {
        return data
                .doOnNext(ret -> System.out.printf("[server] %s - received data: %s%n",
                        Thread.currentThread().getName(), ret))
                .then();
    }
}
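
The string passed to @MessageMapping, such as "message" or "stream", is the route that a client has to send along in the request metadata. Assuming the RSocket routing metadata extension (MIME type message/x.rsocket.routing.v0), a route travels as a tag: one length byte followed by the route's UTF-8 bytes. The sketch below encodes and decodes such tags. RoutingMetadataSketch is a made-up name, and the composite-metadata wrapper that Spring normally puts around the routing entry is deliberately left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: routes encoded as length-prefixed UTF-8 tags. */
final class RoutingMetadataSketch {

    /** Encodes each route as one length byte followed by its UTF-8 bytes. */
    static byte[] encode(String... routes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String route : routes) {
            byte[] tag = route.getBytes(StandardCharsets.UTF_8);
            if (tag.length == 0 || tag.length > 255) {
                throw new IllegalArgumentException("route must be 1 to 255 bytes long");
            }
            out.write(tag.length);          // single unsigned length byte
            out.write(tag, 0, tag.length);  // the route itself
        }
        return out.toByteArray();
    }

    /** Reads the tags back in the order they were written. */
    static List<String> decode(byte[] metadata) {
        List<String> routes = new ArrayList<>();
        int pos = 0;
        while (pos < metadata.length) {
            int len = metadata[pos++] & 0xFF;
            if (pos + len > metadata.length) {
                throw new IllegalArgumentException("truncated tag");
            }
            routes.add(new String(metadata, pos, len, StandardCharsets.UTF_8));
            pos += len;
        }
        return routes;
    }
}
```

With this encoding the route "message" becomes 8 bytes: the length byte 7 followed by the seven ASCII characters. On the server, Spring matches the decoded route against the @MessageMapping values.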

III. Client-Side Test Code

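1. Add the dependencies

The test controller in step 3 exposes HTTP endpoints, so besides the RSocket starter the client needs a web starter such as spring-boot-starter-webflux. If both applications run on the same machine, give them different HTTP ports via server.port.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>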

2. Create the configuration class ClientConfiguration

package com.example.rsocketclient.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;


@Configuration
public class ClientConfiguration {
    @Bean
    RSocketRequester rSocketRequester() {
        // JSON codecs for payloads; Jackson2CborEncoder / Jackson2CborDecoder are a CBOR alternative
        RSocketStrategies strategies = RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
                .build();

        // Host and port must match the server's spring.rsocket.server.port (9898)
        return RSocketRequester.builder()
                .rsocketStrategies(strategies)
                .tcp("localhost", 9898);
    }
}

3. Test code

package com.example.rsocketclient.controller;

import jakarta.annotation.Resource;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Random;

@RestController
public class TestController {
    @Resource
    private RSocketRequester rsocketRequester;


    // Request-Response: send one message, receive one message
    @GetMapping("/message/{body}")
    public void sendMessage(@PathVariable("body") String body) {
        this.rsocketRequester
                .route("message")
                .data(body)
                .retrieveMono(String.class)
                .subscribe(System.out::println);
    }

    // Request-Stream: send one request, receive a stream of messages
    @GetMapping("/stream")
    public void sendStream() {
        this.rsocketRequester
                .route("stream")
                .retrieveFlux(String.class)
                .subscribe(ret -> System.out.printf("%s - received data: %s%n",
                        Thread.currentThread().getName(), ret));
    }

    // Channel: send a stream of messages and receive a stream of messages
    @GetMapping("/channel")
    public void sendChannel() {
        this.rsocketRequester
                .route("channel")
                // Send six elements, one per second
                .data(Flux.just("1", "2", "3", "4", "5", "6").delayElements(Duration.ofSeconds(1)))
                .retrieveFlux(String.class)
                .subscribe(ret -> System.out.printf("[client] %s - received data: %s%n",
                        Thread.currentThread().getName(), ret));
    }

    // Fire-and-Forget: send a one-way message, no response expected
    @GetMapping("/sendFireAndForget")
    public void sendFireAndForget() {
        this.rsocketRequester
                .route("faf")
                .data(Mono.just(String.valueOf(new Random().nextInt(1000))))
                .send()
                .subscribe();
    }
}
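
With both applications running, the four client endpoints can be exercised from any HTTP client. Below is a small sketch that builds the corresponding requests with the JDK's java.net.http API. It assumes the client application serves HTTP on localhost:8080 (Spring Boot's default when server.port is not set); adjust BASE_URL if yours differs. TestEndpointRequests is a hypothetical helper and not part of the project above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Illustrative only: builds GET requests for the four test endpoints of TestController. */
final class TestEndpointRequests {

    // Assumption: the client application listens on Spring Boot's default HTTP port.
    static final String BASE_URL = "http://localhost:8080";

    /** Builds a GET request for the given path below BASE_URL. */
    static HttpRequest get(String path) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + path)).GET().build();
    }

    /** One request per interaction model; the body ends up in the /message/{body} path variable. */
    static List<HttpRequest> all(String body) {
        // URLEncoder produces form encoding, so turn "+" into "%20" for use in a path segment.
        String encoded = URLEncoder.encode(body, StandardCharsets.UTF_8).replace("+", "%20");
        return List.of(
                get("/message/" + encoded),   // Request-Response
                get("/stream"),               // Request-Stream
                get("/channel"),              // Channel
                get("/sendFireAndForget"));   // Fire-and-Forget
    }
}
```

To actually send one of these, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). The controller methods return void, so the HTTP response body is empty; the results appear in the client and server consoles instead.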
