什么决定了卡夫卡消费者抵消?
我对卡夫卡相对较新。 我已经做了一些实验,但有些事情我不清楚消费者抵消。 根据我的理解,当一个消费者启动时,它将开始读取的偏移量由configuration设置auto.offset.reset
(如果我错了,请纠正我)。
现在说,例如,主题中有10条消息(偏移0到9),消费者在发生故障之前(或者在杀死消费者之前)恰好消耗了5条消息。 然后说我重新启动消费者的过程。 我的问题是:
如果auto.offset.reset
被设置为smallest
,它是否总是开始从偏移0消耗?
如果auto.offset.reset
被设置为largest
,它是否会开始消耗偏移5?
这种情况下的行为总是具有确定性吗? 如果我的问题中有任何不清楚的地方,请不要犹豫。 提前致谢。
这比你描述的要复杂一点。 auto.offset.reset
configuration只在您的使用者组没有提交的某个有效偏移量的情况下(2个支持的偏移量存储现在是Kafka和Zookeeper)。 而且这也取决于你使用的是什么types的消费者。
如果你使用高级的Java消费者,那么想象下面的情况:
-
消费者群组1中的消费者已经消费了5条消息并且死亡。 下一次你启动这个消费者它甚至不会使用
auto.offset.reset
configuration,并将继续从它死亡的地方,因为它只会从偏移量存储(Kafka或ZK,如我所提到)获取存储的偏移量。 -
你有一个主题的消息(如你所描述的),你开始在一个新的消费者组
group2
中的消费者。 没有偏移存储在任何地方,这次auto.offset.reset
configuration将决定是从主题的开始(smallest
)开始还是从主题的结尾开始(largest
)
还有一件事影响了偏移值对应smallest
和largest
configuration是日志保留策略。 想象一下,你有一个保留configuration为1小时的话题。 您生成5条消息,然后一个小时后发布5条消息。 largest
偏移将保持与前面的例子相同,但是smallest
偏移将不能为0
因为Kafka将已经移除这些消息,因此最小的可用偏移将是5
。
上面提到的所有内容都与SimpleConsumer
无关,每次运行它时,都将决定从何处开始使用auto.offset.reset
configuration。
只是一个更新:从Kafka 0.9和更高版本,Kafka正在使用一个新的Java版本的消费者和auto.offset.reset参数名称已更改; 从手册:
在Kafka中没有初始偏移时,或者当前的偏移在服务器上不存在(例如,因为该数据已被删除),该怎么做:
最早 :自动将偏移量重置为最早的偏移量
最新 :自动将偏移量重置为最新的偏移量
none :如果消费者组没有find先前的偏移量,则向消费者抛出exception
其他任何事情:向消费者抛出exception。
在查看上面的答案之后,我花了一些时间来find这个,所以我认为它可能是有用的社区发布。
还有更多的偏差。保留。分钟。 如果自上次提交以来的时间是> auto.offset.reset
,则比auto.offset.reset
也会auto.offset.reset