当前位置:首页 > 开发 > 编程语言 > 消息中间件 > 正文

jms p2p和PubSub模型总结

发表于: 2012-08-11   作者:eighthspace   来源:转载   浏览次数:
摘要: 首先先总结下jms规范下定义的实现接口:     ConnectionFactory 接口(连接工厂)   用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接

首先先总结下jms规范下定义的实现接口:

 

 

ConnectionFactory 接口(连接工厂)

  用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

Connection 接口(连接)

  连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

Destination 接口(目标)

  目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

MessageConsumer 接口(消息消费者)

  由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

MessageProducer 接口(消息生产者)

  由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

Message 接口(消息)

  是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  消息头(必须):包含用于识别和为消息寻找路由的操作设置。

  一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。

  一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

  消息接口非常灵活,并提供了许多方式来定制消息的内容。

Session 接口(会话)

 

  表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

 

p2p(point-to-point)模型:

 

其中包含几个比较重要的概念:消息队列(Queue),发送者(sender),接受者(receiver),每个消息被发送到规

 

定的消息队列中,并且该消息会被持久化。每个消息只能对应一个消费者,即使发送消息时,消费者没有在运行,仍然不影

 

响该消息被发送到消息队列,当消费者运行时,该消息就会被消费者接受到。下面以一个实例说明p2p模型的运行机制。

 

服务器端代码:

 

 

@MessageDriven(
		activationConfig={
				@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Queue"), //指定使用的是queue
				@ActivationConfigProperty(propertyName="destination" , propertyValue="queue/LeadfarQueue")//指定具体使用哪个queue
		}
)
public class MdbBeanTest01 implements MessageListener {

	public void onMessage(Message msg) {
		// TODO Auto-generated method stub
		try {
			TextMessage tm = (TextMessage)msg; 
			String text =tm.getText();
			System.out.println("服务器接收到了信息:" +text);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
 

 

上面的“queue/LeadfarQueue”队列可以在jboss的配置文件中进行配置,具体路径如下:

 

JBOSS_HOME\server\default\deploy\messaging\destinations-service.xml

 

在此文件中添加:

 

 

 <mbean code="org.jboss.jms.server.destination.QueueService"
      name="jboss.messaging.destination:service=Queue,name=LeadfarQueue"
      xmbean-dd="xmdesc/Queue-xmbean.xml">
      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
      <depends>jboss.messaging:service=PostOffice</depends>
 </mbean>  

 

 

这样服务器端的测试代码就完成了。

 

客户端代码:

 

 

try {
			InitialContext context = new InitialContext();
			QueueConnectionFactory factory = (QueueConnectionFactory)context.lookup("ConnectionFactory");//创建ConnectionFactory

			QueueConnection connection = factory.createQueueConnection(); //创建Connection
			
			/*
			 * 第一个参数:true表示开启事务,最后要手动进行commit提交,false表示自动提交*/
			QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);//创建会话

			Queue destination = (Queue)context.lookup("queue/LeadfarQueue");//创建Destination

			QueueSender sender = session.createSender(destination);//创建发送者

			TextMessage msg = session.createTextMessage("mdbBean , 你好");//创建文本消息

			sender.send(msg);//发送消息

			System.out.println("客户端消息发送完成");
			sender.close();
			session.close();
			connection.close();
		} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
 

 

这样就可以进行测试,测试结果:

 

服务器端调试信息:19:39:56,234 INFO  [STDOUT] 服务器接收到了信息:mdbBean , 你好

 

验证p2p模式下消息的持久化的特性:

 

首先停止服务器端EJB的运行,再次运行客户端测试代码。此时,该消息仍然会发送到指定的消息队列,这时再次启动服务

 

器端的运行,消息仍然会发送到服务器端。说明,队列会将消息持久化。

 

PubSub模型

 

其中包含的几个概念:主题(Topic),发布者(Publisher),订阅者(Subscriber)。与p2p模型不同,p2p模型中消

 

息与接收者是一对一的关系,而在PubSub模型中,主题与订阅者是一对多的关系,也就是一个Topic会发送给多个订阅者

 

,但是主题并不会被持久化,也就是说如果在Topic发送时,订阅者程序没有运行,那么等到订阅者下次再次运行时,该

 

Topic也不会再发送给该订阅者,该Topic对该订阅者来说已经丢失。测试代码如下:

 

 

 

@MessageDriven(
		activationConfig={
				@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Topic"),//指定使用的是Topic
				@ActivationConfigProperty(propertyName="destination" , propertyValue="topic/LeadfarTopic")//指定使用哪个Topic
		}
)
public class TopicMdbBeanTest01 implements MessageListener {

	public void onMessage(Message msg) {
		// TODO Auto-generated method stub
		try {
			TextMessage tm = (TextMessage)msg;
			String text =tm.getText();
			System.out.println("服务器接收到了信息:" +text);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
 

 

“topic/LeadfarTopic”选项还是在上面的xml文件中进行指定,这个实例中,在服务器端定义了多个EJB,都是使用“

 

topic/LeadfarTopic”。

 

客户端代码:

 

try {
			InitialContext context = new InitialContext();
			TopicConnectionFactory factory = (TopicConnectionFactory)context.lookup("ConnectionFactory");
			TopicConnection connection = factory.createTopicConnection();
			TopicSession session = connection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
			Topic destination = (Topic)context.lookup("topic/LeadfarTopic");
			TopicPublisher publisher = session.createPublisher(destination);
			TextMessage msg = session.createTextMessage("topicMdbBean , 你好");
			publisher.send(msg);
			System.out.println("客户端消息发送完成");
			publisher.close();
			session.close();
			connection.close();
		} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
 

与上面的客户端代码类似,这个测试结果如下:

 

服务器端调试信息:

 

20:24:57,281 INFO  [STDOUT] 服务器接收到了信息:topicMdbBean , 你好

20:24:57,281 INFO  [STDOUT] 服务器接收到了信息:topicMdbBean , 你好

20:24:57,281 WARN  [InterceptorsFactory] EJBTHREE-1246: Do not use InterceptorsFactory with a ManagedObjectAdvisor, InterceptorRegistry should be used via the bean container

20:24:57,281 WARN  [InterceptorsFactory] EJBTHREE-1246: Do not use InterceptorsFactory with a ManagedObjectAdvisor, InterceptorRegistry should be used via the bean container

20:24:57,281 INFO  [STDOUT] 服务器接收到了信息:topicMdbBean , 你好

 

可见,服务器端的三个EJB都收到了该Topic。

 

如果将服务器端程序停止运行,再运行客户端程序,再重新启动服务器端程序,也不会收到该Topic,也就是说Topic被丢

 

失了。也可以将Topic持久化,方法如下:在@MessageDriven的配置中增加几个配置选项:

 

@MessageDriven(
		activationConfig={
				@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Topic"),
				@ActivationConfigProperty(propertyName="destination" , propertyValue="topic/LeadfarTopic"),
				@ActivationConfigProperty(propertyName="SubscriptionDurability" ,propertyValue="Durable"),
				@ActivationConfigProperty(propertyName="subscriptionName" ,propertyValue="sn"),
				@ActivationConfigProperty(propertyName="clientId" , propertyValue="snClientId")
		}
)
 

经过这样的配置后,该Topic对于配置成这样的EJB来说就进行了持久化,这样,这个EJB在Topic发送后再运行也能接收到

 

。以上就是两种模型各自的特点。

jms p2p和PubSub模型总结

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
1. JMS基本概念 JMS(Java Message Service)是访问企业消息系统的标准API,它便于消息系 统中的Java应
1.3 消息传送模型 Messaging Models JMS支持两类消息传送模型:点对点模型和发布/订阅模型。有时候,
此文章的题目来自于当下的两哥之争,本意有调侃之意,但是用在本文,却无此意,我以十分真诚并且后
1.1 JMS模型简介 JMS支持两种消息通信模型: 点对点模型(Point to Point,P2P) 发布者/订阅者模型(
前段时间看了点关于P2P技术的资料,现在简单的整理写进自己的博客,对于更深入的学习会在后续的博客
6 JMS
1 JMS概念 JMS是Java Message Service Java消息服务的简称,是Sun公司制定的J2EE规范之一,JMS是一个
7 JMS
1 JMS概念 JMS是Java Message Service Java消息服务的简称,是Sun公司制定的J2EE规范之一,JMS是一个
8 JMS
http://baike.baidu.com/view/157103.htm 百科名片 jms即Java消息服务(Java Message Service)应用
9 JMS
1 JMS概念 JMS是Java Message Service Java消息服务的简称,是Sun公司制定的J2EE规范之一,JMS是一个
10 JMS
1 JMS概念 JMS是Java Message Service Java消息服务的简称,是Sun公司制定的J2EE规范之一,JMS是一个
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号