什么决定了卡夫卡消费者抵消?

我对卡夫卡相对较新。 我已经做了一些实验,但有些事情我不清楚消费者抵消。 根据我的理解,当一个消费者启动时,它将开始读取的偏移量由configuration设置auto.offset.reset (如果我错了,请纠正我)。

现在说,例如,主题中有10条消息(偏移0到9),消费者在发生故障之前(或者在杀死消费者之前)恰好消耗了5条消息。 然后说我重新启动消费者的过程。 我的问题是:

如果auto.offset.reset被设置为smallest ,它是否总是开始从偏移0消耗?

如果auto.offset.reset被设置为largest ,它是否会开始消耗偏移5?

这种情况下的行为总是具有确定性吗? 如果我的问题中有任何不清楚的地方,请不要犹豫。 提前致谢。

这比你描述的要复杂一点。 auto.offset.resetconfiguration只在您的使用者组没有提交的某个有效偏移量的情况下(2个支持的偏移量存储现在是Kafka和Zookeeper)。 而且这也取决于你使用的是什么types的消费者。

如果你使用高级的Java消费者,那么想象下面的情况:

  1. 消费者群组1中的消费者已经消费了5条消息并且死亡。 下一次你启动这个消费者它甚至不会使用auto.offset.resetconfiguration,并将继续从它死亡的地方,因为它只会从偏移量存储(Kafka或ZK,如我所提到)获取存储的偏移量。

  2. 你有一个主题的消息(如你所描述的),你开始在一个新的消费者组group2中的消费者。 没有偏移存储在任何地方,这次auto.offset.resetconfiguration将决定是从主题的开始( smallest )开始还是从主题的结尾开始( largest

还有一件事影响了偏移值对应smallestlargestconfiguration是日志保留策略。 想象一下,你有一个保留configuration为1小时的话题。 您生成5条消息,然后一个小时后发布5条消息。 largest偏移将保持与前面的例子相同,但是smallest偏移将不能为0因为Kafka将已经移除这些消息,因此最小的可用偏移将是5

上面提到的所有内容都与SimpleConsumer无关,每次运行它时,都将决定从何处开始使用auto.offset.resetconfiguration。

只是一个更新:从Kafka 0.9和更高版本,Kafka正在使用一个新的Java版本的消费者和auto.offset.reset参数名称已更改; 从手册:

在Kafka中没有初始偏移时,或者当前的偏移在服务器上不存在(例如,因为该数据已被删除),该怎么做:

最早 :自动将偏移量重置为最早的偏移量

最新 :自动将偏移量重置为最新的偏移量

none :如果消费者组没有find先前的偏移量,则向消费者抛出exception

其他任何事情:向消费者抛出exception。

在查看上面的答案之后,我花了一些时间来find这个,所以我认为它可能是有用的社区发布。

还有更多的偏差。保留。分钟。 如果自上次提交以来的时间是> auto.offset.reset ,则比auto.offset.reset也会auto.offset.reset