我如何发送卡夫卡(超过15MB)的大消息?

我使用Java Producer API将string消息发送到Kafka V. 0.8。 如果邮件大小约为15 MB,则会收到MessageSizeTooLargeException 。 我试图设置message.max.bytes到40 MB,但我仍然得到exception。 小信息没有问题。

(生产者出现exception,我没有这个应用程序的消费者。)

我能做些什么来摆脱这个exception?

我的示例生产者configuration

 private ProducerConfig kafkaConfig() { Properties props = new Properties(); props.put("metadata.broker.list", BROKERS); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("message.max.bytes", "" + 1024 * 1024 * 40); return new ProducerConfig(props); } 

错误日志:

 4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224] kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) at kafka.producer.Producer.send(Unknown Source) at kafka.javaapi.producer.Producer.send(Unknown Source) 

您需要调整三个(或四个)属性:

  • 消费者方: fetch.message.max.bytes – 这将确定消费者可以获取的最大消息大小。
  • 代理方: replica.fetch.max.bytes – 这将允许代理中的副本在群集内发送消息,并确保消息被正确复制。 如果这太小,则消息将永远不会被复制,因此,消费者永远不会看到该消息,因为该消息将永远不会被提交(完全复制)。
  • 经纪人方面: message.max.bytes – 这是经纪人从制作人可以收到的最大消息大小。
  • 代理方(每个主题): max.message.bytes – 这是代理允许附加到主题的最大消息大小。 这个大小是经过validation的预压缩。 (默认为代理的message.max.bytes 。)

我发现关于数字2的难题 – 你没有得到任何来自卡夫卡的例外,消息或警告,所以当你发送大量消息时一定要考虑到这一点。

Kafka 0.10和新消费者需要做些小的改动,比起laughing_man的回答 :

  • 代理:没有更改,您仍然需要增加属性message.max.bytesreplica.fetch.max.bytesmessage.max.bytes必须等于或小于replica.fetch.max.bytes (*)。
  • 生产者:增加max.request.size发送更大的消息。
  • 消费者:增加max.partition.fetch.bytes以接收更大的消息。

(*)阅读注释以了解有关message.max.bytes <= replica.fetch.max.bytes更多message.max.bytes

您需要覆盖以下属性:

代理configuration($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

消费者configuration($ KAFKA_HOME / config / consumer.properties)
这一步不适合我。 我把它添加到消费者的应用程序,它工作正常

  • fetch.message.max.bytes

重新启动服务器。

看看这个文档的更多信息: http : //kafka.apache.org/08/configuration.html

这个想法是有相同大小的消息从卡夫卡制作人发送到卡夫卡经纪人,然后由卡夫卡消费者

卡夫卡生产商 – >卡夫卡经纪人 – >卡夫卡消费者

假设如果要求发送15MB的消息,则生产者,代理和消费者三者都需要同步。

卡夫卡生产者发送15 MB – > 卡夫卡经纪允许/存储15 MB – > 卡夫卡消费者收到15 MB

因此,该设置应为A.)在代理上:message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640

B.)在消费者:fetch.message.max.bytes = 15728640

记住message.max.bytes属性的一个关键事项必须与消费者的fetch.message.max.bytes属性同步 。 获取大小必须至less与最大消息大小一样大,否则可能存在生产者可以发送大于消费者消费/获取的消息的情况。 这可能值得一看。
您正在使用哪个版本的Kafka? 还提供一些你得到的更多细节跟踪。 有没有像payload size of xxxx larger than 1000000日志?