Netty源码解析 -- 客户端启动过程

上一篇文章分享了Netty服务端启动过程,本文继续分享Netty客户端启动过程。
源码分析基于Netty 4.1

Connect

客户端启动过程比较简单,主要是Connect操作。
Netty客户端启动引导类是Bootstrap,同样继承了AbstractBootstrap,它只有一个EventLoopGroup,下文称为ConnectGroup。

Bootstrap#connect -> doResolveAndConnect -> doResolveAndConnect0

private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                           final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        // #1
        final AddressResolver resolver = this.resolver.getResolver(eventLoop);
        
        ...
        
        final Future resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // #2
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        ...
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

#1
AddressResolver负责解析SocketAddress。它可以做一些地址转换工作。如Netty提供了RoundRobinInetAddressResolver,可以对下游服务集群进行轮询调用。
Bootstrap#resolver是一个AddressResolverGroup,它负责构造AddressResolver,默认使用DefaultAddressResolverGroup。
#2 调用doConnect,执行Connect操作。

doConnect -> AbstractChannel#connect -> DefaultChannelPipeline#connect -> HeadContext#connect -> AbstractNioUnsafe#connect
(这里涉及DefaultChannelPipeline的内容后续有文章解析)

public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...

    try {
        ...

        boolean wasActive = isActive();
        // #1
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // #2
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            // #3
            promise.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

#1 调用SocketChannel#connect,如果是非阻塞Socket调用,该方法返回false。
#2 给EventLoop添加一个定时任务,如果连接超时则关闭Channel。
Netty中也提供了ReadTimeoutHandler处理读超时的场景。
#3 给promise添加一个回调方法,connect操作完成时,如果connect操作被取消了,则关闭Channel。

NioSocketChannel#doConnect

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    ...

    boolean success = false;
    try {
        // #1
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        // #2
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

#1 调用(jvm)SocketChannel#connect方法,同样,非阻塞SocketChannel调用该方法,返回false。
#2 关注OP_CONNECT事件。

EventLoop中负责处理OP_CONNECT事件(EventLoop后面有文章解析),调用AbstractNioUnsafe#finishConnect完成连接操作。

public final void finishConnect() {
    ...
    try {
        boolean wasActive = isActive();
        // #1
        doFinishConnect();
        // #2
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {
        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    } finally {
        // #3
        if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false);
        }
        connectPromise = null;
    }
}

#1 doFinishConnect方法由子类NioSocketChannel实现,就是调用(jvm)SocketChannel#finishConnect()方法
#2 设置connectPromise处理成功
#3 取消connectTimeoutFuture延迟任务

注册关注Read事件
AbstractNioUnsafe#fulfillConnectPromise -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
前面解析服务端启动过程时说过,HeadContext#channelActive会调用readIfIsAutoRead方法,判断是否开启autoRead,开启则自动触发read事件处理方法。
HeadContext#readIfIsAutoRead -> AbstractChannel#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
AbstractNioChannel#doBeginRead在解析服务端启动过程时也说过,这里会注册关注Read事件。

客户端启动完成后,客户端和服务端就可以开始进行Read/Write操作了。

如果您觉得本文不错,欢迎关注我的微信公众号,您的关注是我坚持的动力!

你可能感兴趣的