RocketMQ源码解析(三)——HA机制之读写分离

参考:

RocketMQ HA机制(主从同步) (qq.com)

RocketMQ HA机制 - 知乎 (zhihu.com)

RocketMQ Producer 发送消息时如何选择broker_想着你就无比幸福的博客-CSDN博客

源码版本:RocketMQ 4.9.3

HA(High Available) 即高可用,HA机制的目的是为了消除单点故障(SPOF),提高消息的高可用性。针对这个,主从备份实现高可用的 RocketMQ 中引入了读写分离机制

而在RocketMQ 中默认是 Producer 只能往 Master 写消息, Consumer 可以从 Master 或 Slave 读消息。

我们来看看源码验证一下。


文章目录

  • 一、 Producer 往 Master 写消息
  • 二、Consumer 可以从 Master 和 Slave 读消息
    • 1. suggestPullingFromSlave 是什么?怎么获得?
    • 2. suggestPullingFromSlave 如何参与读写分离逻辑?
  • 三、总结


一、 Producer 往 Master 写消息

producer在发送消息的时候,会选择一个messageQueue,然后会将消息发送到messageQueue所在的broker。

org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl#sendDefaultImpl

// MessageQueue mq上次选择的BrokerName, 是null就设为null,不是null就直接用之前的BrokerName就行
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 选择一个MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
    mq = mqSelected;
    brokersSent[times] = mq.getBrokerName();
    try {
        ......
        // 发送消息
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
        ......
    }
    ......
}

selectOneMessageQueue是怎么选择MessageQueue的呢?

org/apache/rocketmq/client/latency/MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Broker故障延迟机制 sendLatencyFaultEnable 默认是false
    if (this.sendLatencyFaultEnable) {
        ......
    }
    // 所以我们看这个
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

org/apache/rocketmq/client/impl/producer/TopicPublishInfo#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 第一次选择队列
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // 遍历消息队列集合
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            // sendWhichQueue自增
            int index = this.sendWhichQueue.incrementAndGet();
            // 对队列大小取模
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            // 规避上次Broker队列
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // 如果以上情况都不满足,返回sendWhichQueue自增后取模的队列
        return selectOneMessageQueue();
    }
}

org/apache/rocketmq/client/impl/producer/TopicPublishInfo#selectOneMessageQueue

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.incrementAndGet();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

现在已经选择了一个MessageQueue,然后是发送消息。调用:

org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl#sendKernelImpl

// 获得broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
// 没有找到,从NameServer更新broker地址
if (null == brokerAddr) {
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

org/apache/rocketmq/client/impl/factory/MQClientInstance#findBrokerAddressInPublish

public String findBrokerAddressInPublish(final String brokerName) {
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
    new ConcurrentHashMap<String, HashMap<Long, String>>();

根据 brokerName 从 brokerAddrTable 中查询对应的 brokerId 和 address,如果查到的话消息直接发送到这个MessageQueue 对应的 Master ,否则返回 null。

到这里,可以看出 Producer 是只往 Master 写消息的了。

二、Consumer 可以从 Master 和 Slave 读消息

Consumer 在发起拉取请求时,根据findBrokerAddressInSubscribe()函数获得待拉取消息的Broker。

org/apache/rocketmq/client/impl/factory/MQClientInstance#findBrokerAddressInSubscribe

public FindBrokerResult findBrokerAddressInSubscribe(
    final String brokerName,
    final long brokerId,
    final boolean onlyThisBroker
) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;

    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        // 查找broker地址
        brokerAddr = map.get(brokerId);
        // 要查的broker是不是slave
        slave = brokerId != MixAll.MASTER_ID;
        // 是否查找到了
        found = brokerAddr != null;

        // 没查找到 且 要求为slave
        if (!found && slave) {
            brokerAddr = map.get(brokerId + 1);
            found = brokerAddr != null;
        }

        // 没查找到 且 不要求只能是brokerId对应的broker
        // 实现主从切换的关键
        // 当Master宕机时, 从剩下可用的Broker按顺序读取一个,map的HashCode是有序的, 默认获取到的是第一个,也就是 brokerId = 1 的Slave
        if (!found && !onlyThisBroker) {
            Entry<Long, String> entry = map.entrySet().iterator().next();
            brokerAddr = entry.getValue();
            slave = entry.getKey() != MixAll.MASTER_ID;
            found = true;
        }
    }

    if (found) {
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }

    return null;
}

这里可以看出 Consumer 是能选择从 Master 或 Slave 拉取消息的。

那么问题来了 Master 、Slave 都在运行时,Consumer 到底是从 Master 还是 Slave 拉取消息?

这里先说结论, RokcetMQ 在默认情况下会优先选择从 Master 拉取消息。那到底什么时候是从 Slave 拉取消息?这就得说到一个属性suggestPullingFromSlave ,这又是什么东西?我们从源码来看看!

1. suggestPullingFromSlave 是什么?怎么获得?

在消息拉取流程中,在 Consumer 发起拉取请求后,Broker负责响应这个请求,组装的响应消息中包含了下一次是否要从 Slave 拉取数据的建议的属性suggestPullingFromSlave,Consumer 收到了这个建议后,会找到合适的 Broker 节点进行拉取,也就是说是根据suggestPullingFromSlave来决定 Consumer 下次是从 Master 还是 Slave 拉取消息的。

那这个建议属性是怎么得到的呢?

org/apache/rocketmq/store/DefaultMessageStore#getMessage

// @1
long diff = maxOffsetPy - maxPhyOffsetPulling;
// @2
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
// @3
getResult.setSuggestPullingFromSlave(diff > memory);

代码@1:获取目前未处理的消息总大小

  • maxOffsetPy:代表当前 Master 消息存储文件 commitLog 的最大偏移量。
  • maxPhyOffsetPulling:本次拉取消息最大偏移量。
  • diff:目前未处理的消息总大小

代码@2:获取设定的存储在内存中的消息总大小的阙值(物理内存大小 * 0.4)

  • StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE:当前系统的总物理内存。
  • accessMessageInMemoryMaxRatio:设置的存储消息在内存中的阙值比例,默认为40。

代码@3:设置下次是否从 Slave 拉取消息的建议属性。如果 目前未处理的消息总大小 > 设定的存储在内存中的消息总大小的阙值,则建议下次从 Slave 拉取消息,反之建议下次从 Master 拉取消息。

suggestPullingFromSlave属性是如何使用呢?

2. suggestPullingFromSlave 如何参与读写分离逻辑?

决定 Consumer 下次是从 Master 还是 Slave 拉取消息的逻辑,是在 Broker 处理 Consumer 发起拉取请求的时候进行的,也就是PullMessageProcessor#processRequest中。

源码分析如下:

org/apache/rocketmq/broker/processor/PullMessageProcessor#processRequest

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    ......
        
    // 调用MessageStore.getMessage获得消息(就是刚才那个函数)
    final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), 																		 requestHeader.getTopic(),
                                                              requestHeader.getQueueId(), 
                                                              requestHeader.getQueueOffset(), 
                                                              requestHeader.getMaxMsgNums(), 
                                                              messageFilter);
    if (getMessageResult != null) {
        response.setRemark(getMessageResult.getStatus().name());
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

        // 如果获得的消息中建议下次从slave拉取消息,
        if (getMessageResult.isSuggestPullingFromSlave()) {
            // 则brokerId设为订阅组建议的当消息消费缓慢时拉取的brokerId。
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
        } else {
            // 否则下次从master拉取消息
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
	    
        switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                break;
            case SLAVE:
                // 如果当前Broker的角色为slave但是slaveReadEnable=false(从不允许读),则忽略建议设置,下次从master拉取消息
                if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
                break;
        }

        if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            // consume too slow ,redirect to another machine
            if (getMessageResult.isSuggestPullingFromSlave()) {
                // 如果slaveReadEnable=true(从允许读),并且建议从slave读取,
                // 则brokerId设为订阅组建议的当消息消费缓慢时拉取的brokerId。
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            }
            // consume ok
            else {
                // 如果消息消费速度正常,则使用订阅组建议的brokerId拉取消息进行消费,
                // 默认为主服务器。
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            }
        } else {
            // 如果不允许从可读,则固定使用从主拉取。
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
        ......
    } 
    ......
    return response;
}

上面设置的建议 Consumer 下次拉取消息时使用的 brokerId 被封装进响应消息的消息头,Consumer 收到后会根据这个 brokerId 从指定的 Broker 拉取消息。

Consumer 在处理拉取结果时会将 Broker 建议的 brokerId 更新到 Broker 拉取缓存表中。

org/apache/rocketmq/client/impl/consumer/PullAPIWrapper#processPullResult

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    // 将Broker建议的brokerId更新到broker拉取缓存表中
    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    ......
    return pullResult;
}

org/apache/rocketmq/client/impl/consumer/PullAPIWrapper#updatePullFromWhichNode

public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (null == suggest) {
        this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
    } else {
        suggest.set(brokerId);
    }
}

org/apache/rocketmq/client/impl/consumer/PullAPIWrapper

private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
    new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

这样在 Consumer 下次发起拉取请求时,根据recalculatePullFromWhichNode()函数获得待拉取消息的Broker。

org/apache/rocketmq/client/impl/consumer/PullAPIWrapper#pullKernelImpl

FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}

org/apache/rocketmq/client/impl/consumer/PullAPIWrapper#recalculatePullFromWhichNode

public long recalculatePullFromWhichNode(final MessageQueue mq) {
    // 若开启默认Broker开关,则返回Master的编号 : 0l
    if (this.isConnectBrokerByUser()) {
        return this.defaultBrokerId;
    }

    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (suggest != null) {
        return suggest.get();
    }

    return MixAll.MASTER_ID;
}

这样,Consumer 下次是从 Master 还是 Slave 拉取消息的就决定好了。

三、总结

在RocketMQ 中默认是 Producer 只能往 Master 写消息, Consumer 可以从 Master 或 Slave 读消息。

Master 、Slave 都在运行时,默认情况下,Consumer 从 Master 拉取消息,当 Master 积压的消息超过了物理内存的40%时,则建议从 Slave 拉取。但是如果 slaveReadEnable 为 false,即从不可读,Consumer 依然还是从 Master 拉取消息。

你可能感兴趣的