Java,如何获得apache kafka中的主题中的消息数量

我正在使用apache kafka进行消息传递。 我已经实现了Java中的生产者和消费者。 我们怎样才能得到一个话题的消息数量?

从消费者的angular度来看,唯一想到的就是实际上消费这些消息并计算它们。

卡夫卡经纪人公开了JMX计数器的数量,自启动以来收到的消息,但你不知道已经有多less人已被清除。

在大多数情况下,Kafka中的消息最好被看作是一个无限的stream,并且获得离散的值,即当前保存在磁盘上的数量是不相关的。 而且,在处理一个主题中包含消息子集的代理集群时,事情会变得更加复杂。

这不是Java,但可能是有用的

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}' 

我实际上使用这个来对我的POC进行基准testing。 您要使用ConsumerOffsetChecker的项目。 你可以使用下面的bash脚本来运行它。

 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup 

以下是结果: 在这里输入图像说明 正如您在红色框中看到的,999是主题中当前消息的编号。

更新:ConsumerOffsetChecker自0.10.0起弃用,您可能要开始使用ConsumerGroupCommand。

使用https://prestodb.io/docs/current/connector/kafka-tutorial.html

由Facebook提供的超级SQL引擎,连接到多个数据源(Cassandra,Kafka,JMX,Redis …)。

PrestoDB作为一个带有可选工作服务器的服务器运行(没有额外的工作人员有一个独立模式),然后使用一个小的可执行JAR(称为presto CLI)来进行查询。

一旦你configuration好了Presto服务器,你就可以使用传统的SQL:

 SELECT count(*) FROM TOPIC_NAME; 

要获取为该主题存储的所有消息,您可以在每个分区的stream的开头和末尾寻找使用者,并对结果进行求和

 List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum()); 

我自己并没有尝试过,但似乎是有道理的。

您也可以使用kafka.tools.ConsumerOffsetChecker ( 来源 )。

Apache Kafka命令在主题的所有分区上获取未处理的消息:

 kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 

打印:

 Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 none 

第6列是未处理的消息。 把它们加起来就像这样:

 kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}' 

awk读取行,跳过标题行并添加第6列,并在最后打印总和。

打印

 5 

使用Kafka 2.11-1.0.0的Java客户端,您可以执行以下操作:

  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); } } } 

输出是这样的:

 offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13