当前位置:首页 > 开发 > 编程语言 > 消息中间件 > 正文

编码实现MQ连接池实现JMS消息发送连接管理

发表于: 2011-12-29   作者:cuisuqiang   来源:转载   浏览次数:
摘要: 今天来说一下在使用到MQ时如果使用MQ的链接池。之前我也是没有注意到MQ也是有连接池的,后来因为系统之前实现每次创建和关闭链接消耗资源、宕机频繁,所以领导要求解决我才接触到。我在网上看到的关于JMS的讲解还挺多,但是对于MQ连接池的讲解时大家都是讲如何在spring中配置连接池。首先采用spring配置后原系统加密配置的密码就成明文了,另外如果要改成spring发送那改动就大了。如果在不使用
  1. 今天来说一下在使用到MQ时如果使用MQ的链接池。之前我也是没有注意到MQ也是有连接池的,后来因为系统之前实现每次创建和关闭链接消耗资源、宕机频繁,所以领导要求解决我才接触到。
    我在网上看到的关于JMS的讲解还挺多,但是对于MQ连接池的讲解时大家都是讲如何在spring中配置连接池。首先采用spring配置后原系统加密配置的密码就成明文了,另外如果要改成spring发送那改动就大了。如果在不使用spring,不大改动代码的情况下完成采用MQ连接池来获得链接呢?
    现在哥已经养成了好的习惯,每次发博客都会将源码和工程直接发布的,不会让你看了半天连个例子也没有的。
  2. 首先要创建工程加入JAR包,要加的包都是工程内,哥不多说了,这些包是必须的。至于activemq-pool-5.2.0.jar,你一定要找到加进去,你懂的。
    你还要到官方下载一个MQ服务器,我使用的是5.2,下载后运行起来以备使用。
  3.  第二步我们要实现如何获得链接的。这里我做了两个实现,用于更直观的了解MQ连接池。
    其中连接池工厂的活跃数我们设置为1,方便测试。这个活跃数实际上是会话的数量,而不是数据库连接池一样的链接数量。
    package com.mq.service;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.pool.PooledConnectionFactory;
    /**
     * 链接工厂管理类
     * 自己工厂定义成了单例模式,连接池是静态块进行初始化,具体实现自己看着办
     */
    public class MQPooledConnectionFactory {
    	private static ActiveMQConnectionFactory connectionFactory;
    	/**
    	 * 获得自己创建的链接工厂,这个工厂只初始化一次
    	 */
    	public static ActiveMQConnectionFactory getMyActiveMQConnectionFactory() {
    		if (null == connectionFactory) {
    			connectionFactory = new ActiveMQConnectionFactory("system","manage", "tcp://127.0.0.1:61616");
    		}
    		return connectionFactory;
    	}
    	private static PooledConnectionFactory pooledConnectionFactory;
    	static {
    		try {
    			// 需要创建一个链接工厂然后设置到连接池中
    			ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    			activeMQConnectionFactory.setUserName("system");
    			activeMQConnectionFactory.setPassword("manage");
    			activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
    			// 如果将消息工厂作为属性设置则会有类型不匹配的错误,虽然Spring配置文件中是这么配置的,这里必须在初始化的时候设置进去
    			pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
    			// 链接最大活跃数,为了在测试中区别我们使用的到底是不是一个对象和看是否能控制连接数(实际上是会话数),我们在这里设置为1
    			int maximumActive = 1;
    			pooledConnectionFactory.setMaximumActive(maximumActive);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	/**
    	 * 获得链接池工厂
    	 */
    	public static PooledConnectionFactory getPooledConnectionFactory() {
    		return pooledConnectionFactory;
    	}
    	/**
    	 * 对象回收销毁时停止链接
    	 */
    	@Override
    	protected void finalize() throws Throwable {
    		pooledConnectionFactory.stop();
    		super.finalize();
    	}
    }
     这个类会提供连接池工厂和MQ链接工厂的创建和获得方式。特别要注意到的是,在初始化连接池时要设置一个链接工厂进去,而且只能在初始化时作为参数传递进去,如果你看过spring的配置你会以为要做为属性传递,而且他确实有这个属性。
  4. 然后我们创建发送消息的基础类,在该类中得到链接然后创建消息进行发送,关闭会话等一些列操作。为了防止变量干扰,这些操作是在一个方法里面完成的。
    package com.mq.service;
    import java.util.Map;
    import java.util.Set;
    import javax.jms.Connection;
    import javax.jms.MapMessage;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQConnectionFactory;
    /**
     * 数据发送类,用于发送数据
     * 如果获得链接,请查看被注释的代码
     */
    public class MQ_Service {
    	// 发送消息
    	@SuppressWarnings("static-access")
    	public void send(Map<String, String> map) throws Exception {
    		ActiveMQConnectionFactory connectionFactory = null;
    		Connection connection = null;
    		Session session = null;
    		Queue queue = null;
    		MessageProducer producer = null;
    		MapMessage messagep = null;	
    		// ====================================
    		try {
    			// 获得我们自己初始化的链接工厂然后创建链接
    			connectionFactory = MQPooledConnectionFactory.getMyActiveMQConnectionFactory();
    			connection = connectionFactory.createConnection();
    			
    			// 链接直接从链接池工厂进行获得
    			//connection = MQPooledConnectionFactory.getPooledConnectionFactory().createConnection();
    			
    			session = connection.createSession(false, session.AUTO_ACKNOWLEDGE);			
    			queue = session.createQueue("CUI_JMS");
    			producer = session.createProducer(queue);
    			// 链接开始,如果我们使用的是连接池,那么即使你不开始,也是没有问题的
    			connection.start();			
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		// ====================================
    		messagep = session.createMapMessage();
    		Set<String> keySet = map.keySet();
    		for (String key : keySet) {
    			String value = map.get(key);
    			messagep.setBytes(key, value.getBytes("UTF-8"));
    			System.out.println(key + "-->" + value);
    		}		
    		producer.send(messagep);		
    		messagep.clearBody();
    		messagep.clearProperties();		
    		// ===================================
    		// 通过打印会话的内存地址和链接的客户端编号就可以知道我们使用的是不是同一个会话和链接
    		System.out.println(session.toString());
    		System.out.println(connection.getClientID());
    		// 无论使用的自己的工厂还是连接池的,都要将会话关闭
    		// 如果不关闭,在使用连接池的时可以看到效果,发送两次时只能发送一次,造成堵塞
    		session.close();
    		// 使用自己的工厂和连接池的区别是,运行后自己工厂链接调用关闭程序结束
    		// 而调用连接池链接进行关闭实际上没有关闭,因为连接池要维护这个链接
    		connection.close();
    		messagep = null;
    	}
    	private MQ_Service() {
    	}
    	// 发送对象每次创建一个,用以区别我们使用的对象
    	public static MQ_Service getInstance() {
    		return new MQ_Service();
    	}
    	// 对外开发发送消息方法
    	@SuppressWarnings("unchecked")
    	public synchronized void sendMessage(Map map)
    			throws Exception {
    		this.send(map);
    	}
    }
     取消注释内容,并注释掉获得我们自己初始化的链接工厂然后创建链接下面的两行代码,我们的链接就是从连接池获得了。
    你可以打印链接、会话等各个对象的内存地址来查看是否是同一个对象内容。
  5. 多次调用发送消息,查看是否能发送消息和是否能控制链接,当然准确来说是会话数
    package com.mq.service;
    import java.util.HashMap;
    import java.util.Map;
    /**
     * 调用发送
     */
    public class MQ_Sender {
    	public static void main(String[] args) throws Exception {
    		// 循环调用,这里定义调用两次
    		for (int i = 0; i < 2; i++) {
    			MQ_Service sender = MQ_Service.getInstance();
    			Map<String, String> map = new HashMap<String, String>();
    			map.put("MESS_NUM", "112110119");
    			map.put("MESS_DEPT", "本部");
    			sender.sendMessage(map);
    			System.out.println("数据已经发送完毕!");
    		}
    	}
    }
     通过上面的例子,你就可以在不使用配置的情况下,在代码中也使用MQ连接池。并且更明了的俩接MQ连接池和数据库连接池的不同之处。

请您到ITEYE看我的原创:http://cuisuqiang.iteye.com

或支持我的个人博客,地址:http://www.javacui.com

 

编码实现MQ连接池实现JMS消息发送连接管理

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
Socket一般用于网络之间的通信,在这里,实现的是服务端与客户端的简单消息通信。 首先是客户端的搭
  异步进程通信是面向服务架构(SOA)一个重要的组成部分,因为企业里很多系统通信,特别是与外部组
在程序中大部分对象都不需要重复利用,每次用完直接丢弃就可以了,但有一种对象不能这样做---TCP con
转自:http://www.cnblogs.com/woodcutter/archive/2010/04/22/1718145.html 一般的数据库应用程序
Topic模式消息发送实例 1、pom引入 <dependency> <groupId>junit</groupId> <
MQ中将消息发送至远程队列的配置 摘自MQ资源管理器帮助文档V7 在开始学习本教程之前,您需要从系统
AMQP协议介绍 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个
使用jconsole查看Activemq情况,如下图: 图1.初始链接,一切正常 图2为发送过程中,出现链接断开并
消费者:接收消息 逻辑: 创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换
1.前言 数据库应用,在许多软件系统中经常用到,是开发中大型系统不可缺少的辅助。但如果对数据库资
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号