ZooKeeper源码分析之NIOServerCnxn

文章目录

  • 2021SC@SDUSC
  • 源码分析
  • 总结

2021SC@SDUSC

NIOServerCnxn继承了ServerCnxn抽象类,使用NIO来处理与客户端之间的通信。

源码分析

(1)属性

	//日志
    private static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxn.class);
    //基于NIO的ServerCnxn工厂
    private final NIOServerCnxnFactory factory;
    //面向流的连接套接字的可选择的通道
    private final SocketChannel sock;
    //Selector线程
    private final SelectorThread selectorThread;
    //表示可选择的通道在Selector中注册的标记
    private final SelectionKey sk;
    //是否初始化标志
    private boolean initialized;
    //分配四个字节缓冲区,以及赋值
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;
    //创建缓冲队列
    private final Queue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
    //会话超时
    private int sessionTimeout;
    //会话ID
    private long sessionId;

(2)构造函数,在构造函数中会对Socket通道进行相应设置,如设置TCP连接无延迟、获取客户端的IP地址并将此信息进行记录,方便后续认证,最后设置SelectionKey感兴趣的操作类型为read。

    public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, SelectorThread selectorThread) throws IOException {
        super(zk);
        //设置socket通道
        this.sock = sock;
        this.sk = sk;
        this.factory = factory;
        this.selectorThread = selectorThread;
        if (this.factory.login != null) {
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
        }
        sock.socket().setTcpNoDelay(true);
        /* 设置linger为false,以便在socket关闭时不会阻塞 */
        sock.socket().setSoLinger(false, -1);
        //获取IP地址
        InetAddress addr = ((InetSocketAddress) sock.socket().getRemoteSocketAddress()).getAddress();
        //认证信息中添加IP地址
        addAuthInfo(new Id("ip", addr.getHostAddress()));
        //设置感兴趣的操作类型
        this.sessionTimeout = factory.sessionlessCnxnTimeout;
    }

(3)核心函数——doIO(),该函数主要是进行IO处理

    void doIO(SelectionKey k) throws InterruptedException {
        try {
            if (!isSocketOpen()) {//socket未开启
                LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));

                return;
            }
            if (k.isReadable()) {//key可读
            		//若是客户端请求,此时触发读事件
                //将内容从socket写入incoming缓冲
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {//流结束异常,无法从客户端读取数据
                    handleFailedRead();
                }
                //缓冲区已经写满
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    //读取下个请求
                    
                    if (incomingBuffer == lenBuffer) {
                        //翻转缓冲区,可读
                        //解析上文中读取的报文总长度,同时为"incomingBuffer"分配len的空间供读取全部报文
                        incomingBuffer.flip();
                        //读取lenBuffer的前四个字节,当读取的是内容长度时则为true,否则为false
                        isPayload = readLength(k);
                        //清除缓冲
                        incomingBuffer.clear();
                    } else {
                        //因为在readLength中根据Len已经重新分配了incomingBuffer
                        isPayload = true;
                    }
                    if (isPayload) { //不为四个字母,为实际内容
                        //读取内容
                        readPayload();
                    } else {
                        //四个字母,为四字母的命令
                        return;
                    }
                }
            }
            //key可写
            if (k.isWritable()) {
                handleWrite(k);
                //没有初始化或者没有获取读写权限,抛出异常
                if (!initialized && !getReadInterest() && !getWriteInterest()) {
                    throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
                }
            }
        } catch (CancelledKeyException e) {
            LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));

            LOG.debug("CancelledKeyException stack trace", e);

            close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
        } catch (CloseRequestException e) {
            // expecting close to log session closure
            close();
        } catch (EndOfStreamException e) {
            LOG.warn("Unexpected exception", e);
            // expecting close to log session closure
            close(e.getReason());
        } catch (ClientCnxnLimitException e) {
            // Common case exception, print at debug level
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
            close(DisconnectReason.CLIENT_CNX_LIMIT);
        } catch (IOException e) {
            LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
            close(DisconnectReason.IO_EXCEPTION);
        }
    }
    

sendBuffer(),该函数将缓冲写入socket中

    /**
     * sendBuffer将字节缓冲区推送到传出缓冲区队列中,以便异步写入。
     */
    public void sendBuffer(ByteBuffer... buffers) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Add a buffer to outgoingBuffers, sk {} is valid: {}", sk, sk.isValid());
        }

        synchronized (outgoingBuffers) {
            for (ByteBuffer buffer : buffers) {
                outgoingBuffers.add(buffer);
            }
            outgoingBuffers.add(packetSentinel);
        }
        requestInterestOpsUpdate();
    }
  

readPayload(),首先会将socket中的实际内容写入incomingBuffer中,当读取完成后,则更新接收的包统计信息,之后再根据是否初始化了还确定读取连接请求还是直接请求,最后会清除缓存,并重新让incomingBuffer与lenBuffer相等,表示该读取过程结束。

    /** 读取请求有效负载(长度前缀后面的所有内容) */
    private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
        // 表示还未读取完socket中内容
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            // 将socket的内容读入缓冲
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            // 流结束异常,无法从客户端读取数据
            if (rc < 0) {
                handleFailedRead();
            }
        }
        // 表示已经读取完了Socket中内容
        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            incomingBuffer.flip();
            // 接收到packet
            packetReceived(4 + incomingBuffer.remaining());
            // 未初始化
            if (!initialized) {
                // 读取连接请求
                readConnectRequest();
            } else {
                // 读取请求
                readRequest();
            }
            // 清除缓冲
            lenBuffer.clear();
            // 赋值incomingBuffer,即清除incoming缓冲
            incomingBuffer = lenBuffer;
        }
    }

handleWrite(),当key为可写时调用此方法。

   void handleWrite(SelectionKey k) throws IOException {
        if (outgoingBuffers.isEmpty()) {
            return;
        }

        /*
         * 尝试获取直接内存
         */
        ByteBuffer directBuffer = NIOServerCnxnFactory.getDirectBuffer();
        if (directBuffer == null) {
            //不使用直接内存
            ByteBuffer[] bufferList = new ByteBuffer[outgoingBuffers.size()];
            // Use gathered write call. This updates the positions of the
            // byte buffers to reflect the bytes that were written out.
            sock.write(outgoingBuffers.toArray(bufferList));

            // Remove the buffers that we have sent
            ByteBuffer bb;
            while ((bb = outgoingBuffers.peek()) != null) {
                if (bb == ServerCnxnFactory.closeConn) {
                    throw new CloseRequestException("close requested", DisconnectReason.CLIENT_CLOSED_CONNECTION);
                }
                if (bb == packetSentinel) {
                    packetSent();
                }
                if (bb.remaining() > 0) {
                    break;
                }
                outgoingBuffers.remove();
            }
        } else {
            //使用直接内存
            directBuffer.clear();

            for (ByteBuffer b : outgoingBuffers) {
                if (directBuffer.remaining() < b.remaining()) {
                    /*
                     * 若directBuffer的剩余可写空间不足以容纳b的所有数据,则修改b的limit为directBuffer的剩余可写空间.
                     * 这样下面的复制代码刚好将directBuffer的可写空间写满
                     */
                    b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
                }
                /*
                 * put()会修改b和directBuffer的position值,但是我们不能修改b的position值,
                 * 因为下文需要position的值将已发送的数据移出outgoingBuffers,因此在复制结束后重置position值.
                 */
                int p = b.position();
                //将b中的数据复制到directBuffer中
                directBuffer.put(b);
                b.position(p);
                if (directBuffer.remaining() == 0) {
                    break;
                }
            }
            /*
             * Do the flip: limit becomes position, position gets set to
             * 0. This sets us up for the write.
             */
            directBuffer.flip();
            //返回发送的字节数,下文据此移除已发送的数据
            int sent = sock.write(directBuffer);

            ByteBuffer bb;

            // 将已发送的buffers从outgoingBuffers中移除
            while ((bb = outgoingBuffers.peek()) != null) {
                if (bb == ServerCnxnFactory.closeConn) {
                    throw new CloseRequestException("close requested", DisconnectReason.CLIENT_CLOSED_CONNECTION);
                }
                if (bb == packetSentinel) {
                    packetSent();
                }
                if (sent < bb.remaining()) {
                    /*
                     * 只发送了此Buffer的部分数据,因此修改position的值并退出循环
                     */
                    bb.position(bb.position() + sent);
                    break;
                }
                //该buffer的数据已经全部发送,将buffer从outgoingBuffers中移除
                sent -= bb.remaining();
                outgoingBuffers.remove();
            }
        }
    }

总结

NIOServerCnxn读取数据流程如下:

(1)NIOServerCnxn中有两个属性,一个是lenBuffer,容量为4个字节,用于读取长度信息.一个是incomingBuffer,其初始化时即为lenBuffer,但是读取长度信息后,就为incomingBuffer分配对应的空间用于读取payload
(2)根据请求报文的长度分配incomingBuffer的大小
(3)将读到的字节存放在incomingBuffer中,直至读满
(4)处理报文

你可能感兴趣的