Tag: apache kafka

Java,如何获得apache kafka中的主题中的消息数量

我正在使用apache kafka进行消息传递。 我已经实现了Java中的生产者和消费者。 我们怎样才能得到一个话题的消息数量?

我如何发送卡夫卡(超过15MB)的大消息?

我使用Java Producer API将string消息发送到Kafka V. 0.8。 如果邮件大小约为15 MB,则会收到MessageSizeTooLargeException 。 我试图设置message.max.bytes到40 MB,但我仍然得到exception。 小信息没有问题。 (生产者出现exception,我没有这个应用程序的消费者。) 我能做些什么来摆脱这个exception? 我的示例生产者configuration private ProducerConfig kafkaConfig() { Properties props = new Properties(); props.put("metadata.broker.list", BROKERS); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("message.max.bytes", "" + 1024 * 1024 * 40); return new ProducerConfig(props); } 错误日志: 4709 [main] WARN kafka.producer.async.DefaultEventHandler – Produce request with correlation id 214 failed due […]

有没有办法在每次运行之前删除主题中的所有数据或删除主题?

有没有办法在每次运行之前删除主题中的所有数据或删除主题? 我可以修改KafkaConfig.scala文件来更改logRetentionHours属性吗? 消费者读取消息时消息是否被删除? 我正在使用生产者从某处获取数据并将数据发送到消费者使用的特定主题,我可以在每次运行时删除该主题的所有数据吗? 每次我只想要新的数据。 有没有办法重新初始化这个话题?

领袖在控制台制作人中没有可用的Kafka

我正在尝试使用卡夫卡。 所有configuration都正确完成,但是当我试图从控制台产生消息时,我不断收到以下错误 WARN Error while fetching metadata with correlation id 39 : {4-3-16-topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 任何人都可以请帮助,因为我无法find任何地方的解决scheme?

ActiveMQ vs阿波罗vs卡夫卡

我以前没有任何关于* MQ的经验,我正在寻求build立关于JMS和消息队列的知识。 那样的话,我想知道我应该从ActiveMQ开始,还是完全“忽略”它,并开始自学阿波罗。 Apollo与ActiveMQ一样function齐全吗? 它是否实现JMS 2.0(我看到ActiveMQ被困在1.1)? 我会错过一些非常重要的东西吗? 另外,卡夫卡如何比较这两个解决scheme?

Apache Kafka vs Apache Storm

Apache Kafka:分布式消息传递系统 Apache Storm:实时消息处理 我们如何在实时数据pipe道中使用这两种技术来处理事件数据? 就实时数据pipe道而言,我认为这两项工作完全相同。 我们如何在数据pipe道上使用这两种技术?

在Zookeeper 3.4.6中使用Kafka 0.8.1时运行到LeaderNotAvailableException

我按照他们的网站安装了稳定版本的kafka(0.8.1和2.9.2 Scala),并且正在运行一个3节点zookeeper合奏(3.4.6)。 我试图创build一个testing主题,但是看到没有领导者被分配到主题的分区: [kafka_2.9.2-0.8.1]$ ./bin/kafka-topics.sh –zookeeper <zookeeper_ensemble> –describe –topic test-1 Topic:test-1 PartitionCount:1 ReplicationFactor:3 Configs: Topic: test-1 Partition: 0 **Leader: none** Replicas: 0,1,2 **Isr:** 我尝试使用控制台生产者编写主题,但遇到了LeaderNotAvailableExceptionexception: [kafka_2.9.2-0.8.1]$ ./kafka-console-producer.sh –broker-list <broker_list> –topic test-1 hello world [2014-04-22 11:58:48,297] WARN Error while fetching metadata [{TopicMetadata for topic test-1 -> No partition metadata for topic test-1 due to kafka.common.LeaderNotAvailableException}] for topic […]

什么决定了卡夫卡消费者抵消?

我对卡夫卡相对较新。 我已经做了一些实验,但有些事情我不清楚消费者抵消。 根据我的理解,当一个消费者启动时,它将开始读取的偏移量由configuration设置auto.offset.reset (如果我错了,请纠正我)。 现在说,例如,主题中有10条消息(偏移0到9),消费者在发生故障之前(或者在杀死消费者之前)恰好消耗了5条消息。 然后说我重新启动消费者的过程。 我的问题是: 如果auto.offset.reset被设置为smallest ,它是否总是开始从偏移0消耗? 如果auto.offset.reset被设置为largest ,它是否会开始消耗偏移5? 这种情况下的行为总是具有确定性吗? 如果我的问题中有任何不清楚的地方,请不要犹豫。 提前致谢。

清除卡夫卡队列

我在我的本地机器上推送了一个太大的消息给我,现在我收到一个错误消息: kafka.common.InvalidMessageSizeException: invalid message size 增加fetch.size在这里并不理想,因为我实际上并不想接受那么大的消息。 有没有办法清除卡夫卡的话题?

使用Kafka作为(CQRS)事件库。 好主意?

虽然我以前碰到过Kafka ,但是最近才意识到Kafka也许可以作为一个CQRS 事件库的基础。 卡夫卡支持的主要观点之一是: 事件捕获/存储,当然是所有的HA。 Pub / Sub架构 能够重放事件日志,允许新用户在事后注册系统。 无可否认,我并不是100%熟悉CQRS /事件采购,但这似乎与事件应该是非常接近的。 有趣的是:我真的无法find关于卡夫卡被用作事件库的很多东西,所以也许我一定会错过一些东西。 那么,卡夫卡有什么遗漏,因为它是一个很好的事件? 它会工作吗? 使用它生产? 对洞察力,链接等感兴趣 基本上,根据系统所接收的事务/事件来保存系统的状态,而不是仅保存通常所做的系统的当前状态/快照。 (把它看作会计中的一个总账:所有的交易最终总计到最终的状态)这允许各种各样的很酷的事情,但只是在提供的链接上阅读。