RabbitMQ消息队列学习笔记

摘要: 初次使用AMQP的过程中,总是容易被AMQP支持的消息模型绕晕,这里结合官方的教程,对AMQP的消息模型做一个简要总结,供参考。目前官方给出了六种消息发送/接收模型,这里主要介绍前五种消息模型。

概述

  初次使用AMQP的过程中,总是容易被AMQP支持的消息模型绕晕,这里结合官方的教程,对AMQP的消息模型做一个简要总结,供参考。目前官方给出了六种消息发送/接收模型,这里主要介绍前五种消息模型。

消息模型1、Hello World
简单模式就是生产者将消息发送到队列、消费者从队列中获取消息。一条消息对应一个消费者

工具类
[Java] 纯文本查看 复制代码
?

import AMQP.AliyunCredentialsProvider;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

public static Connection getConnection() throws Exception{

  // 初始化参数设置
  String AccessKey= "********";
  String SecretKey = "********";
  Long Uid = ********16617278L;
  String VhostName = "********";
  String host = "********16617278.mq-amqp.cn-hangzhou-a.aliyuncs.com";

  // 定义连接工厂
  ConnectionFactory connectionFactory = new ConnectionFactory();
  // 设置服务地址
  connectionFactory.setHost(host);
  // 端口
  connectionFactory.setPort(5672);
  // 设置用户名、密码、vhost
  connectionFactory.setCredentialsProvider(new AliyunCredentialsProvider(AccessKey,SecretKey,Uid));
  connectionFactory.setAutomaticRecoveryEnabled(true);
  connectionFactory.setNetworkRecoveryInterval(5000);
  connectionFactory.setVirtualHost(VhostName);

  // 通过工厂获取连接对象
  Connection connection = connectionFactory.newConnection();
  return connection;

}
}
发送端示例代码
[Java] 纯文本查看 复制代码
?

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// hello world 单个消费者和接收者
public class Send {

private final static String Queue_name = "helloDemo";
public static void main(String[] args) throws Exception {
    // 获取连接及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(Queue_name,false,false,false,null);
    //消息内容
    String message = "Hello World!";
    // 1、交换机,此处无   2、发送到那个队列 3、属性  4、消息内容
    channel.basicPublish("",Queue_name,null,message.getBytes());

    System.out.println("发送数据:" + message);

    // 关闭连接
    channel.close();
    connection.close();
}

}

消费端示例代码
[Java] 纯文本查看 复制代码
?

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver {

private final static String Queue_name = "helloDemo";
public static void main(String[] args) throws Exception{
    Connection connection = ConnectionUtil.getConnection();
    final Channel channel =  connection.createChannel();
     
    // 开始消费消息
    channel.basicConsume(Queue_name, false, "ConsumerTag", new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            //接收到的消息,进行业务逻辑处理
            System.out.println("message receive: ");
            System.out.println("Received: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
    Thread.sleep(100000);
    channel.close();
    connection.close();
}

}

2、Work Queues
一条消息可以被多个消费者尝试接收,最终只有一个消费者能够获取到消息。

发送端示例代码
[Java] 纯文本查看 复制代码
?

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 1:N 消费者各自接收消息
public class Sender {

private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare(queueName,false,false,false,null);
    for (int i = 0; i < 100; i++) {

        String message = "workqueues message " + i;
        channel.basicPublish("",queueName,null,message.getBytes());
        System.out.println("发送消息: " + message);

        Thread.sleep(10);//休眠
    }
    // 关闭连接
    channel.close();
    connection.close();
}

}
消费端示例代码
[Java] 纯文本查看 复制代码
?

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver1 {

private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception{

    Connection connection = ConnectionUtil.getConnection();
    final Channel channel = connection.createChannel();
    channel.queueDeclare(queueName,false,false,false,null);
    channel.basicQos(1);//告诉服务器,在没有确认当前消息完成之前,不要给我发新的消息。

    DefaultConsumer consumer = new DefaultConsumer(channel){
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            //接收到的消息,进行业务逻辑处理
            System.out.println("message receive1: ");
            System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
        }
    };
    channel.basicConsume(queueName,false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
}

}
消费端示例代码2
[Java] 纯文本查看 复制代码
?

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver2 {

private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception{

    Connection connection = ConnectionUtil.getConnection();
    final Channel channel = connection.createChannel();
    channel.queueDeclare(queueName,false,false,false,null);
    channel.basicQos(1);//告诉服务器,在没有确认当前消息完成之前,不要给我发新的消息。

    DefaultConsumer consumer = new DefaultConsumer(channel){
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            //接收到的消息,进行业务逻辑处理
            System.out.println("message receive2: ");
            System.out.println("Received2: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
        }
    };
    channel.basicConsume(queueName,false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
}

}
3、Routing
生产者将消息发送到type为direct模式的交换机,消费者的队列将自己绑定到路由的时候给自己绑定一个key,只有生产者发送的消息key和绑定的key一致时,消费者才能收到对应的消息。

发送端示例代码
[Java] 纯文本查看 复制代码
?

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {

private static final String ExchangeName = "Rout_Change";//路由消息交换机

public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(ExchangeName,"direct");
    channel.basicPublish(ExchangeName,"key3",null,"route 消息".getBytes());

    channel.close();
    connection.close();
}

}
消费端示例代码
[Java] 纯文本查看 复制代码
?

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub {

private static final String ExchangeName = "Rout_Change";//路由消息交换机
public static void main(String[] args) throws Exception{

    Connection connection = ConnectionUtil.getConnection();
    final Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare("testroutequeue1",false,false,false,null);

    // 绑定交换机
    // 参数3 标记 绑定到交换机的时候会有一个标记,只有和它一样标记的消息才会别消费到
    channel.queueBind("testroutequeue1",ExchangeName,"key1");
    channel.queueBind("testroutequeue1",ExchangeName,"key2");//绑定多个标记
    channel.basicQos(1);

    DefaultConsumer consumer = new DefaultConsumer(channel){
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            //接收到的消息,进行业务逻辑处理
            System.out.println("message route receive1: ");
            System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
        }
    };
    channel.basicConsume("testroutequeue1",false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
}

}

摘自:https://yq.aliyun.com/article...

你可能感兴趣的