Netty概述

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
Netty的地位
在这里插入图片描述
Netty的优势

在这里插入图片描述
在这里插入图片描述
服务器端
在这里插入图片描述

public class HelloServer {
    public static void main(String[] args) {
        // 服务端初始化,是一个启动器,将下面的netty组件组合在一起进行初始化启动
        new ServerBootstrap()
                // 添加eventLoop(循环处理事件)的组集合的组件,eventLoop就是nio中的boss以及worker( selector + thread)
                .group(new NioEventLoopGroup())
                // 选择一个ServerSocketChannel的实现,netty支持多种channel,如NIO,BIO,epoll
                .channel(NioServerSocketChannel.class)
                // boss负责处理客户端链接,worker(child)负责处理读写等事件,决定了将来child将来执行那些操作(handler)
                .childHandler(
                        // channel 代表客户端和服务端进行读写的通道,Initialize负责初始化,主要负责添加别的handler
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                                // 添加具体的handler
                                nioSocketChannel.pipeline().addLast(new StringDecoder()); // 主要是客户端发送过来的数据进行解码handler,将ByteBuf
                                // 添加自定义的handler,处理的内容是上一步的handler处理之后的内容
                                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                    // 重写的是读事件
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println("自定义的通道数据处理器,内容:" + msg);
                                    }
                                });
                            }
                        })
                // 服务器启动之后绑定监听的端口
                .bind(8080);
    }
}

客户端代码

public class HelloClient {
        // 客户端初始化-启动器,将下面的netty组间组合进行初始化
        new Bootstrap()
                // 进行eventLoop选择器组间集合赋值,此处也可用之前学到的nio模式,不选择也行
                .group(new NioEventLoopGroup())
                // 选择客户端socketChannel连接的实现
                .channel(NioSocketChannel.class)
                // 决定了将来连接通道读写事件--worker(child)需要进行的操作
                .handler(
                        // channel 代表客户端和服务端质检读写操作,initializer主要负责初始化,添加别的handler
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                                // 添加通道读写事件处理的handler,此处是将字符串-->byteBuf,进行编码
                                nioSocketChannel.pipeline().addLast(new StringEncoder());
                            }
                        })
                // 与服务端建立连接的 ip + port
                .connect(new InetSocketAddress("localhost", 8089))
                // 客户大多数操作都是异步的,连接也是,所以需要使用sync进行等待连接的正常建立
                .sync()
                // 获取到连接通道,可进行数据的读写
                .channel()
                // 向服务端发送消息,并清空缓冲区,如果只适用write,服务端收不到缓冲区中的消息,需要flush
                .writeAndFlush("hello world! netty");
    }
}

具体流程
在这里插入图片描述

组件

EventLoop组件

事件循环对象
在这里插入图片描述
处理普通与定时任务
案例

public class TestEventLoop {
    public static void main(String[] args) {
        // 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
        //NioEventLoopGroup能够处理io事件、普通与定时任务,DefaultEventLoopGroup能够处理普通与定时任务
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 通过next方法可以获得下一个 EventLoop
        System.out.println(group.next());
        System.out.println(group.next());

        // 通过EventLoop执行普通任务
        group.next().execute(()->{
            System.out.println(Thread.currentThread().getName() + " hello");
        });

        // 通过EventLoop执行定时任务
        group.next().scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName() + " hello2");
        }, 0, 1, TimeUnit.SECONDS);
        
        // 优雅地关闭
        group.shutdownGracefully();
    }
}

输出结果如下

io.netty.channel.nio.NioEventLoop@20322d26
io.netty.channel.nio.NioEventLoop@192b07fd
io.netty.channel.nio.NioEventLoop@20322d26
ordinary task:nioEventLoopGroup-2-1
schedule task:nioEventLoopGroup-2-2--time:1668833656422
schedule task:nioEventLoopGroup-2-2--time:1668833658428
schedule task:nioEventLoopGroup-2-2--time:1668833660429

关闭 EventLoopGroup
优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
执行IO任务
服务器代码

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));

                            }
                        });
                    }
                })
                .bind(8080);
    }
}

客户端代码

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
        // 此处打断点调试,调用 channel.writeAndFlush(...);
        System.in.read();
    }
}

多个客户端分别发送 hello 结果

nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4

在这里插入图片描述
分工
Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
            	// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
            
				...
    }
}

增加自定义EventLoopGroup
当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理

public class MyServer {
    public static void main(String[] args) {
        // 增加自定义的非NioEventLoopGroup
        EventLoopGroup group = new DefaultEventLoopGroup();
        
        new ServerBootstrap()
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
                        socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                // 调用下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // 该handler绑定自定义的Group
                        .addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

在这里插入图片描述
切换的实现不同的EventLoopGroup切换的实现原理如下
由上面的图可以看出,当handler中绑定的Group不同时,需要切换Group来执行不同的任务

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 获得下一个EventLoop, excutor 即为 EventLoopGroup
    EventExecutor executor = next.executor();
    
    // 如果下一个EventLoop 在当前的 EventLoopGroup中
    if (executor.inEventLoop()) {
        // 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
        next.invokeChannelRead(m);
    } else {
        // 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用
否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用

Channel组件

  • close() 可以用来关闭Channel
  • closeFuture() 用来处理 Channel 的关闭
    sync 方法作用是同步等待 Channel 关闭,调用线程等待
    而 addListener 方法是异步等待 Channel 关闭,非调用线程
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去,或者缓冲区数据达到可发送界限
  • writeAndFlush() 方法将数据写入并立即发送(刷出),相当于write() + flush()
    ChannelFuture
    连接建立异步问题
public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
            	// NIO线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
        
        // 该方法用于等待连接真正建立
        channelFuture.sync();
        
        // 获取客户端-服务器之间的Channel对象
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
        System.in.read();
    }
}

如果我们去掉channelFuture.sync()方法,会服务器无法收到hello world
这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程(调用线程),等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,还未真正与服务器建立好连接,也就没法将信息正确的传输给服务器端
解决方法:
方法一:所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程
方法二:用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO(eventLoop) 线程(去执行connect操作的线程)
addListener方法通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
                // NIO线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
        
		// 当connect方法执行完毕后,也就是连接真正建立后
        // 会在NIO线程中调用operationComplete方法
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                channel.writeAndFlush("hello world");
            }
        });
    }
}

处理关闭

public class ReadClient {
    public static void main(String[] args) throws InterruptedException {
        // 创建EventLoopGroup,使用完毕后关闭
        NioEventLoopGroup group = new NioEventLoopGroup();
        
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();

        Channel channel = channelFuture.channel();
        Scanner scanner = new Scanner(System.in);

        // 创建一个线程用于输入并向服务器发送
        new Thread(()->{
            while (true) {
                String msg = scanner.next();
                if ("q".equals(msg)) {
                    // 关闭操作是异步的,在NIO线程中执行
                    channel.close();
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "inputThread").start();

        // 获得closeFuture对象
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close...");
        
        // 同步等待NIO线程执行完close操作
        closeFuture.sync();
        
        // 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
        System.out.println("关闭之后执行一些额外操作...");
        
        // 优雅关闭EventLoopGroup,现将eventLoopGroup状态切换为关闭,然后等待正在执行的任务执行完(所有任务都正常执行结束),在进行关闭
        group.shutdownGracefully();
    }
}

关闭channel
当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作
如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现
1.通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作

// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();

// 同步等待NIO线程执行完close操作
closeFuture.sync();

2.调用closeFuture.addListener方法,添加close的后续操作

closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        // 等待channel关闭后才执行的操作
        System.out.println("关闭之后执行一些额外操作...");
        // 优雅的关闭EventLoopGroup
        group.shutdownGracefully();
    }
});

Future与Promise
在这里插入图片描述
在这里插入图片描述
JDK Future

public class JdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "JdkFuture");
            }
        };
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);

        // 获得Future对象
        Future<Integer> future = executor.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return 50;
            }
        });
        // 通过阻塞的方式,获得运行结果
        System.out.println(future.get());
    }
}

在这里插入图片描述

Netty Future

public class NettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        // 获得 EventLoop 对象
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 50;
            }
        });

        // 主线程中获取结果
        System.out.println(Thread.currentThread().getName() + " 获取结果");
        System.out.println("getNow " + future.getNow());
        System.out.println("get " + future.get());

        // NIO线程中异步获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                System.out.println(Thread.currentThread().getName() + " 获取结果");
                System.out.println("getNow " + future.getNow());
            }
        });
    }
}

运行结果

main 获取结果
getNow null
get 50
nioEventLoopGroup-2-1 获取结果
getNow 50

Netty中的Future对象,可以通过EventLoop的sumbit()方法得到

  • 可以通过Future对象的get方法,阻塞地获取返回结果
  • 也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
  • 还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果
    Netty Promise
    Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果
public class NettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建EventLoop
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup(2);
        EventLoop next = eventExecutors.next();
        // 创建Promise(), 需要将eventLoop作为参数进行传递
        DefaultPromise<Integer> integerDefaultPromise = new DefaultPromise<>(next);

        new Thread(() -> {
            try {
                log.info("开始计算");
                TimeUnit.SECONDS.sleep(2);
                if (new Random().nextInt(2) == 0) {
                    log.info("开始计算,异常场景");
                    int i = 1 / 0;
                }
                integerDefaultPromise.setSuccess(100);
            } catch (InterruptedException e) {
                integerDefaultPromise.setFailure(e);
		//     integerDefaultPromise.cancel(false);
            }
        }).start();
        log.info("main thread get result");
        integerDefaultPromise.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                log.info("promise 的result is Fail or success:{}", integerDefaultPromise.isSuccess());
                if (!integerDefaultPromise.isSuccess()) {
                    log.error("error info", integerDefaultPromise.cause());
                }
            }
        });
        // 使用promise也可是使用,getNow()等方法,获取同步结果,以及添加使用addListener等方法
        // log.info("获取结果: {}", integerDefaultPromise.get());

    }
}

Handler与Pipeline
在这里插入图片描述
Pipeline

public class PipeLineServer {
        public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 获取到流水线对象
                        // 在pipeline中,默认存在head(头结点) --> tail(尾结点),handler是一个双向链表
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        // 一下handler在链表中的顺序应该为:head(头结点)--> inboundHandler1--> inboundHandler2--> inboundHandler3--> outboundHandler4--> outboundHandler5--> outboundHandler6--> tail(尾结点)
                        // 添加handler
                        pipeline.addLast("inboundHandler1",
                                // inbound主要是入站操作,一般是对数据的读写操作
                                // 入站时,handler是从head(头结点)向后调用的,按照添加的先后顺序
                                new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        log.info("inbound 1");
                                        // 调用super.channelRead();其实是调用了ctx.fireChannelRead()方法,两者调用效果一致
                                        // 传递给下一个handler处理之前,可进行本次handler处理的逻辑,下一个handler接收到的数据便是处理之后的数据
//                                        super.channelRead(ctx, msg);
                                        ByteBuf byteBuf = (ByteBuf) msg;
                                        super.channelRead(ctx, byteBuf.toString(Charset.defaultCharset()));
//                                ctx.fireChannelRead(msg);
                                    }
                                });
                        pipeline.addLast("inboundHandler2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.info("inbound 2, msg={}", msg);
                                Person person = new Person(msg.toString());
                                // 如果不调用下面的channelRead()或者是ctx.fireChannelRead(),读操作的handler连接便会断开
                                super.channelRead(ctx, person);
                            }
                        });
                        pipeline.addLast("inboundHandler3", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // 当前handler是inbound的最后一个handler, 则不需要继续使用调用channelRead()方法;
                                log.info("inbound 3, msg={}, msg instanceof={}", msg, msg.getClass());
//                                super.channelRead(ctx, msg);
                                // 调用write方法想客户端返回数据,触发写便会调用触发出站处理器,出站处理器是按照从 tail节点(尾结点)往前找的的规则,而不是当前入站handler结点往后找
                                // 因此,出站处理顺序应该是 outboundHandler6--> outboundHandler5--> outboundHandler4
                                // nioSocketChannel.writeAndFlush("receive request method1".getBytes(StandardCharsets.UTF_8)); // 该种写法不对
                                // 注意次数使用的是nioSocketChannel对象进行写数据
                                nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("receive request method2".getBytes(StandardCharsets.UTF_8)));
                            }
                        });
                        pipeline.addLast("outboundHandler4", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.info("outbound 1, msg={}", msg);
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.info("outbound 1, analysis msg={}", byteBuf.toString(Charset.defaultCharset()));
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("outboundHandler5", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.info("outbound 2, msg={}", msg);
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.info("outbound 2, analysis msg={}", byteBuf.toString(Charset.defaultCharset()));
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("outboundHandler6", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.info("outbound 3, msg={}", msg);
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.info("outbound 3, analysis msg={}", byteBuf.toString(Charset.defaultCharset()));
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8089);
    }
    
    @Data
    @AllArgsConstructor
    static class Person {
        private String name;
    }
}

运行结果如下

netty.c3.TestPipeLineServer - inbound 1
netty.c3.TestPipeLineServer - inbound 2, msg=hello
netty.c3.TestPipeLineServer - inbound 3, msg=TestPipeLineServer.Person(name=hello), msg instanceof=class com.panape.netty.c3.TestPipeLineServer$Person
netty.c3.TestPipeLineServer - outbound 3, msg=PooledUnsafeDirectByteBuf(ridx: 0, widx: 23, cap: 256)
netty.c3.TestPipeLineServer - outbound 3, analysis msg=receive request method2
netty.c3.TestPipeLineServer - outbound 2, msg=PooledUnsafeDirectByteBuf(ridx: 0, widx: 23, cap: 256)
netty.c3.TestPipeLineServer - outbound 2, analysis msg=receive request method2
netty.c3.TestPipeLineServer - outbound 1, msg=PooledUnsafeDirectByteBuf(ridx: 0, widx: 23, cap: 256)
netty.c3.TestPipeLineServer - outbound 1, analysis msg=receive request method2

通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler
handler需要放入通道的pipeline中,才能根据放入顺序来使用handler
pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler
要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止
调用顺序如下
在这里插入图片描述
OutboundHandler
socketChannel.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler
EmbeddedChannel
EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可

public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };

        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };

        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };

        // 用于测试Handler的Channel
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        
        // 执行Inbound操作 
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        // 执行Outbound操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

ByteBuf

可以动态扩容
调试工具方法

    /**
     * 日志打印方法
     *
     * @param buffer 字节缓冲对象
     */
    private static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                // io.netty.util.internal.StringUtil 里面静态变量 NEWLINE
                .append(NEWLINE);
        // io.netty.buffer.ByteBufUtil中的 静态方法
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }

该方法可以帮助我们更为详细地查看ByteBuf中的内容
创建

public class ByteBufStudy {

    public static void main(String[] args) {
        // 创建byteBuf(直接内存,而不是堆内存),默认大小是256,通过该方式创建的ByteBuf对象会自动扩容,与nio的ByteBuffer不一样,nio达到融期限之后,会报错
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(32);

        // 使用ByteBufAllocator创建直接内容,直接内容不需要考虑GC回收,默认获取的 byteBuf 便是直接内存,需要主动释放关闭
        // ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();
        // 使用ByteBufAllocator创建堆内存
        // ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer();

        ByteBuffer byteBuffer = ByteBuffer.allocate(32);
        // 打印byteBuf的读写指针,容量大小等信息
        log(byteBuf);
        debugAll(byteBuffer);

        // 网byteBuf里面写入内容
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < 32; i++) {
            stringBuilder.append(i).append("-");
        }

        // 往ByteBuf对象中写入数据,当达到容量界限之后,会自动扩容
        byteBuf.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
        log(byteBuf);

        // 往nio的byteBuffer对象中写入数据,达到界限之后,会抛出 BufferOverflowException 的异常
        byteBuffer.put(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
        debugAll(byteBuffer);
    }
}

ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小
当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作
nio的byteBuffer对象中写入数据,达到界限之后,会抛出 BufferOverflowException 的异常
如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建
直接内存与堆内存
通过该方法创建的ByteBuf,使用的是基于直接内存的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

可以使用下面的代码来创建池化基于堆的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);

在这里插入图片描述
验证

public class ByteBufStudy {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        System.out.println(buffer.getClass());

        buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
        System.out.println(buffer.getClass());

        buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
        System.out.println(buffer.getClass());
    }
}
// 使用池化的直接内存
class io.netty.buffer.PooledUnsafeDirectByteBuf
// 使用池化的堆内存    
class io.netty.buffer.PooledUnsafeHeapByteBuf
// 使用池化的直接内存    
class io.netty.buffer.PooledUnsafeDirectByteBuf

池化与非池化
在这里插入图片描述
组成
在这里插入图片描述
最大容量与当前容量
在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
当ByteBuf容量无法容纳所有数据时,会进行扩容操作,达到一个按需索取,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
ByteBuf分别由读指针和写指针两个指针控制
进行读写操作时,无需进行模式的切换,初始状态,读写指针都是在0位置

  • 读指针前的部分被称为废弃部分,是已经读过的内容
  • 读指针与写指针之间的空间称为可读部分
  • 写指针与当前容量之间的空间称为可写部分
    在这里插入图片描述
    写入
    在这里插入图片描述
    在这里插入图片描述

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)
public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        log(buffer);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        log(buffer);

        buffer.writeInt(5);
        log(buffer);

        buffer.writeIntLE(6);
        log(buffer);

        buffer.writeLong(7);
        log(buffer);
    }
}
read index:0 write index:0 capacity:16

read index:0 write index:4 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

read index:0 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00             |............    |
+--------+-------------------------------------------------+----------------+

read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
扩容
在这里插入图片描述
读取
读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针
如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);

        // 读取4个字节
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        log(buffer);

        // 通过mark与reset实现重复读取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        log(buffer);

        // 恢复到mark标记处
        buffer.resetReaderIndex();
        log(buffer);
    }
}
1
2
3
4
read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16

read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+

还有以 get 开头的一系列方法,这些方法不会改变读指针的位置
释放
在这里插入图片描述
释放规则
因为 pipeline 的存在,存在多个handler,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release
起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
出站 ByteBuf 处理原则
出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release

切片
在这里插入图片描述

public class TestSlice {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

        // 将buffer分成两部分
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);

        // 需要让分片的buffer引用计数加一
        // 避免原Buffer释放导致分片buffer无法使用
        slice1.retain();
        slice2.retain();
        
        ByteBufUtil.log(slice1);
        log(slice2);

        // 更改原始buffer中的值
        System.out.println("===========修改原buffer中的值===========");
        buffer.setByte(0,5);

        System.out.println("===========打印slice1===========");
        log(slice1);
    }
}

read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+
===========修改原buffer中的值===========
===========打印slice1===========
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+

在这里插入图片描述

组合
使用CompositeByteBuf可将小的byteBuf组合成大的bytebuf,且不发生数据的复制拷贝

public class TestCompositeByteBuf {
    public static void main(String[] args) {
        ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer();
        byteBuf1.writeBytes(new byte[]{1, 2, 3, 4, 5});

        ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.buffer();
        byteBuf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

        // 此种写法将小的byteBuf合成打的byteBuf,但是会发生内容数据的拷贝,以下发生了两次复制
        ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.buffer();
        byteBuf3.writeBytes(byteBuf1).writeBytes(byteBuf2);
        log(byteBuf3);

        // 优点,不会发生数据的拷贝,缺点:增加了维护的负复杂性
        CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
        compositeByteBuf.addComponents(true, byteBuf1, byteBuf2);
        log(compositeByteBuf);
    }
}

unpolled
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

双向通信

编写 server

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // 建议使用 ctx.alloc() 创建 ByteBuf
                    ByteBuf response = ctx.alloc().buffer();
                    response.writeBytes(buffer);
                    ctx.writeAndFlush(response);

                    // 思考:需要释放 buffer 吗
                    // 思考:需要释放 response 吗
                }
            });
        }
    }).bind(8080);

在这里插入图片描述

编写 client

NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // 思考:需要释放 buffer 吗
                }
            });
        }
    }).connect("127.0.0.1", 8080).sync().channel();

channel.closeFuture().addListener(future -> {
    group.shutdownGracefully();
});

new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while (true) {
        String line = scanner.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}).start();

读写双向通信的误解
我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B 和 B 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
服务器端

public class TestServer {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888);
        Socket s = ss.accept();

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                // 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

客户端

public class TestClient {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("localhost", 8888);

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

黑马笔记

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐