How can we create a topic in Kafka from the IDE using the API
How can we create a topic in Kafka from the IDE using the API? When I run this:
bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181
I get this error:
bash: bin/kafka-create-topic.sh: No such file or directory
I followed the developer setup.
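In Kafka 0.8.1+ (the latest version of Kafka as of this writing) you can create a new topic programmatically via AdminUtils. The functionality of CreateTopicCommand (part of the older Kafka 0.8.0) was moved to AdminUtils.
Scala example for Kafka 0.8.1: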
    import java.util.Properties

    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    // Create a ZooKeeper client
    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000
    // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
    // createTopic() will only seem to work (it will return without error). The topic will exist
    // only in ZooKeeper and will be returned when listing topics, but Kafka itself does not
    // create the topic.
    val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
      ZKStringSerializer)

    // Create a topic named "myTopic" with 8 partitions and a replication factor of 3
    val topicName = "myTopic"
    val numPartitions = 8
    val replicationFactor = 3
    val topicConfig = new Properties
    AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)
Build dependencies, using sbt as an example:
    libraryDependencies ++= Seq(
      "com.101tec" % "zkclient" % "0.4",
      "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
        exclude("javax.jms", "jms")
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri"),
      ...
    )
Edit: added a Java example for Kafka 0.9.0.0 (the latest version as of January 2016).
Maven dependencies:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.7</version>
    </dependency>
Code:
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;

    import java.util.Properties;

    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;

    public class KafkaJavaExample {

        public static void main(String[] args) {
            String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
            int sessionTimeoutMs = 10 * 1000;
            int connectionTimeoutMs = 8 * 1000;
            // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
            // createTopic() will only seem to work (it will return without error). The topic will
            // exist only in ZooKeeper and will be returned when listing topics, but Kafka itself
            // does not create the topic.
            ZkClient zkClient = new ZkClient(
                zookeeperConnect,
                sessionTimeoutMs,
                connectionTimeoutMs,
                ZKStringSerializer$.MODULE$);

            // Security for Kafka was added in Kafka 0.9.0.0
            boolean isSecureKafkaCluster = false;
            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

            String topic = "my-topic";
            int partitions = 2;
            int replication = 3;
            Properties topicConfig = new Properties(); // add per-topic configuration settings here
            AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
            zkClient.close();
        }
    }
Edit 2: added a Java example for Kafka 0.10.2.0 (the latest version as of April 2017).
Maven dependencies:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.9</version>
    </dependency>
Code:
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;

    import java.util.Properties;

    import kafka.admin.AdminUtils;
    import kafka.admin.RackAwareMode;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;

    public class KafkaJavaExample {

        public static void main(String[] args) {
            String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
            int sessionTimeoutMs = 10 * 1000;
            int connectionTimeoutMs = 8 * 1000;

            String topic = "my-topic";
            int partitions = 2;
            int replication = 3;
            Properties topicConfig = new Properties(); // add per-topic configuration settings here

            // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
            // createTopic() will only seem to work (it will return without error). The topic will
            // exist only in ZooKeeper and will be returned when listing topics, but Kafka itself
            // does not create the topic.
            ZkClient zkClient = new ZkClient(
                zookeeperConnect,
                sessionTimeoutMs,
                connectionTimeoutMs,
                ZKStringSerializer$.MODULE$);

            // Security for Kafka was added in Kafka 0.9.0.0
            boolean isSecureKafkaCluster = false;
            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

            AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig,
                RackAwareMode.Enforced$.MODULE$);
            zkClient.close();
        }
    }
To create a topic through the Java API with Kafka 0.8+, try the following.
First add this import:
    import kafka.utils.ZKStringSerializer$;
Then create the ZkClient object as follows:
    // myTopic is a String variable holding the topic name
    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
You can try using the kafka.admin.CreateTopicCommand Scala class to create a topic from Java code, passing it the necessary arguments.
    String[] arguments = new String[8];
    arguments[0] = "--zookeeper";
    arguments[1] = "10.***.***.***:2181";
    arguments[2] = "--replica";
    arguments[3] = "1";
    arguments[4] = "--partition";
    arguments[5] = "1";
    arguments[6] = "--topic";
    arguments[7] = "test-topic-Biks";

    CreateTopicCommand.main(arguments);
Note: you should add the Maven dependencies for jopt-simple-4.5 and zkclient-0.1.
As of 0.11.0.0, all you need is:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
This artifact now contains the AdminClient (org.apache.kafka.clients.admin).
AdminClient can handle many Kafka administration tasks, including topic creation:
    // asList is a static import of java.util.Arrays.asList
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    AdminClient admin = AdminClient.create(config);

    Map<String, String> configs = new HashMap<>();
    int partitions = 1;
    short replication = 1; // NewTopic expects the replication factor as a short

    admin.createTopics(asList(new NewTopic("topic", partitions, replication).configs(configs)));
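This call returns a CreateTopicsResult, which you can use to get a Future for the whole operation or for each individual topic creation:
- To get a future for the whole operation, use CreateTopicsResult#all().
- To get a Future for each topic individually, use CreateTopicsResult#values().
For example: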
    CreateTopicsResult result = ...
    KafkaFuture<Void> all = result.all();
Or:
    // Throwables comes from Guava (com.google.common.base.Throwables)
    CreateTopicsResult result = ...
    for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
        try {
            entry.getValue().get();
            log.info("topic {} created", entry.getKey());
        } catch (InterruptedException | ExecutionException e) {
            if (Throwables.getRootCause(e) instanceof TopicExistsException) {
                log.info("topic {} existed", entry.getKey());
            }
        }
    }
KafkaFuture is "a flexible future which supports call chaining and other asynchronous programming patterns" and "will eventually become a thin shim on top of Java 8's CompletableFuture."
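Since KafkaFuture is described as heading toward CompletableFuture, the per-topic handling pattern above can be tried out with nothing but the JDK. The following is a minimal sketch under that assumption: it uses java.util.concurrent.CompletableFuture instead of KafkaFuture, and the class PerTopicFutures and the exception AlreadyExists are hypothetical stand-ins (the latter for Kafka's TopicExistsException), not part of any Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class PerTopicFutures {

    /** Hypothetical stand-in for Kafka's TopicExistsException. */
    static final class AlreadyExists extends RuntimeException {
        AlreadyExists(String topic) {
            super("topic exists: " + topic);
        }
    }

    /** Waits on every future and maps each topic name to "created", "existed" or "failed". */
    static Map<String, String> summarize(Map<String, CompletableFuture<Void>> futures) {
        Map<String, String> outcome = new LinkedHashMap<>();
        for (Map.Entry<String, CompletableFuture<Void>> entry : futures.entrySet()) {
            try {
                entry.getValue().get(); // blocks until this topic's future completes
                outcome.put(entry.getKey(), "created");
            } catch (ExecutionException e) {
                // get() wraps the original failure; inspect the cause
                boolean existed = e.getCause() instanceof AlreadyExists;
                outcome.put(entry.getKey(), existed ? "existed" : "failed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                outcome.put(entry.getKey(), "failed");
            }
        }
        return outcome;
    }
}
```

Treating "already exists" as a non-fatal outcome keeps topic creation idempotent, which is usually what a startup routine wants.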
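If you use Kafka 0.10.0.0+, creating a topic from Java requires passing a parameter of type RackAwareMode. It is a Scala case object, and getting an instance of it from Java is tricky (see, for example, "How do I get a Scala case object from Java", although that approach does not apply to our case).
Fortunately, rackAwareMode is an optional parameter. Java, however, does not support optional parameters. How do we get around this? Here is one solution: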
AdminUtils.createTopic(zkUtils, topic, 1, 1, AdminUtils.createTopic$default$5(), AdminUtils.createTopic$default$6());
Use this together with miguno's answer and you are good to go.
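There are a few reasons why your call might not work:
- Your Kafka cluster does not have enough nodes to support a replication factor of 3.
- There is a chroot path prefix, in which case you must append it after the ZooKeeper port.
- You are not in the Kafka installation directory when you run the command (this is the most likely).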
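The first two points can be checked before calling any Kafka API. Below is a minimal sketch, assuming the usual ZooKeeper connect-string convention in which the chroot path is written once, after the last host:port pair. The class PreflightChecks and both method names are hypothetical helpers made up for illustration.

```java
import java.util.List;

final class PreflightChecks {

    /** Joins host:port pairs and appends the chroot once, after the last port. */
    static String zkConnect(List<String> hostPorts, String chroot) {
        String hosts = String.join(",", hostPorts);
        if (chroot == null || chroot.isEmpty() || chroot.equals("/")) {
            return hosts; // no chroot in use
        }
        return hosts + (chroot.startsWith("/") ? chroot : "/" + chroot);
    }

    /** A topic cannot have more replicas than there are live brokers. */
    static boolean replicationFits(int replicationFactor, int liveBrokers) {
        return replicationFactor >= 1 && replicationFactor <= liveBrokers;
    }
}
```

For example, two ZooKeeper hosts with a chroot of /kafka give zk1:2181,zk2:2181/kafka, and a replication factor of 3 does not fit a single-broker development cluster.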
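The following example, taken from the Kafka 0.8 Producer Example, creates a topic named page_visits and also starts producing to it, provided the auto.create.topics.enable property is set to true (the default) in the Kafka broker configuration file.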
    import java.util.*;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class TestProducer {
        public static void main(String[] args) {
            long events = Long.parseLong(args[0]);
            Random rnd = new Random();

            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // SimplePartitioner is a custom class that must be on the classpath
            props.put("partitioner.class", "example.producer.SimplePartitioner");
            props.put("request.required.acks", "1");

            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);

            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com," + ip;
                // Sending to "page_visits" triggers auto-creation of the topic
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
                producer.send(data);
            }
            producer.close();
        }
    }
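Creating a topic in Scala. If you are running in pseudo mode, set the following in the broker configuration file:
    auto.create.topics.enable = true
This enables automatic topic creation on the server. When it is set to true, an attempt to produce to, consume from, or fetch metadata for a non-existent topic automatically creates that topic with the default replication factor and number of partitions.
The code snippet is below.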
    import scala.util.Random
    import java.util.Properties
    import java.util.Date
    import kafka.producer.ProducerConfig
    import kafka.producer.Producer
    import kafka.producer.KeyedMessage

    class SimpleProducer {

      def sendmessages(): Unit = {
        val rnd = new Random()
        val props = new Properties()
        props.put("metadata.broker.list", "192.1.1.1:6667")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        //props.put("partitioner.class", "rtbi.dis.producers.SimplePartitioner")
        val config = new ProducerConfig(props)
        val producer = new Producer[String, String](config)

        for (event <- 1 to 5000) {
          val runtime = new Date().getTime
          val ip = "192.1.1.1" + rnd.nextInt(255)
          val msg = runtime + ",www.example.com," + ip
          // "mytopic" is the topic; it is auto-created on first send
          val data = new KeyedMessage[String, String]("mytopic", ip, msg)
          producer.send(data)
        }
        producer.close()
      }
    }

    object SimpleProducer extends App {
      val s = new SimpleProducer().sendmessages()
    }
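Because both producer examples rely on auto.create.topics.enable, it can help to confirm what the broker configuration actually says before depending on it. The sketch below parses the text of a broker properties file with java.util.Properties. The class AutoCreateCheck and its method names are hypothetical, and the fallback values (true for auto-creation, 1 for num.partitions) are the broker defaults as far as I know, so treat them as assumptions and check them against your Kafka version.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class AutoCreateCheck {

    /** Parses the text of a broker properties file. */
    static Properties parse(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new IllegalStateException("could not parse broker configuration", e);
        }
        return props;
    }

    /** True unless the file explicitly disables auto-creation (assumed default: true). */
    static boolean autoCreateEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty("auto.create.topics.enable", "true").trim());
    }

    /** Partition count an auto-created topic would get (assumed default: 1). */
    static int defaultPartitions(Properties props) {
        return Integer.parseInt(props.getProperty("num.partitions", "1").trim());
    }
}
```

In practice the input string would be the contents of the broker's configuration file, read for example with java.nio.file.Files.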
Which IDE are you trying this from?
Please provide the full path. The following commands create a topic from the terminal:
- cd kafka/bin
- ./kafka-create-topic.sh --topic test --zookeeper localhost:2181
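A "No such file or directory" error usually means the script path does not resolve from the current working directory. The following is a small sketch that resolves the script against an explicit Kafka home directory. The class TopicScriptLocator is hypothetical, and the fallback name kafka-topics.sh is an assumption based on the script shipped with Kafka 0.8.1 and later in place of kafka-create-topic.sh.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

final class TopicScriptLocator {

    // Candidate script names, oldest first; the second is an assumption for newer releases.
    private static final String[] CANDIDATES = {"kafka-create-topic.sh", "kafka-topics.sh"};

    /** Returns the absolute path of the first topic script found under kafkaHome/bin. */
    static Optional<Path> find(Path kafkaHome) {
        for (String name : CANDIDATES) {
            Path candidate = kafkaHome.resolve("bin").resolve(name).toAbsolutePath();
            if (Files.isRegularFile(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty(); // neither script exists under this directory
    }
}
```

An empty result suggests that the directory passed in is not the Kafka installation directory.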
As of Kafka 0.10.1, the ZKStringSerializer mentioned by Michael is private (in Scala). You can use the factory methods createZkClient or createZkClientAndConnection in ZkUtils instead.
Scala example for Kafka 0.10.1:
    import kafka.utils.ZkUtils

    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000

    val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(
      "localhost:2181", sessionTimeoutMs, connectionTimeoutMs)
Then create the topic as Michael suggested:
    import java.util.Properties

    import kafka.admin.AdminUtils

    val zkUtils = new ZkUtils(zkClient, zkConnection, false)

    val numPartitions = 4
    val replicationFactor = 1
    val topicConfig = new Properties
    val topic = "my-topic"

    AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)