当前位置:首页 > 开发 > 互联网 > 正文

activeMQ完整的demo,值得你拥有

发表于: 2015-04-24   作者:bigSeven   来源:转载   浏览次数:
摘要:                                       最近项目里面要求实时的分析数据,唉,storm学习成本太高,所以就想到了activeMQ. Apache ActiveMQ

               

                      最近项目里面要求实时的分析数据,唉,storm学习成本太高,所以就想到了activeMQ. Apache ActiveMQ是最流行的功能强大的开源即时通讯和集成模式的服务器。Apache ActiveMQ的速度快,支持多语言和跨客户协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。废话不多说,直接上demo了。

===============================     action       start  =========================

 一:必备jar包

<dependency>

    <groupId>org.apache.activemq</groupId>

    <artifactId>activemq-all</artifactId>

    <version>5.9.0</version>

</dependency>

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-pool</artifactId>

<version>5.9.0</version>

</dependency>

二:服务端

            服务端包含4部分:

                    a.消息转换器

                    b.消息发布者

                    c.消息发送

                    d.activemq.xml配置

 a.消息转换器

package cn.innosoft.jt809.activemq.converter;

 

import java.math.BigDecimal;

import java.text.ParseException;

import java.text.SimpleDateFormat;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.springframework.jms.support.converter.MessageConversionException;

import org.springframework.jms.support.converter.MessageConverter;

 

import cn.innosoft.jt809.biz.dynamic.model.TGnssGpsHis;

/**

 * jms消息转换器

 * @author gaoq

 * @date 2015-3-27 下午2:57:23

 */

public class MsgConverter implements MessageConverter{

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

@Override

public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {

 if (!(object instanceof GpsVehicleReceive)) {  

           throw new MessageConversionException("obj is not MsgPojo");  

       }  

  GpsVehicleReceive msgPojo = (GpsVehicleReceive) object;  

       TextMessage textMessage = session.createTextMessage();  

       String msg = getGpsMsgString(msgPojo);

       textMessage.setText(msg);  

       return  textMessage;  

}

 

@Override

public Object fromMessage(Message message) throws JMSException, MessageConversionException {

if (!(message instanceof TextMessage)) {  

           throw new MessageConversionException("Message is not TextMessage");  

       }  

       TextMessage textMessage = (TextMessage) message;  

       TGnssGpsHis msg = new TGnssGpsHis();  

       String[] texts=textMessage.getText().split(",");  

         msg.setX(1);

} catch (ParseException e) {

e.printStackTrace();

}

       return msg;  

}

/**

* 获取对象转换后的字符串.

* @param msgPojo TGnssGpsHis

* @return String

*/

private String getGpsMsgString(GpsVehicleReceive msgPojo) {

 

String msg = msgPojo.getX()+","+msgPojo.getY();

return msg;

}

 

}

b.消息发布者

package cn.innosoft.jt809.activemq.service;

import javax.jms.Destination;

import org.springframework.jms.core.JmsTemplate;

import cn.innosoft.jt809.activemq.converter.GpsVehicleReceive;

/**

 * 消息发布者.

 * @author gaoq

 * @date 2015-3-19 上午11:12:00

 */

public class TopicPublisherService {

JmsTemplate jmsTemplate;

Destination destination;

//  public void send(final String msg) {  

//         MessageCreator messageCreator = new MessageCreator() {  

//             public Message createMessage(Session session) throws JMSException {  

//                 TextMessage message = session.createTextMessage();  

//                 message.setText(msg);  

//                 return message;  

//             }  

//         };  

//         jmsTemplate.send(this.destination, messageCreator);

//     }   

public void convertAndSend(GpsVehicleReceive msgPojo){  

        jmsTemplate.convertAndSend(this.destination, msgPojo);  

    }  

public void setJmsTemplate(JmsTemplate jmsTemplate) {

this.jmsTemplate = jmsTemplate;

}

public void setDestination(Destination destination) {

this.destination = destination;

}

 

}

 c.消息发送(在应用类中,获取数据并发送)

private static void sendMsg(TGnssGpsHis gpsHis){

GpsVehicleReceive r = new GpsVehicleReceive();

r.setX(1);

r.setY(2); 

topicPublisherService.convertAndSend(r);//发送

r = null;

 

}

d.activemq.xml配置

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans 

           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

           http://www.springframework.org/schema/context

           http://www.springframework.org/schema/context/spring-context-3.0.xsd  

           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd

           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

 

<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  

   <property name="connectionFactory">  

      <bean class="org.apache.activemq.ActiveMQConnectionFactory">  

          <property name="brokerURL" value="tcp://127.0.0.1:9220" />  

      </bean>  

  </property>  

</bean>  

 

    <!-- 发送消息的目的地(主题) -->  

    <bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">  

        <constructor-arg index="0" value="myMessageListenerTopic" />  

    </bean>

    

    <bean id="msgConverter" class="cn.innosoft.jt809.activemq.converter.MsgConverter"></bean>

    <!-- 配置TopicJms模板  -->  

    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />  

        <!-- 配置是否为发布订阅者模式,默认为false -->  

        <property name="pubSubDomain" value="true"/>  

<!-- 转换器 -->

        <property name="messageConverter" ref="msgConverter"></property>

    <!--<property name="receiveTimeout" value="10000" />  -->  

    </bean>  

    

    <bean id="topicPublisherService" class="cn.innosoft.jt809.activemq.service.TopicPublisherService">  

       <property name="jmsTemplate" ref="jmsTopicTemplate"/>  

        <property name="destination" ref="topicSubscriberMessageListenerDest" />

    </bean>

</beans>

 三:客户端使用

                客户端使用包含2部分:

                     a.监听类

                     b.activeMq.xml

 a.监听类

package cn.innosoft.exceptionAnalyse.activemq.service;

 

import java.text.SimpleDateFormat;

 

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

 

import cn.innosoft.exceptionAnalyse.biz.AnalyseMsgMng;

import cn.innosoft.exceptionAnalyse.biz.cache.model.GpsVehicleReceive;

 

 

/**

 * 经度异常接收数据监听类.

 * @author gaoq

 * @date 2015-3-27 下午4:32:30

 */

public class TopicColorExceptionService implements MessageListener{

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public void onMessage(Message message) {  

       if(message instanceof TextMessage){  

        TextMessage textMessage = (TextMessage) message; 

           try {  

            GpsVehicleReceive msg = bindProperties(textMessage);

               // AnalyseMsgMng.getInstance().colorQueue.put(msg);

           } catch (Exception e) {  

               e.printStackTrace();  

           }  

       }  

   }

 

/**

 * 绑定参数到对象上.

 * @param textMessage 消息.

 * @return GpsVehicleReceive

 */

private GpsVehicleReceive bindProperties(TextMessage textMessage) throws Exception{

GpsVehicleReceive msg = new GpsVehicleReceive();  

String[] texts=textMessage.getText().split(",");  

    msg.setX1(texts[0]);

              msg.setX2(texts[1]);

                    msg.setX3(texts[2]);

return msg;

 

 

}

 b.activeMq.xml

    <?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans 

           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

           http://www.springframework.org/schema/context

           http://www.springframework.org/schema/context/spring-context-3.0.xsd  

           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd

           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

 

 

<bean id="connectionFactory"  

        class="org.apache.activemq.ActiveMQConnectionFactory">  

        <property name="brokerURL" value="tcp://127.0.0.1:9220" />  

    </bean>  

      

    <!-- 发送消息的目的地(主题) -->  

     <bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">  

        <constructor-arg index="0" value="myMessageListenerTopic" />  

    </bean>  

    

<!--     <bean id="msgConverter" class="cn.innosoft.freightAnalyse.activemq.converter.MsgConverter"></bean> -->

    <!-- 配置TopicJms模板  -->  

    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />  

        <!-- 配置是否为发布订阅者模式,默认为false -->  

        <property name="pubSubDomain" value="true"/>  

    </bean>  

    

 

    <bean id="topicColorException" class="cn.innosoft.exceptionAnalyse.activemq.service.TopicColorExceptionService"></bean> 

    <bean id="myMsgTopiclistenerContainerColor"  

        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="destination" ref="topicSubscriberMessageListenerDest" />  

        <property name="messageListener" ref="topicColorException" />  

        <property name="pubSubDomain" value="true" />  

    </bean> 

</beans>   

=============================    action   end           ==========================

综上所述:一个完整的activeMQ的使用方式介绍完了。

接下来干啥呢。。。。。

想起来了。。。。activeMQ的服务还要启动,那么如何启动activeMQ的服务呢?

https://repository.apache.org/content/repositories/releases/org/apache/activemq/

下载了activeMQ之后

apache-activemq-5.9.0-bin.zip

                      解压后会发现:apache-activemq-5.9.0\bin目录下有win32,win64,那这个时候就要看服务器是什么系统了,我的是64位,所以就进去apache-activemq-5.9.0\bin\win64目录双击activemq.bat服务如果启动无异常就好了,然后在浏览器中访问:http://localhost:8161/admin,用户名:admin,密码:admin (这个应该是固定的,如果想改,也是可以得到自己去配置文件中找到位置改掉就ok了)

 

大功告成。。。。。。。。。。。。。。。

 

 

 

 

activeMQ完整的demo,值得你拥有

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
2015年的夏天,虽然来得不算火热,但是在互联网技术的夏天,比任何一年都更为火热。 刚刚才结束了 5
好久没逛园子了,送上一份薄礼 软件背景 大家平时工作过程中一些重复的过程完全可以程序化,容易忘记
在PHP程序员应该知道的15个库(上)一文中,小编为大家介绍了Mink、Geocoder、Ratchet等8个有用的PH
Live View (iOS): 一个非常棒的的图形设计和原型设计工具,并且允许用户远程屏幕视图。 What the F
引导语:为了保持敏锐,锻炼你的大脑是很重要的。我们思考得越少,它就会变得越迟钝。我们更关心我们
一。场景回顾: 写这篇文章的主要原因是这些天,因为客户那边的需求,而更改的一个需求。在这过程中
【阿里云产品公测】高大上的搜索服务OpenSearch, 你值得拥有! 作者:阿里云用户trcher 一、前言:
【http://www.cnblogs.com/Darren_code/archive/2011/09/28/2179055.html】 最近换了新工作,搬了新
原文地址:http://www.cnblogs.com/Darren_code/archive/2011/09/28/2179055.html 写这篇文章的目的
最近换了新工作,搬了新家,换了新室友,一切都在重新开始。因为家里网还没装好,工作之余上网都得
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号