当前位置:首页 > 开发 > 编程语言 > 网络编程 > 正文

Netty源码学习-ReadTimeoutHandler

发表于: 2013-12-26   作者:bylijinnan   来源:转载   浏览:
摘要: ReadTimeoutHandler的实现思路: 开启一个定时任务,如果在指定时间内没有接收到消息,则抛出ReadTimeoutException 这个异常的捕获,在开发中,交给跟在ReadTimeoutHandler后面的ChannelHandler,例如 private final ChannelHandler timeoutHandler = new ReadTim


ReadTimeoutHandler的实现思路:
开启一个定时任务,如果在指定时间内没有接收到消息,则抛出ReadTimeoutException
这个异常的捕获,在开发中,交给跟在ReadTimeoutHandler后面的ChannelHandler,例如

private final ChannelHandler timeoutHandler =
	new ReadTimeoutHandler(timer, READ_TIMEOUT);
private final ChannelHandler uptimeHandler =
	new UptimeClientHandler(bootstrap, timer);

public ChannelPipeline getPipeline() throws Exception {
	return Channels.pipeline(
			timeoutHandler, uptimeHandler);
}

public class UptimeClientHandler ...{
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        Throwable cause = e.getCause();
        if (cause instanceof ReadTimeoutException) {
            // The connection was OK but there was no traffic for last period.
            println("Disconnecting due to no inbound traffic");
        } else {
            cause.printStackTrace();
        }
        ctx.getChannel().close();
    }
}


ReadTimeoutHandler的关键源码:
	
	//ChannelOpen时启动定时任务:
	public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        initialize(ctx);
        ctx.sendUpstream(e);
    }
    private void initialize(ChannelHandlerContext ctx) {
        State state = state(ctx);
        state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS);
    }

	
	//每次接收到消息时更新lastReadTime
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        State state = (State) ctx.getAttachment();
        state.lastReadTime = System.currentTimeMillis();
        ctx.sendUpstream(e);
    }
	
	/*定时任务:判断指定时间内是否有消息到达
		举例:
		假设超时时间设为30秒,初始的lastReadTime=10:00:00
		那么,定时任务在10:00:30时执行run方法,如果:
		1.在10:00:18有消息到达(lastReadTime更新为10:00:18),则表示没有超时,
		继续监听下一个30秒,也就是定时任务需要在10:00:48再跑一次
		因此下一次定时任务的执行距离现在是:nextDelay=30-(30-18)=18(秒)
		2.没有消息到达,超时,抛异常
	*/
	private final class ReadTimeoutTask implements TimerTask {
        public void run(Timeout timeout) throws Exception {
            State state = (State) ctx.getAttachment();
            long currentTime = System.currentTimeMillis();
            long nextDelay = timeoutMillis - (currentTime - state.lastReadTime);
            if (nextDelay <= 0) {
                // Read timed out - set a new timeout and notify the callback.
                state.timeout =
                    timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
                try {
                    // FIXME This should be called from an I/O thread.
                    //       To be fixed in Netty 4.
                    readTimedOut(ctx);
                } catch (Throwable t) {
                    fireExceptionCaught(ctx, t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                state.timeout =
                    timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
            }
        }
    }
	
	//为什么这里会调用initialize方法?分析在下面
	public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
        if (ctx.getPipeline().isAttached()) {
            // channelOpen event has been fired already, which means
            // this.channelOpen() will not be invoked.
            // We have to initialize here instead.
            initialize(ctx);
        } else {
            // channelOpen event has not been fired yet.
            // this.channelOpen() will be invoked and initialization will occur there.
        }
    }
	
	


上面的beforeAdd方法不太好理解
先看看ClientBootstrap的connect方法:
	public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

        ChannelPipeline pipeline;
        try {
		
			//这里调用ChannelPipeline.addLast,在真正往链表里面插入之前,调用beforeAdd
            pipeline = getPipelineFactory().getPipeline();
        } catch (Exception e) {
            throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
        }
		
		//创建一个代表Client的SocketChannel,SocketChannel的构造函数里会调用:
		//        pipeline.attach(this, sink);
		//然后会fireChannelOpen
        Channel ch = getFactory().newChannel(pipeline);
		
		//...
	}
	


从正常的流程来说,是先创建创建pipeline再创建channel,
也就是beforeAdd会在channel创建之前调用,那么beforeAdd里面的判断:
if (ctx.getPipeline().isAttached()) 就不会返回true(因为此时channel还未创建,更不可能与pipeline关联了)
这样看来,只需要在channelOpen中调用initialize就可以了?
不是的,
因为还有一种情况:动态添加ChannelHandler
有可能channel已经创建(与pipeline关联了),且channelOpen已经执行过了,
那就需要在添加ReadTimeoutHandler时,执行initialize


Netty源码学习-ReadTimeoutHandler

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
背景 最忌工作中接触到Netty相关应用场景,之前看过mima的部分源码,所以最近看了Netty的部分源码和
类结构图: 不了解Executor接口原理的可以查看concurrent包中的api介绍,这里只介绍Netty中EventExe
Transport API的核心: Channel接口 类图表示Channel含有Pipeline和Config接口,pipeline上一节有所
Netty是一个基于JAVA NIO类库的异步通信框架,它的架构特点是:异步非阻塞、基于事件驱动、高性能、
先看下netty的channel对象关联关系。channel是由channelfactory来创建的,channelfactory又分为clie
感谢网友【黄亿华】投递本稿。 上一篇文章我们概要介绍了Netty的原理及结构,下面几篇文章我们开始
上一篇文章我们概要介绍了Netty的原理及结构,下面几篇文章我们开始对Netty的各个模块进行比较详细
netty里面最重要的应该是ChannelHandler,这个里面也是用户编程直接打交道的接口,也是串行于Channe
本文采用版本为Jboss Netty-3.2.4.Final,Jboss Netty示例example、几十页的user guide是快速学习的
本文为原创,转载请注明出处 netty 4源码分析-write Netty的写操作由两个步骤组成: Write:将msg存
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号