Kafka0.11之RoundRobinPartitioner/HashPartitioner(Scala):

RoundRobinPartitioner/HashPartitioner:

 

import java.util
import java.util.concurrent.atomic.AtomicLong

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster


/**
 * Producer partitioner that hands records to a topic's partitions in rotation,
 * ignoring the record key and value.
 *
 * One counter is shared by every topic routed through the same instance.
 */
class SelfRoundRobinPartitioner extends Partitioner {

  /** Running count of records routed so far; drives the rotation. */
  val next = new AtomicLong()

  /**
   * Chooses the partition for a record from the running counter.
   *
   * The counter is incremented before it is used, so the first record goes to
   * partition `1 % partitionCount`.
   *
   * @param topic   topic the record is being sent to
   * @param cluster cluster metadata used to look up the topic's partitions
   * @return a partition index in `[0, partitionCount)`
   */
  override def partition(topic: String, key: scala.Any, keyBytes: Array[Byte], value: scala.Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size()
    val remainder = next.incrementAndGet() % numPartitions
    // `%` keeps the sign of the dividend, so after the counter wraps past
    // Long.MaxValue the remainder would be negative; shift it back into range.
    val partitionNum = if (remainder < 0) remainder + numPartitions else remainder
    partitionNum.toInt
  }

  /** Nothing to release. */
  override def close(): Unit = {}

  /** No configuration is read. */
  override def configure(configs: util.Map[String, _]): Unit = {}
}

 

import java.util

import scala.math._
import kafka.utils.VerifiableProperties
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

/**
 * Producer partitioner that derives the partition from the record key, so that
 * records with equal keys always land on the same partition.
 */
class SelfHashPartitioner extends Partitioner {

  /**
   * Chooses the partition for a record from its key.
   *
   *  - `Int` keys use their absolute value modulo the partition count.
   *  - Any other non-null key uses its `hashCode` modulo the partition count.
   *  - A null key is sent to partition 0 instead of failing with a
   *    `NullPointerException`.
   *
   * @param topic   topic the record is being sent to
   * @param key     record key; may be null
   * @param cluster cluster metadata used to look up the topic's partitions
   * @return a partition index in `[0, partitionCount)`
   */
  override def partition(topic: String, key: scala.Any, keyBytes: Array[Byte], value: scala.Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size()

    key match {
      case null   => 0
      // abs(Int.MinValue) is still negative, hence the extra normalisation.
      case i: Int => nonNegativeMod(abs(i), numPartitions)
      case other  => nonNegativeMod(other.hashCode(), numPartitions)
    }
  }

  /** Remainder of `value / modulus`, shifted into `[0, modulus)` when `%` yields a negative result. */
  private def nonNegativeMod(value: Int, modulus: Int): Int = {
    val remainder = value % modulus
    if (remainder < 0) remainder + modulus else remainder
  }

  /** Nothing to release. */
  override def close(): Unit = {}

  /** No configuration is read. */
  override def configure(configs: util.Map[String, _]): Unit = {}
}

 

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
 * Demo entry point: sends a fixed set of keyed string records to `ScalaTopic`
 * through the configured custom partitioner.
 */
object KafkaProducer {
  def main(args: Array[String]): Unit = {

    val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
    //    val brokers = "192.168.1.151:9092"
    val topic = "ScalaTopic"

    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//    props.put("partitioner.class", classOf[SelfHashPartitioner].getName)
    props.put("partitioner.class", classOf[SelfRoundRobinPartitioner].getName)
    // "producer.type" belongs to the old Scala producer and is not a setting of
    // this client; synchronous sending is done below by waiting on each send.
    props.put("batch.size", "1")
    props.put("acks", "all")

    val producer = new KafkaProducer[String, String](props)

    // Set to true to pause between sends.
    val sleepFlag = false

    // (key, value) pairs, in send order.
    val messages = Seq(
      "1" -> "test 1aa",
      "1" -> "test 1bb",
      "1" -> "test 1cc",
      "4" -> "test 4dd",
      "4" -> "test 4aa",
      "3" -> "test 3bb",
      "2" -> "test 2bb"
    )

    try {
      for ((key, value) <- messages) {
        // Wait for the broker's acknowledgement so a failed send surfaces here
        // instead of being dropped with the ignored Future.
        producer.send(new ProducerRecord[String, String](topic, key, value)).get()
        if (sleepFlag) Thread.sleep(5000)
      }
    } finally {
      // Always release the producer's resources, even if a send failed.
      producer.close()
    }
  }
}

 

import java.lang
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConversions._

/**
 * Demo entry point: subscribes to `ScalaTopic` and prints every record it
 * receives, polling until the process is stopped.
 */
object KafkaTConsumer {
  def main(args: Array[String]): Unit = {
    // Hard-coded for the demo; read these from `args` if it needs to be parameterised.
    val groupid = "ScalaGroup"
    val consumerid = "ScalaConsumer"
    val topic = "ScalaTopic"

    val props = new Properties()
    props.put("bootstrap.servers", "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092")
    props.put("group.id", groupid)
    props.put("client.id", "test")
    // NOTE(review): "consumer.id" looks like an old-consumer setting; this client
    // is identified by "client.id" above. Confirm and drop if it is unused.
    props.put("consumer.id", consumerid)
    //    props.put("auto.offset.reset", "smallest")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "100")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(java.util.Arrays.asList(topic))

      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(100)
        // Walk the Java iterator directly rather than relying on an implicit
        // Java-to-Scala collection conversion.
        val it = records.iterator()
        while (it.hasNext) {
          val record = it.next()
          println(s"Topic = ${record.topic()}, partition = ${record.partition()}, key = ${record.key()}, value = ${record.value()}")
        }
      }
    } finally {
      // The loop only exits by exception; make sure the consumer is closed when it does.
      consumer.close()
    }
  }
}

Round robin运行结果:

Topic = ScalaTopic, partition = 0, key = 1, value = test 1cc
Topic = ScalaTopic, partition = 0, key = 3, value = test 3bb
Topic = ScalaTopic, partition = 1, key = 1, value = test 1aa
Topic = ScalaTopic, partition = 1, key = 4, value = test 4dd
Topic = ScalaTopic, partition = 2, key = 1, value = test 1bb
Topic = ScalaTopic, partition = 2, key = 4, value = test 4aa
Topic = ScalaTopic, partition = 1, key = 2, value = test 2bb

 Hash 运行结果:

Topic = ScalaTopic, partition = 1, key = 1, value = test 1aa
Topic = ScalaTopic, partition = 1, key = 1, value = test 1bb
Topic = ScalaTopic, partition = 0, key = 3, value = test 3bb
Topic = ScalaTopic, partition = 2, key = 2, value = test 2bb
Topic = ScalaTopic, partition = 1, key = 1, value = test 1cc
Topic = ScalaTopic, partition = 1, key = 4, value = test 4dd
Topic = ScalaTopic, partition = 1, key = 4, value = test 4aa

 

转载于:https://www.cnblogs.com/AK47Sonic/p/8097890.html

你可能感兴趣的