A Look at carrera's RocketMQProduceOffsetFetcher

This article walks through carrera's RocketMQProduceOffsetFetcher.

RocketMQProduceOffsetFetcher

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/lag/offset/RocketMQProduceOffsetFetcher.java

public class RocketMQProduceOffsetFetcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProduceOffsetFetcher.class);

    private DefaultMQAdminExt defaultMQAdminExt;

    private DefaultMQPullConsumer defaultMQPullConsumer;

    private String namesrvAddr;

    public RocketMQProduceOffsetFetcher(String namesrvAddr) {
        this.defaultMQAdminExt = new DefaultMQAdminExt();
        defaultMQAdminExt.setNamesrvAddr(namesrvAddr);
        defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));

        this.defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
        defaultMQPullConsumer.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
        defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
        this.namesrvAddr = namesrvAddr;
    }

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void start() throws MQClientException {
        defaultMQAdminExt.start();
        defaultMQPullConsumer.start();
        defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
    }

    public void shutdown() {
        defaultMQAdminExt.shutdown();
        defaultMQPullConsumer.shutdown();
    }

    public ConsumeStats getConsumeStats(String group, String topic) throws Exception {
        return defaultMQAdminExt.examineConsumeStats(group, topic);
    }

    public TopicStatsTable getProduceStats(String topic) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        return defaultMQAdminExt.examineTopicStats(topic);
    }

    public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
        return defaultMQPullConsumer.pull(mq, "*", offset, 1);
    }
}
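  • The constructor of RocketMQProduceOffsetFetcher takes namesrvAddr and creates a DefaultMQAdminExt and a DefaultMQPullConsumer. The pull consumer uses the MixAll.TOOLS_CONSUMER_GROUP group, and both clients get an instance name of "admin-" plus the current timestamp.
  • Its start method calls defaultMQAdminExt.start() and defaultMQPullConsumer.start(), then defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true). Its shutdown method calls defaultMQAdminExt.shutdown() and defaultMQPullConsumer.shutdown().
  • Its getConsumeStats method delegates to defaultMQAdminExt.examineConsumeStats(group, topic), and its getProduceStats method delegates to defaultMQAdminExt.examineTopicStats(topic). Its queryMsgByOffset method calls defaultMQPullConsumer.pull(mq, "*", offset, 1), which pulls at most one message from the given queue starting at offset, using the match-all subscription expression "*".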
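
The package path (monitor/lag/offset) suggests that these two stats calls feed a consumer lag calculation. As a minimal sketch of that idea, and not carrera's actual code, the example below models the produce-side and consume-side offset tables as plain maps and computes per-queue lag as the produce max offset minus the consumer offset. The class LagSketch, its method names, and the "broker:queueId" string keys are hypothetical; in RocketMQ the offset tables are keyed by MessageQueue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of carrera or RocketMQ.
class LagSketch {

    // Per-queue lag = produce-side max offset minus consumer offset, floored at 0.
    static Map<String, Long> lagPerQueue(Map<String, Long> produceMaxOffsets,
                                         Map<String, Long> consumerOffsets) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> e : produceMaxOffsets.entrySet()) {
            // Assumption: a queue with no recorded consumer offset counts as offset 0.
            long consumed = consumerOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - consumed));
        }
        return lag;
    }

    // Total lag across all queues of the topic.
    static long totalLag(Map<String, Long> produceMaxOffsets,
                         Map<String, Long> consumerOffsets) {
        long total = 0L;
        for (long v : lagPerQueue(produceMaxOffsets, consumerOffsets).values()) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> produce = new HashMap<>();
        produce.put("broker-a:0", 120L);
        produce.put("broker-a:1", 80L);

        Map<String, Long> consume = new HashMap<>();
        consume.put("broker-a:0", 100L); // broker-a:1 has no consumer offset yet

        System.out.println(totalLag(produce, consume)); // prints 100
    }
}
```

With the real fetcher, the two inputs would presumably be derived from getProduceStats(topic).getOffsetTable() and getConsumeStats(group, topic).getOffsetTable().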

DefaultMQAdminExt

DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java

public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
    private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
    private String adminExtGroup = "admin_ext_group";
    private String createTopicKey = MixAll.DEFAULT_TOPIC;
    private long timeoutMillis = 5000;

    //......

    @Override
    public ConsumeStats examineConsumeStats(String consumerGroup,
        String topic) throws RemotingException, MQClientException,
        InterruptedException, MQBrokerException {
        return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
    }

    @Override
    public TopicStatsTable examineTopicStats(
        String topic) throws RemotingException, MQClientException, InterruptedException,
        MQBrokerException {
        return defaultMQAdminExtImpl.examineTopicStats(topic);
    }

    //......
}
  • Both examineConsumeStats and examineTopicStats delegate to defaultMQAdminExtImpl.

DefaultMQAdminExtImpl

DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
    private final Logger log = ClientLogger.getLog();
    private final DefaultMQAdminExt defaultMQAdminExt;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mqClientInstance;
    private RPCHook rpcHook;
    private long timeoutMillis = 20000;
    private Random random = new Random();

    //......

    @Override
    public ConsumeStats examineConsumeStats(String consumerGroup,
        String topic) throws RemotingException, MQClientException,
        InterruptedException, MQBrokerException {
        String queryTopic = topic == null ? MixAll.getRetryTopic(consumerGroup) : topic;
        TopicRouteData topicRouteData = this.examineTopicRouteInfo(queryTopic);
        ConsumeStats result = new ConsumeStats();

        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
            String addr = bd.selectBrokerAddr();
            if (addr != null) {
                ConsumeStats consumeStats =
                    this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
                result.getOffsetTable().putAll(consumeStats.getOffsetTable());
                double value = result.getConsumeTps() + consumeStats.getConsumeTps();
                result.setConsumeTps(value);
            }
        }

        if (result.getOffsetTable().isEmpty()) {
            throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
                "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
        }

        return result;
    }

    @Override
    public TopicStatsTable examineTopicStats(
        String topic) throws RemotingException, MQClientException, InterruptedException,
        MQBrokerException {
        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
        TopicStatsTable topicStatsTable = new TopicStatsTable();

        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
            String addr = bd.selectBrokerAddr();
            if (addr != null) {
                TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
                topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
            }
        }

        if (topicStatsTable.getOffsetTable().isEmpty()) {
            throw new MQClientException("Not found the topic stats info", null);
        }

        return topicStatsTable;
    }

    //......
}    
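  • The examineConsumeStats method first obtains topicRouteData via examineTopicRouteInfo(queryTopic), where queryTopic falls back to MixAll.getRetryTopic(consumerGroup) when topic is null. It then iterates over topicRouteData.getBrokerDatas(), picks an address with bd.selectBrokerAddr(), and for each non-null address calls mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3). The returned offset tables are merged and the consumeTps values summed; if the merged offset table is empty, it throws an MQClientException with ResponseCode.CONSUMER_NOT_ONLINE.
  • The examineTopicStats method follows the same pattern: it obtains topicRouteData via examineTopicRouteInfo(topic), iterates over topicRouteData.getBrokerDatas() to pick a broker address, and calls mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis) for each one, merging the results into a single topicStatsTable. An empty merged offset table also results in an MQClientException.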
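
The per-broker merge described above can be sketched with standard-library types only. This is an illustrative model rather than RocketMQ code: the Stats class is a hypothetical stand-in for ConsumeStats, queue keys are plain strings, a null list entry models a broker for which no address could be selected, and IllegalStateException stands in for MQClientException.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the merge loop; not the RocketMQ implementation.
class BrokerStatsMergeSketch {

    // Stand-in for ConsumeStats: queue key -> consumer offset, plus a TPS figure.
    static class Stats {
        final Map<String, Long> offsetTable = new HashMap<>();
        double consumeTps;
    }

    // Merges one Stats per broker: offset tables are unioned, TPS values are summed.
    static Stats merge(List<Stats> perBroker) {
        Stats result = new Stats();
        for (Stats s : perBroker) {
            if (s == null) {
                continue; // models the "addr != null" guard in the original loop
            }
            result.offsetTable.putAll(s.offsetTable);
            result.consumeTps += s.consumeTps;
        }
        // Mirrors the empty-table check that ends both examine* methods.
        if (result.offsetTable.isEmpty()) {
            throw new IllegalStateException("offset table is empty");
        }
        return result;
    }
}
```

Because each queue lives on exactly one broker, putAll does not overwrite entries in practice; the merged table is simply the union of the per-broker tables.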

Summary

  • The constructor of RocketMQProduceOffsetFetcher takes namesrvAddr and creates a DefaultMQAdminExt and a DefaultMQPullConsumer.
  • Its start method starts both clients and then calls setConnectBrokerByUser(true) on the pull consumer's PullAPIWrapper; its shutdown method shuts both clients down.
  • getConsumeStats delegates to defaultMQAdminExt.examineConsumeStats(group, topic), getProduceStats delegates to defaultMQAdminExt.examineTopicStats(topic), and queryMsgByOffset calls defaultMQPullConsumer.pull(mq, "*", offset, 1).

