了解 Reactor 模式,就要先从事件驱动的开发方式说起。

我们知道,服务器开发,CPU 的处理速度远高于 IO 速度,为了避免 CPU 因为 IO 为阻塞,好一点的方法是多进程或线程处理,但这会带来一些进程切换的开销。

这时先驱者找到了事件驱动,或者叫回调的方法。这种方式就是,应用向一个中间人注册一个回调(Event handler),当 IO 就绪后,这个中间人产生一个时间,并通知此 handler 进行处理。这种回调的方式,也闲了"好莱坞原则" - "Don't call us, we'll call you."

那在 IO 就绪这个事件后,谁来充当这个中间人?Reactor 模式的答案是:有一个不断等待和循环的单独进程(线程)来做这件事,它接受所有 handler 的注册,并负责先操作系统个查询 IO 是否就绪,在就绪后用指定的 handler 进行处理,这个角色的名称就叫做 Reactor。

Reactor 与 NIO

1392fb9dbbae89860b02899f694492ca.png

NIO 中 Reactor 的核心是 selector,一个简单的 Reactor 示例,一个核心的 Reactor 的循环,这种循环结构又叫做 EventLoop。

8846270ea97acf38e1bb5301a3621c51.png

结合 NIO 服务端创建时序图 & 实际代码进行解说:public class Reactor implements Runnable {

public final Selector selector;

public final ServerSocketChannel server;

/**

* 创建了 ServerSocketChannel 对象,并调用 configureBlocking() 方法,配置为非阻塞模式

* 把通道绑定到制定端口,向 Selector 注册事件,并指定参数 OP_ACCEPT,即监听 accept 事件

*/

public Reactor(int port) throws IOException {

// 创建Selector对象

selector = Selector.open();

// 创建可选择通道,并配置为非阻塞模式

server = ServerSocketChannel.open();

server.configureBlocking(false);

// 绑定通道到指定端口

ServerSocket socket = server.socket();

InetSocketAddress address = new InetSocketAddress(port);

socket.bind(address);

/**

* 为了将Channel和Selector配合使用,必须将channel注册到selector上。

* 通过SelectableChannel.register()方法来实现

*/

// 向 Selector 注册该 channel

SelectionKey selectionKey = server.register(selector, Selection.OP_ACCEPT);

/**

* selectionKey.attach(theObject); 可以将一个对象或更多信息附着到 SelectionKey上,

* Object attachedObj = selectionKey.attachment();  可以从 SelectionKey 获取附着的信息。

*/

// 利用 selectionKey 的 attach 功能绑定 Acceptor,如果有事件,触发 Acceptor

selectionKey.attach(new Acceptor(this));

}

/**

* Selector 开始监听 ,进入内部循环。在非阻塞 IO 中,内部循环模式都是遵循这种方式。

* 首先调用 select() 方法,该方法会阻塞,直到至少有一个事件发生,

* 然后使用 selectedKeys() 方法获取发生事件的 SelectionKey,然后使用迭代器进行循环

*/

@Override

public void run() {

try {

while (!Thread.interrupted()) {

// 该调用会阻塞,直到至少有一个事件发生

selector.select();

Set selected = selector.selectedKeys();

Iterator it = selected.iterator();

while (it.hasNext()) {

SelectionKey key = (SelectionKey) it.next();

dispatch(key);

}

selected.clear();

}

} catch (IOException ex) {

/* ... */

}

}

/**

* 运行 Acceptor

*/

void dispatch(SelectionKey key) {

Acceptor acceptor = (Acceptor) key.attachment();

Runnable r = (Runnable)(acceptor );

if (r != null) {

r.run();

}

}

}public class Acceptor implements Runnable {

private Reactor reactor;

public Acceptor(Reactor reactor) {

this.reactor=reactor;

}

/**

* 接收请求

*/

@Override

public void run() {

try {

ServerSocketChannel server = reactor.server;

SocketChannel channel = server.accept();

if(channel != null) {

// 调用 Handler 来处理 channel

new SocketReadHandler(reactor.selector, channel);

}

} catch (IOException e) {

/* ... */

}

}

}public class SocketReadHandler implements Runnable {

private Selector selector;

private SocketChannel channel;

public SocketReadHandler(Selector selector, SocketChannel channel) throws IOException {

this.selector = selector;

this.channel = channel;

channel.configureBlocking(false);

/**

* 将新接入的客户端连接注册到 Reactor 线程的多路复用器上

* 监听读操作位,用来读取客户端发送的网络消息

*/

SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_READ);

// 将 SelectionKey 绑定为本 Handler 有事件触发时,将调用本类的 run 方法。

selectionKey.attach(this);

}

/**

* 处理读取客户端发来的信息的事件

*/

@Override

public void run() {

// 创建读取的缓冲区

ByteBuffer buffer = ByteBuffer.allocate(1024);

try {

int count = channel.read(buffer);

if (count > 0) {

buffer.flip():

CharBuffer charBuffer = decoder.decode(buffer);

String msg = charBuffer.toString();

// ...

SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_WRITE);

selectionKey.attach(name);

}

} catch (IOException e) {

/* ... */

}

buffer.clear();

}

}

从一个通道里读数据,直到所有的数据都读到缓冲区里。

a21a136707689502be24d12f44accea8.png

Reactor 与 Netty

Reactor 模式有多个变种,Netty 基于 Multiple Reactors 模式做了一定的修改,Mutilple Reactors 模式有多个 reactor:mainReactor 和 subReactor,其中 mainReactor 只有一个,负责响应 client 的连接请求,并建立连接,它使用 NIO Selector;subReactor 可以有一个或多个,每个 subReactor 都会在一个独立线程中执行,并且维护一个独立的 NIO Selector。

这是因为 subReactor 会执行一个比较耗时的 IO 操作,例如消息的读写,使用个多个线程去执行,则更加有利于发挥 CPU 的运算能力,减少 IO 等待时间。

5a38af4b1a371545ea4a0587f8315c04.png

Netty 的线程模型基于 Multiple Reactors 模式,借用了 mainReactor 和 subReactor 结构,从代码来看,它并没有 Thread Pool。Netty 的 subReactor 与 worker thread 是用一个线程,采用 IO 多路复用机制,可以使一个 subReactor 监听并处理多个 channel 的 IO 请求。

b2c6d7e8c60142cb2b6d9c81f3178125.png

其中 parentGroup 和 childGroup 是 Bootstrap 构建方法中传入的两个对象,这两个 group 均是线程池,childGroup 线程池会被各个 subReactor 充分利用,parentGroup 线程池则只是在 bind 某个端口后,获得其中一个线程作为 mainReactor。

Netty 里对应 mainReactor 的角色叫做 "Boss",而对应 subReactor 的角色叫做 "Worker"。Boss 负责分配请求,Worker 负责执行。在 Netty 4.0 之后,NioEventLoop 是 Netty NIO 部分的核心。

Reactor 与 Kafka/**

* An NIO socket server. The threading model is

* 1 Acceptor thread that handles new connections

* Acceptor has N Processor threads that each have their own selector and read requests from sockets

* M Handler threads that handle requests and produce responses back to the processor threads for writing.

*/

class SocketServer(val host: String,

val port: Int,

val processorBeginIndex: Int,

val numProcessorThreads: Int,

val totalProcessorThreads: Int,

val time: Time,

val metrics: Metrics) extends Logging {

private val processors = new Array[Processor](totalProcessorThreads)

/**

* Start the socket server

*/

def startup() {

this.synchronized {

new Acceptor(host, port, processorBeginIndex, numProcessorThreads, processors, time, metrics)

}

}

}

/**

* Thread that accepts and configures new connections. There is only need for one of these

*/

private class Acceptor(val host: String,

private val port: Int,

val processorBeginIndex: Int,

numProcessorThreads: Int,

processors: Array[Processor],

val time: Time,

val metrics: Metrics) extends Runnable {

val nioSelector = java.nio.channels.Selector.open()

val serverChannel = openServerSocket(host, port)

val processorEndIndex = processorBeginIndex + numProcessorThreads

this.synchronized {

for (i 

processors(i) = new Processor(time, metrics)

}

}

/*

* Create a server socket to listen for connections on.

*/

def openServerSocket(host: String, port: Int): ServerSocketChannel = {

val serverChannel = ServerSocketChannel.open()

serverChannel.configureBlocking(false)

val socketAddress =

if (host == null || host.trim.isEmpty)

new InetSocketAddress(port)

else

new InetSocketAddress(host, port)

try {

serverChannel.socket.bind(socketAddress)

} catch {

case e: SocketException =>

throw new Exception("Socket server failed to bind.")

}

serverChannel

}

/**

* Accept loop that checks for new connection attempts

*/

def run() {

serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

val currentProcessor = processorBeginIndex

val ready = nioSelector.select()

if (ready > 0) {

val keys = nioSelector.selectedKeys()

val iterator = keys.iterator()

while (iterator.hasNext) {

var key: SelectionKey = null

try {

key = iterator.next()

iterator.remove()

if (key.isAcceptable)

accept(key, processors(currentProcessor))

else

throw new IllegalStateException("Unrecognized key state for acceptor thread.")

} catch {

case e: Throwable => error("Error while accepting connection")

}

}

}

}

/*

* Accept a new connection

*/

def accept(key: SelectionKey, processor: Processor): Unit = {

val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]

val socketChannel = serverSocketChannel.accept()

socketChannel.configureBlocking(false)

socketChannel.socket().setTcpNoDelay(true)

socketChannel.socket().setKeepAlive(true)

processor.accept(socketChannel)

}

}

/**

* Thread that processes all requests from a single connection. There are N of these running in parallel

* each of which has its own selectors

*/

private class Processor(val time: Time,

val metrics: Metrics) extends Runnable {

private val metricTags = new util.HashMap[String, String]()

private val selector = new org.apache.kafka.common.network.Selector(

metrics,

time,

"socket-server",

metricTags)

def run() {

while (!Thread.interrupted()) {

try {

selector.poll(300)

} catch {

case e@(_: IllegalStateException | _: IOException) => {

throw e

}

}

}

}

/**

* Queue up a new connection for reading

*/

def accept(socketChannel: SocketChannel) {

selector.wakeup()

}

}

转载请并标注: “本文转载自 linkedkeeper.com ”  ©著作权归作者所有

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐