2017-09-06
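Preface

Recently I noticed BlockingOperationException being thrown in my Netty 4 project. Why does this exception occur? Some people say that calling sync() or await() while starting a server with Netty's ServerBootstrap causes a deadlock. In my case, however, the exception was thrown during channelRead. The bootstrap's bind runs on bossGroup and channelRead runs on workerGroup, so the two should use different EventLoops and, I assumed, could not affect each other. What, then, triggers the deadlock exception?

I posted the question on StackOverflow (https://stackoverflow.com/questions/46020266/what-causes-blockingoperationexception-in-netty-4) and was lucky enough to get an answer from Norman Maurer, one of Netty's core contributors. Below I lay out the full line of analysis to share with everyone.

Main text

When a server is started with Netty's ServerBootstrap, calling sync() blocks the calling thread until the operation completes.

public void init() throws Exception {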
    logger.info("start tcp server ...");
    Class clazz = NioServerSocketChannel.class;
    // Start the server
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(boosGroup, workerGroup);
    bootstrap.channel(clazz);
    bootstrap.childHandler(new ServerChannelInitializer(serverConfig));
    // Optional parameters
    bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
    // Bind the port and wait synchronously for the bind to succeed
    logger.info("start tcp server at port[" + port + "].");
    ChannelFuture future = bootstrap.bind(port).sync();
    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                logger.info("Server have success bind to " + port);
            } else {
                logger.error("Server fail bind to " + port);
                throw new InitErrorException("Server start fail !", future.cause());
            }
        }
    });
}

On this line:

ChannelFuture future = bootstrap.bind(port).sync();

bootstrap.bind() returns a ChannelFuture. Looking at the source of DefaultPromise, the base class of the DefaultChannelPromise behind this future, sync() calls await(), and await() synchronizes on the future object itself.

public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed(); // rethrow the failure if the asynchronous operation failed
    return this;
}

sync() and await() are very similar.

public Promise<V> await() throws InterruptedException {
    // The asynchronous operation has already completed; return immediately
    if (isDone()) {
        return this;
    }
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    // Synchronize so that only one thread at a time modifies waiters
    synchronized (this) {
        // Keep looping until the asynchronous operation completes
        while (!isDone()) {
            // Deadlock detection
            checkDeadLock();
            incWaiters(); // ++waiters
            try {
                wait(); // JDK Object.wait()
            } finally {
                decWaiters(); // --waiters
            }
        }
    }
    return this;
}

Note the checkDeadLock() call, which performs the deadlock detection:

protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}

e.inEventLoop() is true when the current thread is the executor's own thread, which means a task on that thread would be waiting for another task on the same thread to wake it up. A thread executes sequentially: later code runs only after earlier code has finished. Blocking here would therefore be a deadlock.

Calling sync() or await() inside a ChannelHandler method can cause a deadlock, and in practice BlockingOperationException does show up occasionally:

io.netty.util.concurrent.BlockingOperationException: AbstractChannel$CloseFuture(incomplete)
    at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:391)
    at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:157)
    at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:252)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:129)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:28)
    at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:219)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:117)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:28)

So what kind of code leads to this? Here is the code from the project that hit the deadlock:

private void channelWrite(T message) {
    boolean success = true;
    boolean sent = true;
    int timeout = 60;
    try {
        ChannelFuture cf = cxt.write(message);
        cxt.flush();
        if (sent) {
            success = cf.await(timeout);
        }
        if (cf.isSuccess()) {
            logger.debug("send success.");
        }
        Throwable cause = cf.cause();
        if (cause != null) {
            this.fireError(new PushException(cause));
        }
    } catch (LostConnectException e) {
        this.fireError(new PushException(e));
    } catch (Exception e) {
        this.fireError(new PushException(e));
    } catch (Throwable e) {
        this.fireError(new PushException("Failed to send message", e));
    }
    if (!success) {
        this.fireError(new PushException("Failed to send message"));
    }
}

On these lines:

ChannelFuture cf = cxt.write(message);
cxt.flush();

write() only places the outgoing data in a buffer without actually sending it, while flush() sends the buffered data. What happens without flush()? The current thread would wait in wait(), but the code that sends the data never runs, because sending also executes on the current thread. That is how the deadlock arises.

In practice, even with writeAndFlush() the deadlock exception still occurred with low probability. Why? I also had several doubts and guesses:
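Netty's threading model:

An EventLoopGroup assigns an EventLoop to each newly created Channel. Each EventLoop handles all events and tasks of the Channels bound to it. Each EventLoop is associated with a single thread. The same EventLoop may be assigned to multiple Channels. In Netty 4, all I/O operations and events are handled by the one Thread assigned to the EventLoop.

Let's look again at the source code behind cxt.write(message):

private void write(Object msg, boolean flush, ChannelPromise promise) {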
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
    } else {
        int size = channel.estimatorHandle().size(msg);
        if (size > 0) {
            ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
            if (buffer != null) {
                buffer.incrementPendingOutboundBytes(size);
            }
        }
        Runnable task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, msg, size, promise);
        } else {
            task = WriteTask.newInstance(next, msg, size, promise);
        }
        safeExecute(executor, task, promise, msg);
    }
}

public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}

private static void safeExecute(EventExecutor executor, Runnable runnable,
        ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
    } catch (Throwable cause) {
        try {
            promise.setFailure(cause);
        } finally {
            if (msg != null) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
}

Note the line EventExecutor executor = next.executor(); which obtains the EventLoop bound to the current Channel. If the calling thread is the EventLoop assigned to that Channel, the code is executed directly. Otherwise, the EventLoop places the task in an internal queue to be executed later.
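
The dispatch rule described above can be modeled with nothing but the JDK. The following is a minimal sketch, not Netty code: MiniEventLoop, callInLoop and the "inline:" / "queued:" labels are hypothetical names introduced for illustration, and a single-threaded ExecutorService stands in for the EventLoop thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for an event loop; this is not Netty code. */
final class MiniEventLoop {
    // Set by the thread factory when the single worker thread is created.
    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        t.setDaemon(true); // do not keep the JVM alive for the demo
        loopThread = t;
        return t;
    });

    /** True when the caller is already running on the loop thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Runs the write inline on the loop thread, otherwise queues it as a task. */
    CompletableFuture<String> write(String msg) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            promise.complete("inline:" + msg);
        } else {
            executor.execute(() -> promise.complete("queued:" + msg));
        }
        return promise;
    }

    /** Helper that runs a task on the loop thread, the way a handler callback would run. */
    <T> CompletableFuture<T> callInLoop(Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, executor);
    }

    public static void main(String[] args) {
        MiniEventLoop loop = new MiniEventLoop();
        // Called from the main thread: the write is queued for the loop thread.
        System.out.println(loop.write("a").join()); // prints "queued:a"
        // Called from the loop thread itself: the write runs inline.
        System.out.println(loop.callInLoop(() -> loop.write("b").join()).join()); // prints "inline:b"
    }
}
```

In the second call, join() is safe only because the promise is already complete when write() returns. If the write had been queued behind the running task instead, blocking on it from the loop thread would never return.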
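
What is the relationship between EventLoop and EventExecutor?

public interface EventLoop extends EventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}

So the rough picture is this: when write() is executed, Netty checks whether the current thread is the EventLoop assigned to the Channel. If it is, that thread performs the I/O operation directly; otherwise the task is submitted to the executor and waits to be scheduled. When await() is executed, it fetches the executing thread from the executor and runs checkDeadLock(), which checks whether the executing thread and the current thread are the same. If they are, a deadlock is detected and BlockingOperationException is thrown.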
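
To make the check concrete, here is a small JDK-only model of it. This is a sketch under stated assumptions rather than Netty's implementation: GuardedFuture, its ownerThread field and the demo class are hypothetical, and IllegalStateException stands in for BlockingOperationException. The future remembers which thread is responsible for completing it and refuses to block when await() is called from that same thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model of the deadlock check; this is not Netty code. */
final class DeadlockCheckDemo {

    /** A future that refuses to block on the thread that has to complete it. */
    static final class GuardedFuture {
        private final CompletableFuture<String> delegate = new CompletableFuture<>();
        private final Thread ownerThread; // the thread that will run the completing task

        GuardedFuture(Thread ownerThread) {
            this.ownerThread = ownerThread;
        }

        void complete(String value) {
            delegate.complete(value);
        }

        String await() {
            // Same idea as checkDeadLock(): waiting here would block the only
            // thread able to complete the future, so fail fast instead.
            if (!delegate.isDone() && Thread.currentThread() == ownerThread) {
                throw new IllegalStateException("await() called on the completing thread");
            }
            return delegate.join();
        }
    }

    /** Calls await() from inside the single loop thread and reports what happened. */
    static String awaitInsideLoop() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                GuardedFuture future = new GuardedFuture(Thread.currentThread());
                // The completing task is queued behind the task that is running now.
                loop.execute(() -> future.complete("done"));
                try {
                    return future.await();
                } catch (IllegalStateException e) {
                    return "rejected";
                }
            }, loop).join();
        } finally {
            loop.shutdown(); // the already queued task still runs before the thread exits
        }
    }

    /** Calls await() from a different thread, where blocking is safe. */
    static String awaitOutsideLoop() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            GuardedFuture future = CompletableFuture
                    .supplyAsync(() -> new GuardedFuture(Thread.currentThread()), loop)
                    .join();
            loop.execute(() -> future.complete("done"));
            return future.await();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitInsideLoop());  // prints "rejected"
        System.out.println(awaitOutsideLoop()); // prints "done"
    }
}
```

The model compares against a stored Thread, whereas the Netty source quoted earlier asks the EventExecutor through inEventLoop(). The outcome is the same in both: the blocking call is rejected only when it is made on the thread that would have to complete the future.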

How do we fix it? The official recommendation is to prefer addListener(GenericFutureListener) over await().

// BAD - NEVER DO THIS
@Override
public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {
    ChannelFuture future = ctx.channel().close();
    future.awaitUninterruptibly();
    // Perform post-closure operation
    // ...
}

// GOOD
@Override
public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {
    ChannelFuture future = ctx.channel().close();
    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            // Perform post-closure operation
            // ...
        }
    });
}

The project code was reworked as follows:

private void pushMessage(T message) {
    try {
        ChannelFuture cf = cxt.writeAndFlush(message);
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    logger.debug("send success.");
                } else {
                    // Netty only logs exceptions thrown from a listener,
                    // so report the failure directly instead of throwing.
                    Throwable cause = future.cause();
                    fireError(cause != null
                            ? new PushException(cause)
                            : new PushException("Failed to send message."));
                }
            }
        });
    } catch (LostConnectException e) {
        this.fireError(new PushException(e));
    } catch (Exception e) {
        this.fireError(new PushException(e));
    } catch (Throwable e) {
        this.fireError(new PushException(e));
    }
}

When reposting, please credit: "Reposted from linkedkeeper.com (by Zhang Songran, 张松然)". © Copyright belongs to the author.