RabbitMQ实战二(消峰限流)

Hello,我是一名在互联网捡破烂的程序员,最近破烂还挺好捡的,每天都是东逛逛西逛逛,收了很多的破烂呢。。。

收废铁了,十块一斤。快拿来卖哦,什么烂电冰箱,烂电视机,不管什么破烂我都要。。。

每天骑着我的烂三轮车,每天都是活的苟且偷生的,我好可怜。。。

呜呜呜呜

不管有钱木钱,都进来看一看瞧一瞧哦。。

好了~~~~

废话我们就不多说了,我最近在收废铁的时候收到一本武功秘籍,发现了新大陆,今天我就来和你们分享一下。

我们一起成为武林霸主,来吧。氛围搞起来。。。。。

首先呢,我们在上一期中对 RabbitMQ做了一个小小的实践,主要是对 RabbitMQ的异步特性进行了分析。

如果小伙伴还没有修炼的,赶紧去修炼去吧,我们要慢慢的成长起来,要慢慢的修炼武功秘籍,不是一天两天就可以练成的,让我们一起成为世界的主宰吧,嗯哈哈哈哈哈

RabbitMQ实践一(异步解耦)

如果你还是一个一个刚刚入门的小伙伴呢,那你的加紧你的步伐,赶快去从武功秘籍的第一页开始吧,你也不要慌张,我们都是从小白开始的,只不过你要多花一点时间来完成前面的修炼,终有一天你会超过的,哈哈哈
RabbitMQ基本使用一(简单介绍)/#more)

RabbitMQ基本使用二(简单队列)

RabbitMQ基本使用三(工作队列)

RabbitMQ基本使用四(发布/订阅模式)

RabbitMQ基本使用五(路由模式)

RabbitMQ基本使用六(主题模式)

RabbitMQ入门呢?就是上面的这么多啦。自己根据官网来理解的知识点,写的不是那么很好,不过很通俗易懂。代码有详细介绍。如有问题,请留言,多多包涵。

好吧,我们就废话不多说了,那就开始我们的表演吧!!!!

一、开篇前提

本文篇幅比较长,请耐心阅读,你会收获很多的。

今天给大家带来的是 RabbitMQ的消峰限流,我们知道现在互联网越来越强大,我们的系统也是越来越完善,在高峰期呢,系统将要承受巨大的压力,那么也是我们程序员的压力。像淘宝、京东大型网站购物系统每逢双十一,那就是程序员最忙的时候,当天要承受千万级别的流量冲击,不得不抵住压力啊。

想必大家都知道哦我要说什么了吧,没错,就是你想的那样,我们就是要对这千万级别的流量打交道。我们要抵住流量的冲击。让它能够缓解我们的系统的压力,系统压力小了,我们自身的压力就小了。OK

1. 场景介绍

  • 场景一

    我们知道在我们双十一中都会有秒杀的商品,我们所秒杀的商品价格都是非常低的,并且商品也是非常好,比如华为Mate30只要999,、苹果12只要99等等这些秒杀商品,不管是谁,看了都会心动。但是呢,商品的数量是有限的,不是每个用户都会抢到,全国14亿用户,就拿一半的人来抢,那个流量冲击是我们无法想像的。那要是全部流量打在我们的数据库中,那就只有说再见。。凉凉,最后还是我们程序员扛下了所有,那么我们应该怎么办呢?

  • 场景二

相信大多数人都抢过火车票吧,肯定有没有抢到票的吧。比如在我们的12306抢票网站,每次到一定的时间段都会有大量用户涌入抢票,可能我会遇到过服务器忙、或者加载失败等情况。那么在这么大的流量下,我们是怎么抗住的呢?

2. 问题描述

在我们面对瞬时流量的情况下,全部的流量都打在我们的数据库中,那是很难受的。

那么我们应该怎么来解决这种瞬时流量下的并发情况?

在我们秒杀中,库存只有一份,所有人会集中在时间读和写写这些数据,多人读取同一个数据

3. 优化方案

我们无论在抢票或者秒杀商品中时,为什么我没有抢到,别人却抢到了?

下面带你打开这扇门

  • 我们将请求尽量拦截在系统的上游(不要让锁冲突到数据上)。为什么那些传统的秒杀系统会挂,那是因为所有的请求都压到了数据层上,导致数据读取锁冲突,并发的响应数据慢,巨几乎都是所有请求超时。流量虽大,下单成功的有效流量去很少。如果我们秒杀商品库存有100件,那么有1000W人来抢,那么那时的瞬时流量很大,但是基本没有人秒杀成功,原因在于所有的流浪都打在我们的数据层上,到时响应速度慢,请求的有效率为0。那么这是一次很失败的秒杀活动。

    那用户不得吐槽啊,还不得上热搜啊,某某什么秒杀系统,垃圾的很,没人秒杀成功,都是请求超时,哎,溜了溜了

    那程序员就遭殃了啊,老板还不得直接暴扣到头上啊,那是很难受的,薪水都要大打折扣。

  • 充分利用缓存来实现这种瞬时流量大的情况,秒杀买票,这就是一个典型的读多写少的应用场景,大部分请求是车次的查询,票查询,下单和支付才是写请求。一趟火车其实只有2000张票,200W人来买,最多只有2000人下单成功,其他人都是查询库存,写比例只有0.1%,读的比例占99.9%,非常适合用缓存来优化。

这下知道为什么有时候抢不到商品了吧

这样的设计就是为了牺牲用户流量来换系统稳定

4. 架构

RabbitMQ实战二(消峰限流)_第1张图片
这个架构我是根据自己的理解来的,可能不怎么符合理念,知道大概意思就是了哈

OK,说了这么多,我们就开始我们的代码设计吧

二、实践操作

1. 项目开始阶段

下面的所有代码都是本人独立完成,可能不是那么完美,不过应该问题不大吧

如有错误之处,还请包涵,多多指出

这里我们加入我们的 Redis缓存技术,有时间会更新的。。

首先我们要有 RabbitMQ的客户端,可以在Linux上安装,也可以在Windows上安装。

我使用的是Linux上安装的,可以去参考 RabbitMQ在Linux下安装

1.1 创建SpringBoot项目

这个就不用多说了吧,这个都是我们的家常便饭了

1.2 导入依赖
 
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.boot
            spring-boot-starter-web
            2.2.5.RELEASE
        
        
            org.springframework.boot
            spring-boot-starter-amqp
            2.2.4.RELEASE
        
        
            mysql
            mysql-connector-java
        
        
            org.springframework.boot
            spring-boot-starter-jdbc
        
        
            org.mybatis.spring.boot
            mybatis-spring-boot-starter
            2.1.1
        
        
            log4j
            log4j
            1.2.17
        
        
            org.projectlombok
            lombok
        

主要是导入我们我们的 MQ依赖

 
            org.springframework.boot
            spring-boot-starter-amqp
            2.2.4.RELEASE
        
1.3 修改application.yml
server:
  port: 9000
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test_rabbitmq?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
    username: root
    password: root
  rabbitmq:
    addresses: 192.168.2.2
    virtual-host: /
    username: guest
    password: guest
    template:
      exchange: ORDER.EXCHANGE
    listener:
     simple:
       #指定最小的消费者数量
       concurrency: 1
       retry:
         enabled: false #是否支持重试
       prefetch: 100
       acknowledge-mode: manual
logging:
  level:
    com.example.rabbitmq.mapper: debug
mybatis:
  mapper-locations: classpath:mapper/*.xml

主要配置的说明

1.3 数据库连接
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test_rabbitmq?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
    username: root
    password: root

将用户名和密码修改为自己的

如果你的数据库是在Linux上,那么你需要将localhost修改为你的IP地址,并开启3306端口

1.4 RabbitMQ基本配置
  • 基本配置
 rabbitmq:
    addresses: 192.168.2.2
    virtual-host: /
    username: guest
    password: guest

注意,我这里没有列出格式

rabbitmq这是是在spring下的

address:这个是你的IP地址,同样的如果你的 RabbitMQ是在Linux上,使用Linux的IP地址。若是windows版本,直接使用localhost

virtual-host:这个是你rabbitmq上的一个虚拟主机

username:用户名,如果你没有添加用户:默认的是guest

password:密码,同样,如果你没有添加用户:默认是guest

具体说明请详见 RabbitMQ在Linux下安装

  • 默认交换机
 template:
      exchange: ORDER.EXCHANGE

这里我们可以定义默认的交换机名称,那么在代码中我们就需要设置这样的名称

  • 消费者监听配置
listener:
     simple:
       #指定最小的消费者数量
       concurrency: 1
       retry:
         enabled: false #是否支持重试
       prefetch: 100 #每次处理的消息
       acknowledge-mode: manual #手动确认

上面的注释已经很清楚明白了吧,具体需要配置自己可以自行配置。

1.5 日志打印配置(主要打印我们的SQL语句)
logging:
  level:
    com.example.rabbitmq.mapper: debug
1.6 mybatis配置文件位置
mybatis:
  mapper-locations: classpath:mapper/*.xml

2. 数据库(test_rabbitmq)

2.1 Order(订单表)
DROP TABLE IF EXISTS `test_order`;

CREATE TABLE `test_order` (
  `order_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '订单Id',
  `order_user_name` varchar(255) DEFAULT NULL COMMENT '订单人的名称',
  `order_user_email` varchar(255) DEFAULT NULL COMMENT '订单人的邮箱',
  `order_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '订单时间',
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB AUTO_INCREMEN
2.2 Googs(商品表)
DROP TABLE IF EXISTS `test_goods`;

CREATE TABLE `test_goods` (
  `goods_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '商品Id',
  `goods_name` varchar(255) DEFAULT NULL COMMENT '商品名称',
  `goods_stock` int(100) DEFAULT NULL COMMENT '商品库存',
  PRIMARY KEY (`goods_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
insert  into `test_goods`(`goods_id`,`goods_name`,`goods_stock`) values (1,'商品',0);

基本类的创建

3.1 数据响应类
ApiResponse
package com.example.rabbitmq.common;
import java.util.HashMap;

/**
 * 数据响应返回
 */
public class ApiResponse extends HashMap {
    /**
     * 状态码
     */
    private Integer code;
    /**
     * 信息
     */
    private String msg;

    @Override
    public Object put(String key, Object value) {

        super.put(key, value);

        return this;
    }

    public ApiResponse code(Integer code){

        this.put("code", code);

        return this;
    }

    public ApiResponse msg(String msg){

        this.put("msg",msg);

        return this;
    }
}

用户下单响应数据

抢单成功,返回给用户信息code=200,msg='抢单成功'

抢单失败,返回给用户信息code=400,msg='抢单失败,或者是请求超时'

3.2 订单类
Order
package com.example.rabbitmq.entity;

import lombok.Data;

import java.util.Date;

/**
 * 基本类
 */
@Data
public class TestOrder {

    /**
     * 订单Id
     */
    private Long orderId;

    /**
     * 订单人名称
     */
    private String orderUserName;

    /**
     * 订单人邮箱
     */
    private String orderUserEmail;

    /**
     * 订单时间
     */
    private Date orderDate;
}

这里我们使用的是一个注解@Data,这个注解来源于依赖lombok,要使用这个依赖不仅需要添加这个依赖,而且在IDEA中还要下载这个插件

我相信现在绝大多数人都是使用的IDEA编译器吧~~~

如果还有小伙伴不知道这个编译器或者没有使用的,那就赶快行动起来吧!!!

IDEA这款编译器是真的很友好

哇哦,扯到一边去了。。。

OK,我们回归正题

3.3 商品类
Goods
package com.example.rabbitmq.entity;

import lombok.Data;

/**
 * 商品
 */
@Data
public class TestGoods {

    /**
     * 商品ID
     */
    private Long goodsId;

    /**
     * 商品名称
     */
    private String goodsName;

    /**
     * 商品库存
     */
    private Integer goodsStock;
}

首先呢我们要模拟一个没有限流的场景

我们先要将goods表中的库存修改为100(自己自定义),这里我们就模拟只有100个库存的商品

我的习惯是从Controller层到持久层,这个因人而异吧。。没有什么强制要求

4. 无限流操作(正常操作)

4.1订单Controller层
OrderController
package com.example.rabbitmq.controller;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


/**
 * 订单Controller
 */
@RestController
@RequestMapping("order")
public class OrderController {
    
    @Autowired(required = false)
    private OrderService orderService;

    private static Integer count = 0;

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
    /**
     * 无RabbitMQ创建订单
     * @return
     */
    @GetMapping("/save/{goodsId}")
    public ApiResponse save(@PathVariable("goodsId") Long goodsId){

        ApiResponse apiResponse = this.orderService.save(goodsId);

        LOGGER.info("流量请求:" + count++);

        return apiResponse;
    }
}

上面代码可能会报错,不过不要慌,心不乱,手不抖,我们跟着感觉走

那是因为我们有些类还没有创建

接下里就是我们的Service类创建

4.2 订单Service层
OrderService
package com.example.rabbitmq.service;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;

import java.util.List;

public interface OrderService {
    /**
     * 无RabbitMQ消峰限流
     * @return
     */
    ApiResponse save(Long goodsId);
}
4.3 订单Service实现类
OrderServiceImpl
package com.example.rabbitmq.service.impl;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.mapper.GoodsMapper;
import com.example.rabbitmq.mapper.OrderMapper;
import com.example.rabbitmq.service.OrderService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;

/**
 * OrderService
 */
@Service
public class OrderServiceImpl implements OrderService {


    @Autowired(required = false)
    private AmqpTemplate amqpTemplate;
    
    @Autowired(required = false)
    private OrderMapper orderMapper;

    @Autowired(required = false)
    private GoodsMapper goodsMapper;


    /**
     * 无RabbitMQ消峰限流
     *
     * @return
     */
    @Override
    public ApiResponse save(Long goodsId) {

        if (goodsId == null){

            return new ApiResponse().code(400).msg("参数错误");
        }

        // 根据商品Id查询商品
        TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);

        // 商品不存在或者商品库存为0
        if (testGoods == null || testGoods.getGoodsStock() <= 0){

            return new ApiResponse().code(400).msg("商品不存在或者库存为0");
        }

        // 直接添加
        // 创建订单
        TestOrder testOrder = new TestOrder();

        // 设置参数
        testOrder.setOrderUserEmail("111@qq.com");

        testOrder.setOrderUserName("FC");

        testOrder.setOrderDate(new Date());

        this.orderMapper.save(testOrder);

        // 更新库存
        this.goodsMapper.updateGoodsStock(goodsId);

        return new ApiResponse().code(200).msg("订单创建成功");
    }
}

我们来解释一下我们的正常逻辑

  • 判断参数是否有效
if (goodsId == null){
  return new ApiResponse().code(400).msg("参数错误");
}
  • 查询商品信息
// 根据商品Id查询商品
 TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);

 // 商品不存在或者商品库存为0
 if (testGoods == null || testGoods.getGoodsStock() <= 0){

  return new ApiResponse().code(400).msg("商品不存在或者库存为0");
 }

先从数据库中查询商品信息,返回查询结果

判断是否为空或者是库存是否为0

  • 创建订单
// 直接添加
 // 创建订单
 TestOrder testOrder = new TestOrder();

 // 设置参数
 testOrder.setOrderUserEmail("111@qq.com");

 testOrder.setOrderUserName("FC");

 testOrder.setOrderDate(new Date());

this.orderMapper.save(testOrder);

上面条件都满足,那么我们可以下订单了

  • 更新库存
 // 更新库存
 this.goodsMapper.updateGoodsStock(goodsId);

当然呢我们下单成功了,当然要更新我们的库存呐。

那这就是我们下单的一般逻辑啦

这里肯定是有问题的啦,这样我们所有的请求都打在我们的数据库上了,那数据库肯定是罩不住的,那要崩啊

稍后会给出解决方案

4.4 商品Service层
GoodsService
package com.example.rabbitmq.service;

import com.example.rabbitmq.entity.TestGoods;

public interface GoodsService {

    /**
     * 根据商品Id查询商品库存
     * @param goodsId
     * @return
     */
    TestGoods selectGoodsById(Long goodsId);

    /**
     * 更新库存
     * @param goodsId
     */
    void updateGoodsStock(Long goodsId);
}
4.5 商品Service实现类
GoodsServiceImpl
package com.example.rabbitmq.service.impl;

import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.mapper.GoodsMapper;
import com.example.rabbitmq.service.GoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 商品Service
 */
@Service
public class GoodsServiceImpl implements GoodsService {
    @Autowired(required = false)
    private GoodsMapper goodsMapper;
    /**
     * 根据商品Id查询商品
     * @param goodsId
     * @return
     */
    @Override
    public TestGoods selectGoodsById(Long goodsId) {

        if (goodsId == null){

            return null;
        }

       TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);
        
        return testGoods;
    }

    /**
     * 更新库存
     *
     * @param goodsId
     */
    @Override
    public void updateGoodsStock(Long goodsId) {

        if (goodsId == null){

            return;
        }

        try {

            this.goodsMapper.updateGoodsStock(goodsId);

            return;

        }catch (Exception e) {

            return;
        }
    }
}
4.6 商品Mapper层
GoodsMapper
package com.example.rabbitmq.mapper;

import com.example.rabbitmq.entity.TestGoods;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

@Mapper
@Repository
public interface GoodsMapper {

    /**
     * 根据商品Id查询商品
     * @param goodsId
     * @return
     */
    TestGoods selectStockById(Long goodsId);

    /**
     * 更新库存
     * @param goodsId
     */
    void updateGoodsStock(Long goodsId);
}
4.7商品Mapper的xml文件
GoodsMapper.xml





    
    

        

        

        

    


    
    

    
    
        
        update test_goods set goods_stock  = goods_stock - 1  where goods_id = #{goodsId} ;
        
    

4.8 订单Mapper层
OrderMapper
package com.example.rabbitmq.mapper;

import com.example.rabbitmq.entity.TestOrder;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Repository;

/**
 *
 */
@Mapper
@Repository
public interface OrderMapper {
    /**
     * 无RabbitMQ消峰限流保存订单
     */
    void save(TestOrder testOrder);
}
4.9 订单Mapper的xml文件
OrderMapper.xml





    
    

        

        

        
        
        
        
    
    

        insert into test_order (order_user_name, order_user_email, order_date) values (

        #{orderUserName},

        #{orderUserEmail},

        #{orderDate}
        );
    

OK,这里我们基本操作就完成了,这些都是我们的正常操作

RabbitMQ实战二(消峰限流)_第2张图片

那么我们启动项目来测试一下

4.10 验证

这里我们先用postman来验证我们的接口

RabbitMQ实战二(消峰限流)_第3张图片

我们可以在控制台看见

RabbitMQ实战二(消峰限流)_第4张图片

我们的全部请求都到了数据库上,我们这里只是发起了两次请求,数据库肯定不会啊,数据库应该每秒能够撑起2000次请求吧,那么要是超过了2000,那就要出现问题了呀。。。

我们来看数据库中的数据

库存信息

RabbitMQ实战二(消峰限流)_第5张图片

订单信息

RabbitMQ实战二(消峰限流)_第6张图片

这样我们就可以轻松的创建订单了,哇哈哈哈哈

但是事实不撩人啊,咋个这么轻松哦,,那是不可能的

那么我们就需要压力测试来啦,这里我们使用的是jmeter

RabbitMQ实战二(消峰限流)_第7张图片

这里我们创建了4000个线程来请求我们接口

RabbitMQ实战二(消峰限流)_第8张图片

这样看我们的数据库压力还大不大,哼

那就来瞧一瞧我们数据库怎么来解决吧

RabbitMQ实战二(消峰限流)_第9张图片

妈呀,这是什么哦,怎么可以乱来哦。

这里我们看见,虽然有查询,有添加,还有更新库存的操作

但是呢,别个还在执行的时候,我还在下订单的时候,另外一个就在更新库存,Are You Sure?

这。。。。

这我看不下去了

那么我们再来看看绝望的时刻

RabbitMQ实战二(消峰限流)_第10张图片

哇哦,这就很尴尬了,这样下去不得了啊,我们不得亏死啊,尽然还超卖了。。。。

为什么会出现这样的情况呢?

当我们的请求流量瞬时就来了,而且一般还是同一时间来的,这样我们的全部流量就打到我们的数据库上,

当我们一个线程还在查询这一步,查询出来哦,还有1个库存,可以下订单,那么另外一个线程也来了,

查询出来,哦,我也还有1个库存,可以下订单。

这样两个线程都去更新库存信息,这样就会出现超卖的情况那。

在我们秒杀活动中,如果这样去实现,那不得亏死哦。

那么我们应该怎么办呢?左想想,右想想

哦,我们不是学过消息队列吗?我们可以用这个来进行优化啊。。。

消息队列中不是可以消峰限流吗?

鼠标往上一划,哇哦,原来写了这么多啦,那我们就先这样结束吗?

不会,我们在下一节会继续讲到的,我们下期再见啦

拜拜

你可能感兴趣的