有没有办法在每次运行之前删除主题中的所有数据或删除主题?
有没有办法在每次运行之前删除主题中的所有数据或删除主题?
我可以修改KafkaConfig.scala文件来更改logRetentionHours
属性吗? 消费者读取消息时消息是否被删除?
我正在使用生产者从某处获取数据并将数据发送到消费者使用的特定主题,我可以在每次运行时删除该主题的所有数据吗? 每次我只想要新的数据。 有没有办法重新初始化这个话题?
不要认为它是受支持的。 看看这个JIRA问题 “添加删除主题支持”。
要手动删除:
- closures群集
- 清洁kafka日志目录(由kafka configuration文件中的
log.dir
属性指定)以及zookeeper数据 - 重新启动群集
对于任何给定的主题,你可以做的是
- 停止卡夫卡
- 清理特定于分区的kafka日志,kafka将其日志文件以“logDir / topic-partition”格式存储,因此对于名为“MyTopic”的主题,分区id 0的日志将存储在
/tmp/kafka-logs/MyTopic-0
其中/tmp/kafka-logs
由log.dir
属性指定 - 重新启动kafka
这NOT
一个好的和推荐的方法,但它应该工作。 在Kafka代理configuration文件中, log.retention.hours.per.topic
属性用于定义The number of hours to keep a log file before deleting it for some specific topic
另外,消费者读取消息时,消息是否被删除?
从Kafka文档 :
Kafka集群保留所有已发布的消息,无论消息是否已被消耗了一段可configuration的时间。 例如,如果日志保留时间设置为两天,那么消息发布后的两天内可用于消耗,之后将被丢弃以释放空间。 卡夫卡的性能在数据大小方面是有效的,因此保留大量数据不成问题。
实际上,保留在每个消费者基础上的唯一元数据是消费者在日志中的位置,称为“抵消”。 这个偏移量是由消费者控制的:通常消费者会在读取消息时线性地推进其偏移量,但实际上位置是由消费者控制的,并且它可以按任意顺序消费消息。 例如,消费者可以重置为较旧的偏移以重新处理。
为了find卡夫卡0.8 简单的消费者的例子,他们说读取的开始偏移量
Kafka包含两个常量,
kafka.api.OffsetRequest.EarliestTime()
在日志中查找数据的开始,并从那里开始stream式传输,kafka.api.OffsetRequest.LatestTime()
将仅传输新消息。
您还可以在那里find示例代码来pipe理客户端的偏移量。
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; }
正如我在这里提到的Purge Kafka Queue :
在Kafka 0.8.2中进行testing,快速入门示例:首先,将一行添加到config文件夹下的server.properties文件中:
delete.topic.enable=true
那么你可以运行这个命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
我们几乎尝试了其他答案所描述的中等水平的成功。 真正为我们工作的(Apache Kafka 0.8.1)是class命令
sh kafka-run-class.sh kafka.admin.DeleteTopicCommand –topic yourtopic –zookeeper localhost:2181
作为一个肮脏的解决方法,您可以调整每个主题的运行时间保留设置,例如bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1
( retention.bytes = 0也可能工作)
过了一段时间,卡夫卡应该释放空间。 不知道这与重新创build主题相比是否有任何意义。
PS。 一旦卡夫卡用清洁完成,更好地把保留设置回来。
以下是用于清空和删除Kafka主题的脚本,假定localhost作为动物园pipe理员服务器,而Kafka_Home设置为安装目录:
下面的脚本将通过将其保留时间设置为1秒来清空主题,然后删除configuration:
#!/bin/bash echo "Enter name of topic to empty:" read topicName /$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add config retention.ms=1000 sleep 5 /$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms
要完全删除主题,您必须停止任何适用的kafka代理,并从kafka日志目录(默认:/ tmp / kafka-logs)中删除它的目录,然后运行此脚本以从zookeeper删除主题。 为了validation它是否已经从zookeeper中删除,ls / brokers / topics的输出应该不再包含主题:
#!/bin/bash echo "Enter name of topic to delete from zookeeper:" read topicName /$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF rmr /brokers/topics/$topicName ls /brokers/topics quit EOF
有关主题及其分区的所有数据都存储在tmp/kafka-logs/
。 而且它们以格式topic-partionNumber
存储,所以如果你想删除一个主题newTopic
,你可以:
- 停止卡夫卡
- 删除文件
rm -rf /tmp/kafka-logs/newTopic-*
testing与卡夫卡0.10
1. stop zookeeper & Kafka server** , 2. then go to **kafka-logs** folder , there you will see list of kafka topic folders, delete folder with topic name 3. go to **zookeeper-data** folder , delete data inside that. 4. start zookeeper & kafka server again.
注意:如果您要删除kafka-logs中的主题文件夹,而不是从zookeeper-data文件夹中删除主题文件夹,那么您将看到主题仍然存在。
1-停止ZooKeeper和Kafka
2-在server.properties中,更改log.retention.hours值。 您可以评论log.retention.hours
并添加log.retention.ms=1000
。 它会保留卡夫卡主题的logging只有一秒钟。
3-开始动物园pipe理员和卡夫卡。
4-检查消费者控制台。 当我第一次打开控制台时,logging在那里。 但是当我再次打开控制台时,logging被删除。
5-稍后,您可以将log.retention.hours
的值设置为您所需的数字。
在手动删除一个kafka集群的话题,你可能会检查出来https://github.com/darrenfu/bigdata/issues/6在大多数解决scheme中错过了很多重要的步骤是删除;/config/topics/<topic_name>
在ZK。
对于酿造用户
如果你像我一样使用brew
,浪费了大量的时间来search臭名昭着的kafka-logs
文件夹, kafka-logs
不用担心了。 (请让我知道,如果这对你和多种不同版本的自制,卡夫卡等:))
你可能会在下面find它:
位置:
/usr/local/var/lib/kafka-logs
如何真正findpath
(这对基本上每个通过brew安装的应用程序也是有帮助的)
1) brew services list
kafka开始matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist
2)打开并阅读你在上面find的plist
3)find定义server.properties
位置的行打开它,在我的情况下:
-
/usr/local/etc/kafka/server.properties
4)查找log.dirs
行:
log.dirs =的/ usr /本地的/ var / lib中/卡夫卡-日志
5)转到该位置并删除所需主题的日志
6)重新启动卡夫卡与brew services restart kafka
启动卡夫卡