0%

kafka基本操作

启动kafka

kafka的运行需要java环境和zookeeper,所以需要先下载好jdk和zookeeper,然后启动zookeeper server。详细操作可以参考这里

启动一个kafka server(broker)的命令:

1
bin/kafka-server-start.sh config/server.properties

该命令需要传递一个kafka server配置文件的路径,使用默认的配置即可。

创建topic

kafka提供了一个名为”kafka-topics.sh”的脚本,用于在kafka server上创建topic,在命令行输入命令:

1
2
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic hello-kafka

该命令创建了一个叫做hello-kafka的topic。

创建topic后,也可以使用这个脚本获取服务器中的主题列表:

1
bin/kafka-topics.sh --list --zookeeper localhost:2181

启动生产者以发送消息

kafka提供一个kafka-console-producer.sh的脚步用于启动一个producer

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

这个脚步需要两个参数,一个是kafka server(broker)的ip和端口,另外一个参数就是主题的名称。

启动producer后,脚本会从stdin获取输入,然后发送到kafka集群中。默认情况下,每个新行都作为新消息发布,可以在config/producer.properties文件中配置producer的一些属性。

启动消费者以接收消息

kafka也提供了一个Kafka-console-consumer.sh脚本来启动一个消费者

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-name

单节点多代理配置

之前的基本操作的说明都是在单节点(一台物理机器)-单代理(一个kafka broker)下的场景下的,现在说明如何在一台物理机上启动多个Kafka server(broker)。

首先需要启动zookeeper服务器,然后由于需要启动多个kafka broker,需要为每个broker创建一个配置文件,可以将config/server.properties文件复制多份,并重新命令为config/server-one.properties,config/server-two.properties,注意需要修改配置文件中的port字段,让broker监听不同的端口。broker默认监听的端口是9092。

在多代理下可以对一个partition创建副本,可以使用Kafka-topics.sh脚本的”—describe”参数来检查哪个broker是partition的leader:

1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic-name

在单节点多代理下创建生产者和消费者的方式和在单节点单代理的场景下相同

修改已有Topic的配置

修改主题使用Kafka-topics.sh脚本来完成

1
2
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic-name 
--partitions count

这个命令可以修改topic-name的分区数量和备份数量。

删除Topic

1
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-name

注意,如果配置delete.topic.enable为false,则delete操作不会生效。