当前位置:首页 > 开发 > 系统架构 > 架构 > 正文

java ipc 实例

发表于: 2014-05-21   作者:blackproof   来源:转载   浏览次数:
摘要: java ipc实例,仿照hadoop ipc写的实例 1.用接口规定ipc协议的方法 2.client端用动态代理作调用远程ipc接口方法 3.server端用反射,执行ipc接口方法,并返回给client端接口方法返回值   hadoop ipc的另一个特点是server端用三个角色,Listener,Handler,Responser。server聚合这三个角色 Lis

java ipc实例,仿照hadoop ipc写的实例

1.用接口规定ipc协议的方法

2.client端用动态代理作调用远程ipc接口方法

3.server端用反射,执行ipc接口方法,并返回给client端接口方法返回值

 

hadoop ipc的另一个特点是server端用三个角色,Listener,Handler,Responser。server聚合这三个角色

Listener:nio socket获取请求CALL对象,放入队列中

Handler:从队列中获取CALL对象,执行ipc接口方法

Responser:被Handler调用,用nio socket返回接口方法返回值

 

简单实例:(实例来自github)

1.定义协议

public interface Echo {
	public String who() throws IOException;;

	public void from(String name) throws IOException;;
}

 2.定义代理

/*Invocation封装方法名和参数*/
		public Object invoke(Object proxy, Method method, Object[] args)
				throws Throwable {
			final boolean logDebug = LOG.isDebugEnabled();
			long startTime = 0;
			if (logDebug) {
				startTime = System.currentTimeMillis();
			}
			//调用Client call方法
			Invocation value = client.call(new Invocation(iface, method, args),
					remoteId);//调用远程的当前方法,阻塞直到server返回值
			if (logDebug) {
				long callTime = System.currentTimeMillis() - startTime;
				LOG.debug("Call: " + method.getName() + "() " + callTime);
			}
			return value.getResult();
		}

 

public Invocation call(Invocation invoked, ConnectionId remoteId)
			throws InterruptedException, IOException {
		Call call = new Call(invoked);    //将传入的数据封装成call对象 Serializable接口
		//已经向服务器端 RPCHeader ConnectionHeader验证
		Connection connection = getConnection(remoteId, call); //获得一个连接  
		connection.sendParam(call); // 向服务端发送Call对象
		boolean interrupted = false;
		synchronized (call) {
			while (!call.done) {
				try {
					call.wait(); //等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程  
				} catch (InterruptedException ie) {
					// save the fact that we were interrupted
					interrupted = true;
				}
			}

			if (interrupted) {
				//因中断异常而终止,设置标志interrupted为true 
				Thread.currentThread().interrupt();
			}

			if (call.error != null) {
				if (call.error instanceof RemoteException) {
					call.error.fillInStackTrace();
					throw call.error;
				} else {
					/* local exception use the connection because it will
					 * reflect an ip change, unlike the remoteId
					 */
					throw wrapException(connection.getRemoteAddress(),
							call.error);
				}
			} else {
				return call.value; //返回结果数据  
			}
		}
	}

 

3.server端反射ipc接口方法

private class Handler extends Thread {
		public Handler(int instanceNumber) {
			this.setDaemon(true);
			this.setName("IPC Server handler " + instanceNumber + " on " + port);
		}

		@Override
		public void run() {
			LOG.info(getName() + ": starting");
			SERVER.set(Server.this);
			//创建大小为10240个字节的响应缓冲区
			ByteArrayOutputStream buf = new ByteArrayOutputStream(
					INITIAL_RESP_BUF_SIZE);
			while (running) {
				try {
					/** pop the queue; maybe blocked here */
					//获取一个远程调用请求 Server.Call
					final Call call = callQueue.take();       //弹出call,可能会阻塞 

					if (LOG.isDebugEnabled())
						LOG.debug(getName() + ": has #" + call.id + " from "
								+ call.connection);

					String errorClass = null;
					String error = null;
					Serializable value = null;
					
					/** process the current call. */
					//处理当前Call
					CurCall.set(call);
					try {
						//调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中 
						value = call(call.connection.protocol, call.param,
								call.timestamp);  //Invocation对象
					} catch (Throwable e) {
						LOG.info(getName() + ", call " + call + ": error: " + e, e);
						errorClass = e.getClass().getName();
						error = e.getMessage();
					}
					
					CurCall.set(null);
					synchronized (call.connection.responseQueue) {
						/**
						 * setupResponse() needs to be sync'ed together with
						 * responder.doResponse() since setupResponse may use
						 * SASL to encrypt response data and SASL enforces its
						 * own message ordering.
						 */
						//将返回结果序列化到Call的成员变量response中
						setupResponse(
								buf,
								call,
								(error == null) ? Status.SUCCESS : Status.ERROR,
								value, errorClass, error);
						/* Discard the large buf and reset it back to smaller size to freeup heap*/
						//丢弃大的buf 重设到更小的容量 释放内存
						if (buf.size() > maxRespSize) {
							LOG.warn("Large response size " + buf.size()
									+ " for call " + call.toString());
							buf = new ByteArrayOutputStream(
									INITIAL_RESP_BUF_SIZE);
						}
						//给客户端响应请求
						responder.doRespond(call);//Responder在Server构造器初始化
					}
				} catch (InterruptedException e) {
					if (running) { // unexpected -- log it
						LOG.info(getName() + " caught: " + e);
					}
				} catch (Exception e) {
					LOG.info(getName() + " caught: " + e);
				}
			}
			LOG.info(getName() + ": exiting");
		}
	}

 

 

public Serializable call(Class<?> iface, Serializable param,
				long receivedTime) throws IOException {
			try {
				Invocation call = (Invocation) param; //调用参数 Invocationd对象包含方法名称 形式参数列表和实际参数列表
				if (verbose)
					log("Call: " + call);
				//从实例缓存中按照接口寻找实例对象
				Object instance = INSTANCE_CACHE.get(iface);
				if (instance == null)
					throw new IOException("interface `" + iface	+ "` not inscribe.");
				//通过Class对象获取Method对象
				Method method = iface.getMethod(call.getMethodName(),
						call.getParameterClasses());
				//取消Java语言访问权限检查
				method.setAccessible(true);

				long startTime = System.currentTimeMillis();
				//调用Method对象的invoke方法
				Object value = method.invoke(instance, call.getParameters());
				int processingTime = (int) (System.currentTimeMillis() - startTime);
				int qTime = (int) (startTime - receivedTime);
				if (LOG.isDebugEnabled()) {
					LOG.debug("Served: " + call.getMethodName()
							+ " queueTime= " + qTime + " procesingTime= "
							+ processingTime);
				}
				if (verbose)
					log("Return: " + value);

				call.setResult(value); //向Invocation对象设置结果
				return call;
			} catch (InvocationTargetException e) {
				Throwable target = e.getTargetException();
				if (target instanceof IOException) {
					throw (IOException) target;
				} else {
					IOException ioe = new IOException(target.toString());
					ioe.setStackTrace(target.getStackTrace());
					throw ioe;
				}
			} catch (Throwable e) {
				if (!(e instanceof IOException)) {
					LOG.error("Unexpected throwable object ", e);
				}
				IOException ioe = new IOException(e.toString());
				ioe.setStackTrace(e.getStackTrace());
				throw ioe;
			}
		}
	}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

java ipc 实例

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
1.建立Activity和Service的IPC之前 在上一篇 Binder机制,从Java到C (1. IPC in Application Remot
转载请标注:张小燕:http://www.cnblogs.com/zhangxinyan 1. Application 中的 service 我们知道An
1.一次IPC通信過程的幾個步驟 一次通信过程简单的说有下面5个步骤,第一眼看上去,肯定不知道什么玩
基本类型实例 //1.定义一个一维数组,先声明,在分配空间 int []number;//生命,没有初始化,number=
基本类型实例 //1.定义一个一维数组,先声明,在分配空间 int []number;//生命,没有初始化,number=
《JAVA与模式》之责任链模式 在阎宏博士的《JAVA与模式》一书中开头是这样描述责任链(Chain of Res
继承是OOP的三大特点之一. 这一节主要做一下继承的笔记. 1. 继承的概念及使用 在Java中,通过继承可以
Java代码优化过程的实例介绍 通过笔者经历的一个项目实例,本文介绍了 Java 代码优化的过程,总结了
上一回,我们使用官方的介绍,完成了Highcharts的入门 1. 引入Highcharts 依赖的JS 2. 新建DIV容器
上一回,我们实现了从后台传递数据,在图表中展示,而图表的大部分配置都实在JS中控制的, 个人有个
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号