当前位置:首页 > 开发 > 编程语言 > Java IO > 正文

Netty源码学习-Java-NIO-Reactor

发表于: 2013-12-19   作者:bylijinnan   来源:转载   浏览:
摘要: Netty里面采用了NIO-based Reactor Pattern 了解这个模式对学习Netty非常有帮助 参考以下两篇文章: http://jeewanthad.blogspot.com/2013/02/reactor-pattern-explained-part-1.html http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Netty里面采用了NIO-based Reactor Pattern
了解这个模式对学习Netty非常有帮助

参考以下两篇文章:
http://jeewanthad.blogspot.com/2013/02/reactor-pattern-explained-part-1.html
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

本文所贴的代码来自第一篇文章,在注释部分加入了我自己的理解
完整代码可以到我的github上下载,仅供参考:
https://github.com/bylijinnan/nettyLearn/tree/master/ljn-netty3-learn/src/main/java/com/ljn/reactor

package com.ljn.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/*

单线程的实现
Server端用一个Selector利用一个线程(在main方法里面start)来响应所有请求
1.当ACCEPT事件就绪,Acceptor被选中,执行它的run方法:创建一个Handler(例如为handlerA),并将Handler的interestOps初始为READ
2.当READ事件就绪,handlerA被选中,执行它的run方法:它根据自身的当前状态,来执行读或写操作
因此,每一个Client连接过来,Server就创建一个Handler,但都所有操作都在一个线程里面

Selection Key   Channel                 Handler     Interested Operation
------------------------------------------------------------------------
SelectionKey 0  ServerSocketChannel     Acceptor    Accept
SelectionKey 1  SocketChannel 1         Handler 1   Read and Write
SelectionKey 2  SocketChannel 2         Handler 2   Read and Write
SelectionKey 3  SocketChannel 3         Handler 3   Read and Write

如果采用多个selector,那就是所谓的“Multiple Reactor Threads”,大体思路如下:

Selector[] selectors; // also create threads
int next = 0;
class Acceptor { // ... 
     public synchronized void run() { ...
         Socket connection = serverSocket.accept();
         if (connection != null)
             new Handler(selectors[next], connection);
         if (++next == selectors.length) next = 0;
     }
}

 */
public class Reactor implements Runnable {
 
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    final boolean isWithThreadPool;
 
    /*Reactor的主要工作:
     * 1.给ServerSocketChannel设置一个Acceptor,接收请求
     * 2.给每一个一个SocketChannel(代表一个Client)关联一个Handler
     * 要注意其实Acceptor也是一个Handler(只是与它关联的channel是ServerSocketChannel而不是SocketChannel)
     */
    Reactor(int port, boolean isWithThreadPool) throws IOException {
 
        this.isWithThreadPool = isWithThreadPool;
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey0.attach(new Acceptor());
    }
 
 
    public void run() {
        System.out.println("Server listening to port: " + serverSocketChannel.socket().getLocalPort());
        try {
            while (!Thread.interrupted()) {
                int readySelectionKeyCount = selector.select();
                if (readySelectionKeyCount == 0) {
                    continue;
                }
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    dispatch((SelectionKey) (it.next()));
                }
                
                //不会自动remove,因此要手动清;下次事件到来会自动添加
                selected.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
    
    //从SelectionKey中取出Handler并执行Handler的run方法,没有创建新线程
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        } 
    }
    
    //主要工作是为每一个连接成功后返回的SocketChannel关联一个Handler,详见Handler的构造函数
    class Acceptor implements Runnable {
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    if (isWithThreadPool)
                        new HandlerWithThreadPool(selector, socketChannel);
                    else
                        new Handler(selector, socketChannel);
                }
                System.out.println("Connection Accepted by Reactor2");
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws IOException{
        
        int port = 9900;
        boolean withThreadPool = false;
        Reactor reactor  = new Reactor(port, withThreadPool);
        new Thread(reactor).start();
    }
}


package com.ljn.reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/*
 * 单线程版本的Handler
 */
public class Handler implements Runnable {
 
    final SocketChannel socketChannel;
    final SelectionKey selectionKey;
    ByteBuffer input = ByteBuffer.allocate(1024);
    static final int READING = 0, SENDING = 1;
    
    //初始状态
    int state = READING;
    String clientName = "";
 
    //在handler里面设置interestOps,而且这个interestOps是会随着事件的进行而改变的
    Handler(Selector selector, SocketChannel c) throws IOException {
        socketChannel = c;
        c.configureBlocking(false);
        selectionKey = socketChannel.register(selector, 0);
        
        /*
        handler作为SellectionKey的attachment。这样,handler就与SelectionKey也就是interestOps对应起来了
        反过来说,当interestOps发生、SelectionKey被选中时,就能从SelectionKey中取得handler
        */
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
 
    //在Reactor的dispatch方法里面被调用,但是直接的方法调用,没有创建新线程
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
 
    void read() throws IOException {
        int readCount = socketChannel.read(input);
        if (readCount > 0) {
            readProcess(readCount);
        }
        state = SENDING;
        // Interested in writing
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }
 
    /**
     * Processing of the read message. This only prints the message to stdOut.
     * 非IO操作(业务逻辑,实际应用中可能会非常耗时):将Client发过来的信息(clientName)转成字符串形式
     * @param readCount
     */
    synchronized void readProcess(int readCount) {
        StringBuilder sb = new StringBuilder();
        input.flip();   //from writing mode to reading mode
        byte[] subStringBytes = new byte[readCount];
        byte[] array = input.array();
        System.arraycopy(array, 0, subStringBytes, 0, readCount);
        // Assuming ASCII (bad assumption but simplifies the example)
        sb.append(new String(subStringBytes));
        input.clear();
        clientName = sb.toString().trim();
    }
 
    void send() throws IOException {
        System.out.println("Saying hello to " + clientName);
        ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes());
        socketChannel.write(output);
        selectionKey.interestOps(SelectionKey.OP_READ);
        state = READING;
    }
}



	package com.ljn.reactor;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/*
 * 多线程版本的Handler
 * 思路就是把耗时的操作(非IO操作)放到其他线程里面跑,
 * 使得Handler只专注与Channel之间的IO操作;
 * Handler快速地从Channel中读或写,可以使Channel及时地、更快地响应其他请求
 * 耗时的操作完成后,产生一个事件(改变state),再“通知”(由Handler轮询这个状态是否有改变)
 * Handler执行Channel的读写操作
 */
public class HandlerWithThreadPool extends Handler {
 
    static ExecutorService pool = Executors.newFixedThreadPool(2);
    static final int PROCESSING = 2;
 
    public HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException {
        super(sel, c);
    }
 
    //Handler从SocketChannel中读到数据后,把“数据的处理”这个工作扔到线程池里面执行
    void read() throws IOException {
        int readCount = socketChannel.read(input);
        if (readCount > 0) {
            state = PROCESSING;
            
            //execute是非阻塞的,所以要新增一个state(PROCESSING),表示数据在处理当中,Handler还不能执行send操作
            pool.execute(new Processer(readCount)); 
        }
        //We are interested in writing back to the client soon after read processing is done.
        //这时候虽然设置了OP_WRITE,但下一次本Handler被选中时不会执行send()方法,因为state=PROCESSING
        //或者可以把这个设置放到Processer里面,等process完成后再设为OP_WRITE
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }
 
    //Start processing in a new Processer Thread and Hand off to the reactor thread.
    synchronized void processAndHandOff(int readCount) {
        readProcess(readCount);
        //Read processing done. Now the server is ready to send a message to the client.
        state = SENDING;
    }
 
    class Processer implements Runnable {
        int readCount;
        Processer(int readCount) {
            this.readCount =  readCount;
        }
        public void run() {
            processAndHandOff(readCount);
        }
    }
}

Netty源码学习-Java-NIO-Reactor

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

我来说两句
评论内容:
验  证  码:
 
(网友评论仅供其表达个人看法,并不表明本站同意其观点或证实其描述。)
评论列表
已有 0 条评论(查看更多评论)
编辑推荐
背景 最忌工作中接触到Netty相关应用场景,之前看过mima的部分源码,所以最近看了Netty的部分源码和
类结构图: 不了解Executor接口原理的可以查看concurrent包中的api介绍,这里只介绍Netty中EventExe
Transport API的核心: Channel接口 类图表示Channel含有Pipeline和Config接口,pipeline上一节有所
Netty是一个基于JAVA NIO类库的异步通信框架,它的架构特点是:异步非阻塞、基于事件驱动、高性能、
先看下netty的channel对象关联关系。channel是由channelfactory来创建的,channelfactory又分为clie
感谢网友【黄亿华】投递本稿。 上一篇文章我们概要介绍了Netty的原理及结构,下面几篇文章我们开始
上一篇文章我们概要介绍了Netty的原理及结构,下面几篇文章我们开始对Netty的各个模块进行比较详细
netty里面最重要的应该是ChannelHandler,这个里面也是用户编程直接打交道的接口,也是串行于Channe
本文采用版本为Jboss Netty-3.2.4.Final,Jboss Netty示例example、几十页的user guide是快速学习的
本文为原创,转载请注明出处 netty 4源码分析-write Netty的写操作由两个步骤组成: Write:将msg存
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号