我如何发送卡夫卡(超过15MB)的大消息?
我使用Java Producer API将string消息发送到Kafka V. 0.8。 如果邮件大小约为15 MB,则会收到MessageSizeTooLargeException
。 我试图设置message.max.bytes
到40 MB,但我仍然得到exception。 小信息没有问题。
(生产者出现exception,我没有这个应用程序的消费者。)
我能做些什么来摆脱这个exception?
我的示例生产者configuration
private ProducerConfig kafkaConfig() { Properties props = new Properties(); props.put("metadata.broker.list", BROKERS); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("message.max.bytes", "" + 1024 * 1024 * 40); return new ProducerConfig(props); }
错误日志:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224] kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) at kafka.producer.Producer.send(Unknown Source) at kafka.javaapi.producer.Producer.send(Unknown Source)
您需要调整三个(或四个)属性:
- 消费者方:
fetch.message.max.bytes
– 这将确定消费者可以获取的最大消息大小。 - 代理方:
replica.fetch.max.bytes
– 这将允许代理中的副本在群集内发送消息,并确保消息被正确复制。 如果这太小,则消息将永远不会被复制,因此,消费者永远不会看到该消息,因为该消息将永远不会被提交(完全复制)。 - 经纪人方面:
message.max.bytes
– 这是经纪人从制作人可以收到的最大消息大小。 - 代理方(每个主题):
max.message.bytes
– 这是代理允许附加到主题的最大消息大小。 这个大小是经过validation的预压缩。 (默认为代理的message.max.bytes
。)
我发现关于数字2的难题 – 你没有得到任何来自卡夫卡的例外,消息或警告,所以当你发送大量消息时一定要考虑到这一点。
Kafka 0.10和新消费者需要做些小的改动,比起laughing_man的回答 :
- 代理:没有更改,您仍然需要增加属性
message.max.bytes
和replica.fetch.max.bytes
。message.max.bytes
必须等于或小于replica.fetch.max.bytes
(*)。 - 生产者:增加
max.request.size
发送更大的消息。 - 消费者:增加
max.partition.fetch.bytes
以接收更大的消息。
(*)阅读注释以了解有关message.max.bytes
<= replica.fetch.max.bytes
更多message.max.bytes
您需要覆盖以下属性:
代理configuration($ KAFKA_HOME / config / server.properties)
- replica.fetch.max.bytes
- message.max.bytes
消费者configuration($ KAFKA_HOME / config / consumer.properties)
这一步不适合我。 我把它添加到消费者的应用程序,它工作正常
- fetch.message.max.bytes
重新启动服务器。
看看这个文档的更多信息: http : //kafka.apache.org/08/configuration.html
这个想法是有相同大小的消息从卡夫卡制作人发送到卡夫卡经纪人,然后由卡夫卡消费者
卡夫卡生产商 – >卡夫卡经纪人 – >卡夫卡消费者
假设如果要求发送15MB的消息,则生产者,代理和消费者三者都需要同步。
卡夫卡生产者发送15 MB – > 卡夫卡经纪允许/存储15 MB – > 卡夫卡消费者收到15 MB
因此,该设置应为A.)在代理上:message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640
B.)在消费者:fetch.message.max.bytes = 15728640
记住message.max.bytes
属性的一个关键事项必须与消费者的fetch.message.max.bytes
属性同步 。 获取大小必须至less与最大消息大小一样大,否则可能存在生产者可以发送大于消费者消费/获取的消息的情况。 这可能值得一看。
您正在使用哪个版本的Kafka? 还提供一些你得到的更多细节跟踪。 有没有像payload size of xxxx larger than 1000000
日志?