深入理解之 Apache Pulsar Connector 与 Partition 关系篇

在前几篇文章中,我们已经介绍了 Connector 与 Function 的关系、在 Function Worker 中如何选举等。其中都涉及到了对 Producer 和 Consumer 的应用。

本篇文章我们就来尝试学习一下 pub/sub 模型与 Partition 的关系。

Partition

下面是官方文档对 Partition 的描述:

通常一个 Topic 仅被一个 Broker 服务,这限制了 Topic 的最大吞吐量。分区 Topic 是特殊的 Topic 类型,他可以被多个 Broker 处理,这让 Topic 有更高的吞吐量。其实在背后,分区的 Topic 通过 N 个内部 Topic 实现,N 是分区的数量。当向分区的 Topic 发送消息,每条消息被路由到其中一个 Broker。Pulsar 自动处理跨 Broker 的分区分布。
可以看到在 Pulsar 中 Partition 也是 Topic,因此可以把 Partition 和 Topic 当成一回事(下文中出现的 Partiton 或者 Topic 可理解为同一个词)上面引用中提到了「Pulsar 自动处理跨 Broker 的分区分布」。

本篇文章就来了解一下是如何自动处理的。主要内容是 Producer 如何发送数据给 Paritition,Consumer 又是如何从多个分区中进行消费的。

创建分区 Topic

从创建分区 Topic 开始看:

./bin/pulsar-admin topics create-partitioned-topic test-partition -p 4

以上命令会创建一个名称为 test-partition 分区数量为 4 的 Topic。

Producer 端

使用 Producer 发送消息:

./bin/pulsar-client produce test-partition --messages "333" -n 10

可以看到与正常的往 Topic 里发送消息是一样的。但是在代码中它们走的逻辑是不同的。

if (metadata.partitions > 1) {
      producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
              producerCreatedFuture, schema, interceptors);
  } else {
      producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
  }

可以看到当 Partition 数量大于 1 的时候,使用了 PartitionedProducerImpl 类来创建 Producer。

在 PartitionedProducerImpl 中初始化了路由策略。有三种路由策略,分别是 RoundRobinPartition,SinglePartition 和 CustomPartition。

  • RoundRobinPartition:如果没有 key,所有的消息通过 round-robin 方式被路由到不同的分区,以达到最大吞吐量。请注意 round-robin 并不是作用于每条单独的消息,而是作用于一批消息。如果为 message 指定了 key,分区的 Producer 会把 key 做散列,然后分配消息到指定的分区。这是默认的模式。
  • SinglePartition:如果没有 Key 被提供,Producer 将会随机选择一个分区,把所有的消息发往该分区。如果为 message 指定了 key,分区的 Producer 会把 key 做散列,然后将消息分配到指定分区。
  • CustomPartition:使用定制化消息路由实现,可以决定特定的消息进入指定的分区。用户可以创建定制化的路由模式,通过使用 Java client,实现 MessageRouter 接口。
this.routerPolicy = getMessageRouter();
start();

在 start 中又有如下的逻辑:

for (int partitionIndex = 0; partitionIndex < topicMetadata.numPartitions(); partitionIndex++) {
    String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString();
    ProducerImpl producer = new ProducerImpl<>(client, partitionName, conf, new CompletableFuture<>(),
            partitionIndex, schema, interceptors);
    });
}

可以看到对于多分区,会为每一个分区创建一个 Producer,分区名称的格式为 topic-name-partition-index,上面创建的分区会有 test-partition-partition-0,test-partition-partition-1,test-partition-partition-2,test-partition-partition-3 生成。

当 Producer 创建成功之后,就可以调用 send 方法进行数据的发送了,具体要发往哪个分区,继续往下看:

在 send 中会基于上面初始化的路由策略来进行选择具体发送到哪个分区:

int partition = routerPolicy.choosePartition(message, topicMetadata);
return producers.get(partition).internalSendAsync(message);

关于每种路由策略,后续会有相关文章进行介绍,本篇着重介绍从发送数据到分区,以及从分区接收数据的整个流程。这样就调用普通的 Producer 将数据发送到了 Broker。

Consumer 端

当 Broker 收到数据后,就会将该数据再发送给相应的 Consumer。

使用下面的命令创建消费者,对于每种模式都会生成 4 个 Consumer,这 4 个 Consumer 都是基于相同的订阅。

./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Exclusive
./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Failover
./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Shared

在消费者端的代码中可以看到,当分区数大于 1 时,选择了不同的逻辑来初始化 Consumer,实际在 MultiTopicsConsumerImpl 内部也是初始化了多个 Consumer。

if (metadata.partitions > 1) {
    consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
        listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
    consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
            consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors, 
            this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
}

Broker 端

再回到 Broker 端,看看 Broker 是如何进行数据路由的。

Consumer 有三种订阅模式,分别是 Exclusive,Faliover,Shared。

在 Exclusive 下,分区参数会一直被初始化为 0,因此会将数据发送给 topicname-partition-0 命名的 Consumer。

dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0, topic, this);

在 Failover 模式下,会将数据发送给当前活跃的 Consumer :

int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
    // For non partition topics, assume index 0 to pick a predictable consumer
    partitionIndex = 0;
}
if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
    dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, partitionIndex,
            topic, this);
}

在 Shared 模式下是最有趣的,现在的场景有多个分区,初始化了多个 Consumer。

 Consumer PriorityLevel Permits
     C1                 0             2
     C2                 0             1
     C3                 0             1
     C4                 1             2
     C5                 1             1

上面有 5 个 Consumer,第一列为 Consumer 名称,第二列是优先级,数字越小优先级越高,第三列是可以接收的数据量。

首先会发送数据给比较高优先级的 Consumer,这里会先发送给 C1、C2、C3,它们三个有相同的优先级,对于有不同优先级的 Consumer,Broker 会先投递数据给高优先级的 Consumer,直到达到 Consumer 的消息数限制,才会再投递消息给下一个优先级的 Consumer。

对于有相同优先级的 Consumer,Broker 默认会轮询进行数据投递,直到达到 Consumer 的消息数限制。因此上面的数据投递顺序会是:C1、C2、C3、C1、C4、C5、C4。

对于上面初始化的多个 Consumer 因为具有相同的优先级,因此基本上轮询进行数据投递的。

总结

本文主要分享了 Producer、Consumer 和 Partition 的关系,Producer 如何往多 Partition 发送数据,Consumer 又是如何从多 Partition 消费数据。

你可能感兴趣的