Netty探险
源码
先看一个简单的服务器搭建的代码。
public class NettyMain {
public static void main(String[] args) throws InterruptedException {
// NioEventLoopGroup是处理I/O操作的多线程事件循环
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup sub = new NioEventLoopGroup(4);
try {
// ServerBootstrap是一个用于设置服务器的引导类。
ServerBootstrap b = new ServerBootstrap();
b.group(boss, sub)
.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel类,用于实例化新的通道以接受传入连接
.localAddress(new InetSocketAddress(8888)) // 设置服务器监听端口号
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler()); // 添加请求处理
}
});
// 绑定到端口和启动服务器
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 覆盖channelRead()事件处理程序方法
ByteBuf in = (ByteBuf) msg;
System.out.println(
"Server received: " + in.toString(StandardCharsets.UTF_8));
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// channelRead()执行完成后,关闭channel连接
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
先来看一下ServerBootstrap#channel
方法,我们会用NioServerSocketChannel.class
作为参数调用这个方法。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
//这里将factory记录到成员变量中
this.channelFactory = channelFactory;
return self();
}
}
而ReflectiveChannelFactory
实际上是通过反射的方式,将类型适配为工厂。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
'(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
}
}
其余几个方法实际上也都类似,只是赋值给成员变量而已。下面讨论一下bind
方法的实现。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
//开始监听端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
}
其中initAndRegister
的逻辑如下:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
//如果设置了服务器的handler,则优先绑定到最前面
if (handler != null) {
pipeline.addLast(handler);
}
//增加接受连接相关的流程
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}
简单看一下ServerBootstrapAcceptor
的内部逻辑。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//将一开始指定的childHandler绑定到流水线上
child.pipeline().addLast(childHandler);
//初始化channel的属性和选项
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
//将channel注册到childGroup上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}
}
我们还需要看一下channel
是如何被注册到EventLoop
上去的。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
看一下具体的注册逻辑,next()
返回的EventLoop
是NioEventLoop
的实例,查看其具体的register
逻辑。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
继续追踪下去。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
//具体的注册代码
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
//触发handlerAdded事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//除法channelRegistered事件
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
//初次注册还要触发channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
跟踪doRegister()
继续向下。
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//下面就是NIO的源码了,channel注册到selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
}
下面看一下doBind0
的逻辑。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
}
具体的绑定逻辑,层层递推可以找到:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
}
其中next()
的返回值由EventLoopGroup
中的Chooser
策略所决定。但是基本逻辑都是通过轮询的方式,故保证每个线程都能分配到非常接近的连接。
//如果线程池大小为2^i,则可以用位运算代替模运算
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
//如果线程池大小不是2^i的形式,这时候必须使用除法来计算下一个位置
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
//这里用long作为下标,保证在可预期的未来不会被耗尽
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
下面看一下NioEventLoop
的工作逻辑,我们会把任务通过它的execute
方法提交。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
//如果不处于EventLoop线程中,则需要开启一个新线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
//这里做并发控制,保证仅开启一个线程
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//执行run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
}
}
SingleThreadEventExecutor
中的run
方法已经被NioEventLoop
所覆盖,看一下具体实现:
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
//strategy用来存储有多少连接当前可用
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
//计算下一个定时任务的触发时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
//-1表示无穷
curDeadlineNanos = NONE; // nothing on the calendar
}
//设置下一次的醒来事件
nextWakeupNanos.set(curDeadlineNanos);
try {
//如果没活干了,直接select的阻塞模式,直到下一次任务发生
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
//select执行次数
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
//ioRatio表示用于IO操作的线程时间的百分比,默认50%,可以自己手动设置
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
//所有时间都应该用来IO
try {
if (strategy > 0) {
//处理IO事件
processSelectedKeys();
}
} finally {
//最后执行全部任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
//处理IO事件
processSelectedKeys();
} finally {
//通过IO事件费时,可以估算出留给其它任务的时间
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//如果IO比例为0,那就不用考虑IO操作了
ranTasks = runAllTasks(0);
}
//如果存在读写操作或者执行了一些任务
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
}
这里出现了SelectStrategy
这个新类型,它用于计算下一步操作该做什么。它的默认实现类如下
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
//如果还有额外的任务,就尝试去获取所有可用的连接,否则返回SelectStrategy.SELECT,表示使用阻塞版本的SELECT
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
看一下具体是如何处理可用连接的:
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//处理key
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
//是否是连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//读或接受连接事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
其中实现在NioByteUnsafe
中
protected class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
//调用流水线的channelRead消费读到的数据
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
当然如果你是ServerChannel
的话,逻辑在NioMessageUnsafe
中。
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
private final class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//接受连接
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
}
下面讲一下客户端连接是如何绑定到从线程组的。ServerBootstrapAcceptor
处理客户端连接的时候,会调用childGroup.register(child)
来注册客户端连接。其中childGroup
一般实现类都是MultithreadEventLoopGroup
。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
//选择下一个线程来负责这个连接
return next().register(channel);
}
}
其next()
返回的线程一般实现类都是NioEventLoop
。它的register
实现如下:
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
具体的注册代码处于AbstractUnsafe
中。
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//一般走这里,因为现在线程是主线程池中
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
其中doRegister
又会调用回NioSocketChannel
中。具体逻辑如下:
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//这里将自己注册到线程绑定的selector上。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
}
一些总结
Netty的网络模型是主从Reactor多线程模型,即主线程池负责处理连接请求,而从线程池负责处理IO请求。业务请求应该放在业务线程池中处理。
我们一般使用的线程池都是NioEventLoopGroup
,它里面的每个线程都是NioEventLoop
的实例。NioEventLoopGroup
中默认通过轮询的方式分配下一个线程,你可以通过传入EventExecutorChooserFactory
的工厂实例,来提供自定义的选择器策略。
而由于NioEventLoop
继承了SingleThreadEventLoop
,而后者是惰性创建线程的,因此NioEventLoopGroup
中的线程也是惰性创建的。
每个NioEventLoop
都负责了管理多个连接的生命周期,每个NioEventLoop
都维护了一个自己的Selector
来快速确定哪些连接当前读写可用。
如果NioEventLoop
负责处理的连接事件,那么它对应的handler为ServerBootstrapAcceptor
,否则则是用户自定义的handler。
你可以通过NioEventLoopGroup#setIoRation
来设置线程池创建的线程,它们的IO占比,值域为$0$到$100$之间,默认值为$50$。注意即使你设置值为$0$或$100$,线程依旧会处理任务和IO操作。其主要是在处理完IO操作后,会根据这个操作耗时计算分配给任务的时间数。