
Getting the latest partition offsets in Kafka

Posted on 2015-06-05 by blackproof (reposted)

To read the latest offset of every partition in a Kafka topic, you need Kafka's SimpleConsumer (the low-level consumer API in Kafka 0.8.x). The approach has two steps: first query a seed broker for the topic's metadata to find the leader broker of each partition, then send each leader an OffsetRequest with LatestTime() to get its log-end offset.

 

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

public class KafkaOffsetTools {

	public static void main(String[] args) {
		// Defaults; override on the command line: <topic> <seedBroker> <port>
		String topic = "dirkz";
		String seed = "118.26.148.18";
		int port = 9092;
		if (args.length >= 3) {
			topic = args[0];
			seed = args[1];
			port = Integer.valueOf(args[2]);
		}
		List<String> seeds = new ArrayList<String>();
		seeds.add(seed);
		KafkaOffsetTools kot = new KafkaOffsetTools();

		// Ask the seed broker(s) for the leader of every partition of the topic
		TreeMap<Integer, PartitionMetadata> metadatas = kot.findLeader(seeds, port, topic);

		long sum = 0;

		for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
			int partition = entry.getKey();
			if (entry.getValue().leader() == null) {
				System.out.println("Partition " + partition + " currently has no leader, skipping");
				continue;
			}
			// Connect to the partition's leader; use the leader's own host and
			// port, which need not be the same as the seed broker's
			String leadBroker = entry.getValue().leader().host();
			int leadPort = entry.getValue().leader().port();
			String clientName = "Client_" + topic + "_" + partition;
			SimpleConsumer consumer = new SimpleConsumer(leadBroker, leadPort,
					100000, 64 * 1024, clientName);
			long readOffset = getLastOffset(consumer, topic, partition,
					kafka.api.OffsetRequest.LatestTime(), clientName);
			sum += readOffset;
			System.out.println(partition + ":" + readOffset);
			consumer.close();
		}
		System.out.println("Total: " + sum);
	}


	/**
	 * Ask the partition's leader for the offset at the given time.
	 * LatestTime() returns the log-end offset (the offset the next message
	 * will be written at); EarliestTime() returns the first available offset.
	 */
	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
				whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
				requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
				clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out.println("Error fetching offset data from the broker. Reason: "
					+ response.errorCode(topic, partition));
			// Treat the failed partition as 0 so the total can still be printed
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}

	/**
	 * Query each seed broker for the topic's metadata and collect the
	 * PartitionMetadata (including the current leader) of every partition,
	 * keyed by partition id.
	 */
	private TreeMap<Integer, PartitionMetadata> findLeader(List<String> a_seedBrokers,
			int a_port, String a_topic) {
		TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
		for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
						"leaderLookup" + new Date().getTime());
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						map.put(part.partitionId(), part);
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with broker [" + seed
						+ "] to find leader for [" + a_topic + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
		return map;
	}

}
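For reference, compile against the Kafka 0.8.x client jars and pass the topic, a seed broker, and its port on the command line. The tool prints one partition:offset line per partition followed by the total; the offsets below are purely illustrative:

java KafkaOffsetTools dirkz 118.26.148.18 9092
0:1042
1:987
2:1011
Total: 3040

The SimpleConsumer API shown above has since been deprecated in favor of the new consumer API. On Kafka 0.10.1 or later, the same per-partition log-end offsets can be read with the high-level consumer's endOffsets method, which handles the leader lookup internally. A minimal sketch, reusing the example broker and topic from above (the class name is hypothetical; adjust addresses for your cluster):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetsExample {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "118.26.148.18:9092");
		props.put("key.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		try {
			// Collect every partition of the topic
			List<TopicPartition> partitions = new ArrayList<TopicPartition>();
			for (PartitionInfo pi : consumer.partitionsFor("dirkz")) {
				partitions.add(new TopicPartition(pi.topic(), pi.partition()));
			}
			// endOffsets returns each partition's log-end offset -- the same
			// value the SimpleConsumer code above requests with LatestTime()
			long sum = 0;
			for (Map.Entry<TopicPartition, Long> e : consumer.endOffsets(partitions).entrySet()) {
				System.out.println(e.getKey().partition() + ":" + e.getValue());
				sum += e.getValue();
			}
			System.out.println("Total: " + sum);
		} finally {
			consumer.close();
		}
	}
}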

 
