ActiveMQ

ActiveMQ: 本质的好处就是解决异步处理消息的问题。

  • toptic模式
    消息为广播模式,每一个监听到的队列都可以收到消息。
@JmsListener(destination = "topic.receivemsg", containerFactory = "jmsListenerContainerMsgTopic", concurrency = "5")
  • queue模式
    消息为竞争消费模式,只有竞争到消息的队列可以消费,即使多个监听者监听同一个队列,最终也只有一个消费者可以消费。
// 配置
@JmsListener(destination = "${lly.queue.sendMsgInfo}" + "${spring.profiles.active}", containerFactory = "jmsListenerContainerQueue", concurrency = "5")
package com.xes.ops.insight.plus.mp;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.TimeUnit;

public class Sender {
    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "system",
                "manager",
                "tcp://203.195.176.21:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("first");
        MessageProducer messageProducer = session.createProducer(destination);
        for (int i = 0; i < 100; ++i) {
            TextMessage textMessage = session.createTextMessage("这是消息:" + i);
            messageProducer.send(destination, textMessage);
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("ok");
    }
}

import javax.jms.*;
import java.util.concurrent.TimeUnit;

public class Consumer {
    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "system",
                "manager",
                "tcp://203.195.176.21:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("first");
        MessageConsumer consumer = session.createConsumer(destination);
        for (int i = 0; i < 100; ++i) {
            Message message = consumer.receive();
            System.out.println(message);
            TimeUnit.SECONDS.sleep(1);
        }
    }
}
  • 消息的消费者接收消息可以采用两种方式
  1. consumer.receive() 或 consumer.receive(int timeout);
  2. 注册一个MessageListener.
    采用第一种方式,消息的接收者会一直等待下去,知道有消息到达或者超时。后一种方式会注册一个监听器,
    当有消息到达的时候,会调用它的onMessage()方法。以下举例说明:
MessageConsumer consumer=session.createConsumer(queue);
consumer.setMessageListener(new MessageListener(){
public void onMessage(Message msg)
{
System.out.println("接收到的消息为+"((TextMessage)msg).getText());
}
})
  • Java jms连接ActiveMQ密码问题
    参考
    1 JAVA JMS 连接 ActiveMQ,帐号密码错误都可以登录的原因以及解决方法
    2 Configuring advanced settings for ActiveMQ
    3 ActiveMQ权限配置

你可能感兴趣的