清除卡夫卡队列

我在我的本地机器上推送了一个太大的消息给我,现在我收到一个错误消息:

kafka.common.InvalidMessageSizeException: invalid message size 

增加fetch.size在这里并不理想,因为我实际上并不想接受那么大的消息。 有没有办法清除卡夫卡的话题?

暂时将主题上的保留时间更新为一秒钟:

 kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000 

然后等待清除生效(大约一分钟)。 清除后,还原以前的retention.ms值。

以下是我要删除名为MyTopic的主题的步骤:

  1. 停止Apache Kafka守护进程
  2. 删除主题数据文件夹: rm -rf /tmp/kafka-logs/MyTopic-0
  3. 删除主题元数据: zkCli.sh然后rmr /brokers/MyTopic
  4. 启动Apache Kafka守护进程

如果你错过了第3步,那么Apache Kafka将继续报告主题(例如,如果运行kafka-list-topic.sh )。

用Apache Kafka 0.8.0testing。

要清除队列,可以删除主题:

 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test 

然后重新创build它:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 --topic test 

在Kafka 0.8.2中进行testing,快速入门示例:首先,将一行添加到config文件夹下的server.properties文件中:

 delete.topic.enable=true 

那么你可以运行这个命令:

 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test 

虽然接受的答案是正确的,但该方法已被弃用。 主题configuration现在应该通过kafka-configs

 kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic 

通过此方法设置的configuration可以使用该命令显示

 kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic 

是的,停止kafka并手动删除相应子目录中的所有文件(在kafka数据目录中很容易find它)。 卡夫卡重启后,话题将是空的。

最简单的方法是将单个日志文件的date设置为比保留期更早。 那么经纪人应该在几秒钟内把它们清理干净。 这提供了几个优点:

  1. 没有必要打倒经纪人,这是一个运行时操作。
  2. 避免无效的抵消例外的可能性(更多在下面)。

根据我对Kafka 0.7.x的经验,删除日志文件并重新启动代理可能会导致某些消费者无效的抵消例外。 发生这种情况的原因是,代理重新启动零(在没有任何现有的日志文件的情况下)的偏移,并且以前消费的主题的消费者将重新连接以请求特定的[一次有效]偏移量。 如果这个偏差恰好落在新的主题日志的范围之外,那么无论是开始还是结束都不会造成任何伤害和消费者的重新开始。 但是,如果偏移量落在新主题日志的范围内,则代理尝试获取消息集,但由于偏移量未与实际消息alignment而失败。

这也可以通过清除动物园pipe理员对该主题的消费者偏移来减轻。 但是,如果您不需要处女主题,只想删除现有内容,那么只需“触摸”几个主题日志就比停止代理,删除主题日志和清除某些动物园pipe理员节点要容易和可靠得多。

使用您的应用程序组清除特定主题中的所有消息(GroupName应与应用程序的kafka组名相同)。

./kafka-path/bin/kafka-console-consumer.sh –zookeeper localhost:2181 – topic topicName –from-beginning –group application-group

托马斯的build议是伟大的,但不幸的是zkCli旧版本的Zookeeper(例如3.3.6)似乎不支持rmr 。 例如比较现代Zookeeper的命令行实现与版本3.3 。

如果你面对的是老版本的Zookeeper,一个解决scheme就是使用一个客户端库,比如用于Python的zc.zk。 对于不熟悉Python的人,您需要使用pip或easy_install进行安装。 然后启动一个Python shell( python ),你可以这样做:

 import zc.zk zk = zc.zk.ZooKeeper('localhost:2181') zk.delete_recursive('brokers/MyTopic') 

甚至

 zk.delete_recursive('brokers') 

如果你想删除卡夫卡的所有主题。

无法添加作为评论,因为大小:不知道如果这是真的,除了更新retention.ms和retention.bytes,但我注意到主题清理策略应该是“删除”(默认),如果“紧凑”,它将要坚持消息更长,即如果它是“紧凑”,你也必须指定delete.retention.ms 。

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

还必须监视最早的/最新的偏移量应该是一样的才能确认这个成功发生,也可以查看du -h / tmp / kafka-logs / test-topic-3-100- *

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

另一个问题是,您必须先获取当前的configuration,以便在删除成功后记得恢复: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics / ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

kafka没有清除/清除主题(队列)的直接方法,但可以通过删除该主题并重新创build它。

首先确保sever.properties文件有,如果没有,添加delete.topic.enable=true

然后,删除主题bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

然后再次创build它。

 bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2