Netty源码解析 -- 事件循环机制实现原理

本文主要分享Netty中事件循环机制的实现。
源码分析基于Netty 4.1

EventLoop

前面分享服务端和客户端启动过程的文章中说过,Netty通过事件循环机制(EventLoop)处理IO事件和异步任务,简单来说,就是通过一个死循环,不断处理当前已发生的IO事件和待处理的异步任务。示例如下

while(true) {
    process(selector.select());

    process(getTask());
}

这种事件循环机制也是一种常用的IO事件处理机制,包括Redis,Mysql都使用了类似的机制。

关于异步任务,前面文章说过,EventLoop实现了(jvm)Executor的接口,execute方法可以提供异步任务。
register,bind,connect等操作,都会提交一个任务给EventLoop处理。如

if (eventLoop.inEventLoop()) {
    register0(promise);    
} else {
    eventLoop.execute(new Runnable() {    
        public void run() {
            register0(promise);
        }
    });
}

下面看一下Netty中事件循环机制相关的类。

EventExecutor,事件执行器,负责处理事件。
EventExecutorGroup维护了一个EventExecutor链表,它继承了ScheduledExecutorService,execute方法通过next方法选择一个EventExecutor,并调用EventLoop#execute处理事件。
(EventExecutor继承了EventExecutorGroup,可以看做一个特殊的EventExecutorGroup,其execute方法可以提交一个任务任务)

EventLoop,事件循环器,继承了EventExecutor,通过循环不断处理注册于其上的Channel的IO事件。
EventLoopGroup接口则继承了EventExecutorGroup,负责调度EventLoop。

SingleThreadEventExecutor实现了EventExecutor,它会创建一个新线程,并在该线程上处理事件,可以理解为单线程处理器。
MultithreadEventExecutorGroup实现EventExecutorGroup,可以理解为多线程处理器(实际上是维护了多个EventExecutor,一个EventExecutor可以理解为一个线程),newChild方法构造具体的EventExecutor。
MultithreadEventExecutorGroup可以配置EventExecutor数量,即线程数量。
EventExecutorChooserFactory.EventExecutorChooser负责选择一个EventExecutor执行实际操作。

NioEventLoop继承了SingleThreadEventExecutor,负责处理NIO事件。所以,一个NioEventLoop对象可以看做是一个线程。
NioEventLoop也实现了EventLoop接口,它实现了事件循环机制,是Netty核心类。

MultithreadEventLoopGroup继承了MultithreadEventExecutorGroup,并实现了EventLoopGroup,其newChild方法构造具体的EventLoop。
NioEventLoopGroup#newChild会构建NioEventLoop。

EventLoop各实现类关系如下
Netty源码解析 -- 事件循环机制实现原理_第1张图片

启动

SingleThreadEventExecutor关键字段

private final Queue taskQueue;    // 待处理异步任务
private volatile Thread thread;                // EventLoop执行线程,即SingleThreadEventExecutor创建的新线程
private final Executor executor;            // java.util.concurrent.Executor,负责创建线程

当我们通过execute方法提交任务时,如果还没有创建执行新线程,会通过SingleThreadEventExecutor#executor一个新线程,并在新线程中调用run方法(run方法由子类实现,负责实现事件循环机制,新线程是EventLoop真正执行线程)。

SingleThreadEventExecutor#execute

public void execute(Runnable task) {
    ...

    boolean inEventLoop = inEventLoop();
    // #1
    addTask(task);
    // #2
    if (!inEventLoop) {
        startThread();
        // #3
        if (isShutdown()) {
            ...
        }
    }
    // #4
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

#1 添加任务到待处理列表
#2
inEventLoop方法,判断当前线程是否为EventLoop执行线程
若当前线程非EventLoop执行线程,调用startThread方法启动一个新的线程,执行run方法。
这里可以理解为启动EventLoop。
#3 如果当前EventLoop已关闭,拒绝任务
#4 若当前EventLoop线程阻塞正等待IO事件(Selector#select方法),调用wakeup方法唤醒线程执行该新增任务

循环机制

NioEventLoop#run方法负责实现NIO事件处理机制。

protected void run() {
    int selectCnt = 0;
    // #1
    for (;;) {

            int strategy;
            
                // #2
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    // #3
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        // #4
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // #5
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
                ...
            
            // #6
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            // #7
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
            // #8
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        
        
            // #9
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        
    }
}

为了版面整洁,这里删除了异常处理代码。
#1 可以看到,这里通过一个死循环不断处理IO事件和异步任务。
#2 如果当前存在待处理的任务,调用selector.selectNow(),这时会跳出switch语句,往下处理事件和任务,否则返回SelectStrategy.SELECT。
#3 curDeadlineNanos,计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),没有任务则返回-1。
更新nextWakeupNanos为阻塞时间。
由于频繁调用(jvm)Selector.wakeup会造成性能消耗,NioEventLoop维护了一个唤醒标识nextWakeupNanos。nextWakeupNanos有三种值
NONE -- 执行线程被阻塞
AWAKE -- 执行线程未阻塞
其他值 -- 执行线程被超时阻塞,在指定的时间后唤醒
NioEventLoop#wakeup方法中,只有nextWakeupNanos.getAndSet(AWAKE) != AWAKE成功才调用selector.wakeup()方法。
#4
这时如果还没有任务加入,则执行select,阻塞线程。select方法返回结果作为新的strategy。
#5
lazySet方法,设置值之后其他线程在短期内还是可能读到旧值
这里将nextWakeupNanos设置为AWAKE,主要是减少wakeup方法中不必要的wakeup操作。
所以使用lazySet方法也没有问题。
#6 selectCnt增加
旧版本的Java NIO在Linux Epoll实现上存在bug,(jvm)Selector.select方法可能在没有任何就绪事件的情况下返回,导致CPU空转,利用率飙升到100%。
于是,Netty计算select方法重复调用次数selectCnt,并在selectCnt大于SELECTOR_AUTO_REBUILD_THRESHOLD配置(默认512)时,重建selector,从而规避该问题。
幸好在JDK6_6u4,JDK7_b12已修复该Bug。
#7 processSelectedKeys方法处理IO事件,runAllTasks方法处理任务。
ioRatio表示执行IO事件所占CPU时间百分比,默认50,
ioTime * (100 - ioRatio) / ioRatio,通过ioTime,ioRatio计算处理任务的CPU时间。
#8 如果执行了任务或者select方法返回有效值,直接重置selectCnt。
unexpectedSelectorWakeup方法中会在selectCnt大于SELECTOR_AUTO_REBUILD_THRESHOLD时重建selector。
#9 如果是正在关闭状态,则要关闭所有的Channel

IO事件

下面看一下Eventloop中如何处理IO事件。
NioEventLoop关键字段

Selector unwrappedSelector;                // JVM中的Selector
Selector selector;                        // 优化后的SelectedSelectionKeySetSelector
SelectedSelectionKeySet selectedKeys;    // 对(jvm)Selector#selectedKeys进行优化

SelectedSelectionKeySetSelector每次调用select前都清除SelectedSelectionKeySet
SelectedSelectionKeySet使用数组代替原Selector的中的HashSet,提高性能。数组默认大小为1024,不够用时扩展为原大小的2倍。

NioEventLoop#构造方法 -> NioEventLoop#openSelector

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        // #1
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    ...

    final Class selectorImplClass = (Class) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction() {
        
        public Object run() {
            try {
                // #2
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                ...

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } ...
        }
    });

    ...
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    // #3
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
} 
 

#1 通过(jvm)SelectorProvider打开一个Selector
#2 构造了selectedKeySet,并通过反射将该对象设置到Selector的selectedKeys,publicSelectedKeys属性中,这样Selector监听到的事件就会存储到selectedKeySet。
#3 构造了SelectedSelectionKeySetSelector对象

NioEventLoop#select负责阻塞线程,等待IO事件

private int select(long deadlineNanos) throws IOException {
    // #1
    if (deadlineNanos == NONE) {
        return selector.select();
    }

    // #2
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

#1 一直阻塞,知道发生IO事件或加入了新任务
#2 计算阻塞时间,在原阻塞时间加上995微秒后转化为毫秒。
如果原阻塞时间在5微秒内,就不阻塞了。

IO事件的处理流程为
NioEventLoop#processSelectedKeys -> (没有禁用SelectedSelectionKeySet)processSelectedKeysOptimized -> processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    ...

    try {
        int readyOps = k.readyOps();
        // #1
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // #2
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // #3
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

#1 处理OP_CONNECT
移除关注事件OP_CONNECT,否则Selector.select(..)将不断返回
前面分享客户端启动过程的文章说过了,这里会调用AbstractNioUnsafe#finishConnect,完成客户端Connect操作,可回顾《客户端启动过程解析》。
#2 先处理OP_WRITE事件,能够尽早写入数据释放内存,这里涉及flush操作,后面有文章解析。
#3 处理OP_READ或OP_ACCEPT事件。
对于ServerChannel,这里会调用NioMessageUnsafe#read,处理OP_ACCEPT事件,可回顾《客户端启动过程解析》。
对于SocketChannel,这里会调用NioByteUnsafe#read,进行读写操作,后面有文章解析。

异步任务

下面看一下Eventloop中如何处理异步任务。
run方法#4步骤 -> SingleThreadEventExecutor#runAllTasks

protected boolean runAllTasks(long timeoutNanos) {
    // #1
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // #2
        safeExecute(task);

        runTasks ++;

        // #3
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        // #4
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // #5
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

#1 AbstractScheduledEventExecutor#scheduledTaskQueue中存放的是定时任务,
SingleThreadEventExecutor#taskQueue中存放的是待处理的任务。
fetchFromScheduledTaskQueue方法会获取已到期的定时任务,移动到SingleThreadEventExecutor#taskQueue。
#2 执行获取的任务
#3 每个64个任务检查一次是否超时,因为nanoTime()方法也是一个相对昂贵的操作。
#4 取下一个任务,继续处理
#5 预留的扩展方法。

NioEventLoop在Netty 4.1版本被优化,代码做了较大改动,删除了原来的wakeup标志,改用nextWakeupNanos,代码更清晰。
请参考 -- Clean up NioEventLoop

Netty是由事件驱动的,服务端register,bind,客户端connect等操作都是提交异步任务给EventLoop处理的
,而Accept,Read/Writ,Connect等IO事件都都需要EventLoop的处理。
大家可以结合前面分析服务端和客户端启动过程的文章,理解EventLoop是如何驱动Netty工作的。

如果您觉得本文不错,欢迎关注我的微信公众号,您的关注是我坚持的动力!