|
您目前处于:Architecture
2015-09-01
|
系列文章:了解 Reactor 模式,就要先从事件驱动的开发方式说起。 我们知道,服务器开发,CPU 的处理速度远高于 IO 速度,为了避免 CPU 因为 IO 为阻塞,好一点的方法是多进程或线程处理,但这会带来一些进程切换的开销。 这时先驱者找到了事件驱动,或者叫回调的方法。这种方式就是,应用向一个中间人注册一个回调(Event handler),当 IO 就绪后,这个中间人产生一个时间,并通知此 handler 进行处理。这种回调的方式,也闲了"好莱坞原则" - "Don't call us, we'll call you." 那在 IO 就绪这个事件后,谁来充当这个中间人?Reactor 模式的答案是:有一个不断等待和循环的单独进程(线程)来做这件事,它接受所有 handler 的注册,并负责先操作系统个查询 IO 是否就绪,在就绪后用指定的 handler 进行处理,这个角色的名称就叫做 Reactor。 Reactor 与 NIO
NIO 中 Reactor 的核心是 selector,一个简单的 Reactor 示例,一个核心的 Reactor 的循环,这种循环结构又叫做 EventLoop。
结合 NIO 服务端创建时序图 & 实际代码进行解说: public class Reactor implements Runnable {
public final Selector selector;
public final ServerSocketChannel server;
/**
* 创建了 ServerSocketChannel 对象,并调用 configureBlocking() 方法,配置为非阻塞模式
* 把通道绑定到制定端口,向 Selector 注册事件,并指定参数 OP_ACCEPT,即监听 accept 事件
*/
public Reactor(int port) throws IOException {
// 创建Selector对象
selector = Selector.open();
// 创建可选择通道,并配置为非阻塞模式
server = ServerSocketChannel.open();
server.configureBlocking(false);
// 绑定通道到指定端口
ServerSocket socket = server.socket();
InetSocketAddress address = new InetSocketAddress(port);
socket.bind(address);
/**
* 为了将Channel和Selector配合使用,必须将channel注册到selector上。
* 通过SelectableChannel.register()方法来实现
*/
// 向 Selector 注册该 channel
SelectionKey selectionKey = server.register(selector, Selection.OP_ACCEPT);
/**
* selectionKey.attach(theObject); 可以将一个对象或更多信息附着到 SelectionKey上,
* Object attachedObj = selectionKey.attachment(); 可以从 SelectionKey 获取附着的信息。
*/
// 利用 selectionKey 的 attach 功能绑定 Acceptor,如果有事件,触发 Acceptor
selectionKey.attach(new Acceptor(this));
}
/**
* Selector 开始监听 ,进入内部循环。在非阻塞 IO 中,内部循环模式都是遵循这种方式。
* 首先调用 select() 方法,该方法会阻塞,直到至少有一个事件发生,
* 然后使用 selectedKeys() 方法获取发生事件的 SelectionKey,然后使用迭代器进行循环
*/
@Override
public void run() {
try {
while (!Thread.interrupted()) {
// 该调用会阻塞,直到至少有一个事件发生
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
dispatch(key);
}
selected.clear();
}
} catch (IOException ex) {
/* ... */
}
}
/**
* 运行 Acceptor
*/
void dispatch(SelectionKey key) {
Acceptor acceptor = (Acceptor) key.attachment();
Runnable r = (Runnable)(acceptor );
if (r != null) {
r.run();
}
}
}public class Acceptor implements Runnable {
private Reactor reactor;
public Acceptor(Reactor reactor) {
this.reactor=reactor;
}
/**
* 接收请求
*/
@Override
public void run() {
try {
ServerSocketChannel server = reactor.server;
SocketChannel channel = server.accept();
if(channel != null) {
// 调用 Handler 来处理 channel
new SocketReadHandler(reactor.selector, channel);
}
} catch (IOException e) {
/* ... */
}
}
}public class SocketReadHandler implements Runnable {
private Selector selector;
private SocketChannel channel;
public SocketReadHandler(Selector selector, SocketChannel channel) throws IOException {
this.selector = selector;
this.channel = channel;
channel.configureBlocking(false);
/**
* 将新接入的客户端连接注册到 Reactor 线程的多路复用器上
* 监听读操作位,用来读取客户端发送的网络消息
*/
SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_READ);
// 将 SelectionKey 绑定为本 Handler 有事件触发时,将调用本类的 run 方法。
selectionKey.attach(this);
}
/**
* 处理读取客户端发来的信息的事件
*/
@Override
public void run() {
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
int count = channel.read(buffer);
if (count > 0) {
buffer.flip():
CharBuffer charBuffer = decoder.decode(buffer);
String msg = charBuffer.toString();
// ...
SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_WRITE);
selectionKey.attach(name);
}
} catch (IOException e) {
/* ... */
}
buffer.clear();
}
}从一个通道里读数据,直到所有的数据都读到缓冲区里。
Reactor 与 Netty Reactor 模式有多个变种,Netty 基于 Multiple Reactors 模式做了一定的修改,Mutilple Reactors 模式有多个 reactor:mainReactor 和 subReactor,其中 mainReactor 只有一个,负责响应 client 的连接请求,并建立连接,它使用 NIO Selector;subReactor 可以有一个或多个,每个 subReactor 都会在一个独立线程中执行,并且维护一个独立的 NIO Selector。 这是因为 subReactor 会执行一个比较耗时的 IO 操作,例如消息的读写,使用个多个线程去执行,则更加有利于发挥 CPU 的运算能力,减少 IO 等待时间。
Netty 的线程模型基于 Multiple Reactors 模式,借用了 mainReactor 和 subReactor 结构,从代码来看,它并没有 Thread Pool。Netty 的 subReactor 与 worker thread 是用一个线程,采用 IO 多路复用机制,可以使一个 subReactor 监听并处理多个 channel 的 IO 请求。
其中 parentGroup 和 childGroup 是 Bootstrap 构建方法中传入的两个对象,这两个 group 均是线程池,childGroup 线程池会被各个 subReactor 充分利用,parentGroup 线程池则只是在 bind 某个端口后,获得其中一个线程作为 mainReactor。 Netty 里对应 mainReactor 的角色叫做 "Boss",而对应 subReactor 的角色叫做 "Worker"。Boss 负责分配请求,Worker 负责执行。在 Netty 4.0 之后,NioEventLoop 是 Netty NIO 部分的核心。 Reactor 与 Kafka /**
* An NIO socket server. The threading model is
* 1 Acceptor thread that handles new connections
* Acceptor has N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
class SocketServer(val host: String,
val port: Int,
val processorBeginIndex: Int,
val numProcessorThreads: Int,
val totalProcessorThreads: Int,
val time: Time,
val metrics: Metrics) extends Logging {
private val processors = new Array[Processor](totalProcessorThreads)
/**
* Start the socket server
*/
def startup() {
this.synchronized {
new Acceptor(host, port, processorBeginIndex, numProcessorThreads, processors, time, metrics)
}
}
}
/**
* Thread that accepts and configures new connections. There is only need for one of these
*/
private class Acceptor(val host: String,
private val port: Int,
val processorBeginIndex: Int,
numProcessorThreads: Int,
processors: Array[Processor],
val time: Time,
val metrics: Metrics) extends Runnable {
val nioSelector = java.nio.channels.Selector.open()
val serverChannel = openServerSocket(host, port)
val processorEndIndex = processorBeginIndex + numProcessorThreads
this.synchronized {
for (i <- processorBeginIndex until processorEndIndex) {
processors(i) = new Processor(time, metrics)
}
}
/*
* Create a server socket to listen for connections on.
*/
def openServerSocket(host: String, port: Int): ServerSocketChannel = {
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
val socketAddress =
if (host == null || host.trim.isEmpty)
new InetSocketAddress(port)
else
new InetSocketAddress(host, port)
try {
serverChannel.socket.bind(socketAddress)
} catch {
case e: SocketException =>
throw new Exception("Socket server failed to bind.")
}
serverChannel
}
/**
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
val currentProcessor = processorBeginIndex
val ready = nioSelector.select()
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iterator = keys.iterator()
while (iterator.hasNext) {
var key: SelectionKey = null
try {
key = iterator.next()
iterator.remove()
if (key.isAcceptable)
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
} catch {
case e: Throwable => error("Error while accepting connection")
}
}
}
}
/*
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor): Unit = {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
processor.accept(socketChannel)
}
}
/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selectors
*/
private class Processor(val time: Time,
val metrics: Metrics) extends Runnable {
private val metricTags = new util.HashMap[String, String]()
private val selector = new org.apache.kafka.common.network.Selector(
metrics,
time,
"socket-server",
metricTags)
def run() {
while (!Thread.interrupted()) {
try {
selector.poll(300)
} catch {
case e@(_: IllegalStateException | _: IOException) => {
throw e
}
}
}
}
/**
* Queue up a new connection for reading
*/
def accept(socketChannel: SocketChannel) {
selector.wakeup()
}
}
转载请并标注: “本文转载自 linkedkeeper.com ” ©著作权归作者所有 |