内容索引
1 kafka
1.1 kafka安装
首先去 Apache kafka 官网http://kafka.apache.org下载 kafka,这里我下载的是 kafka_2.12-2.1.0.tgz。
然后解压文件,修改 config/server.properties 文件,将其中的 advertised.listeners 改为你主机IP,例如:
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://192.168.1.220:9092
改好后,执行下面两条语句就可以启动kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties &
说明:advertised.listeners 是 broker 给 producer 和 consumer 连接用的,如果没有设置,就使用 listeners,而如果主机IP没有设置的话,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主机名。
1.2 kafka docker 安装
这里涉及 zookeeper/kafka/kafkamanager 三个服务的安装。
falkafka-compose.yml
version: '3.6' services: falzookeeper: image: 192.168.1.166:8083/zookeeper-falsec volumes: - "zoodata:/data" - "zoodatalog:/datalog" - "zoologs:/logs" networks: - falnet deploy: placement: constraints: - node.role == worker falkafka: image: 192.168.1.166:8083/kafka-falsec volumes: - "kafka:/kafka" depends_on: - falzookeeper networks: - falnet ports: - "9092:9092" deploy: placement: constraints: - node.role == worker falkafkamanager: image: hlebalbau/kafka-manager:latest depends_on: - falzookeeper ports: - "9000:9000" environment: ZK_HOSTS: "falzookeeper:2181" APPLICATION_SECRET: "random-secret" command: -Dpidfile.path=/dev/null networks: - falnet deploy: placement: constraints: - node.role == worker volumes: zoodata: zoodatalog: zoologs: kafka: networks: falnet: external: true name: falpolicy_falnet
1.3 kafka的使用
# 创建topic ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic zipkin # 查看topic列表 ./kafka-topics.sh -list -zookeeper 127.0.0.1:2181 # 创建生产者 ./kafka-console-producer.sh --broker-list 192.168.30.60:9092 --topic test # 创建消费 ./kafka-console-consumer.sh --bootstrap-server 192.168.30.60:9092 --topic test --from-beginning # 有了生产者和消费者,就可以模拟生产者发送消息,消费者接收消息了,试试吧。
数据存储位置
默认存储在:/tmp/kafka-logs/
注:如果由原来的新版本降级到老版本,但新版本上 kafka-logs 目录里的数据没删除,可能会报错导致无法启动kafka。
除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic。
查看主题(topic)详细信息
$ bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
发送消息
Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行producer(生产者),然后在控制台输入几条消息到服务器。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic >This is a message >This is another message
消费消息
Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
如果你发送消息,就可以在这里接收到了。
提示:没有 my-replicated-topic,脚本居然会自动创建出来这个topic。
1.4 发布/订阅、点对点的使用
- 与ActiveMQ不同,kafka使用 group 来解决这两个功能。同一个group里的多个消费者,只有一个会消费数据;如果要多次消费同一个数据,类似与发布/订阅模式,
消费者需要再创建不同的group。 - 一个group里可以包含多个消费者进程,而不是单个进程。也就是说我可以创建多个storm实例,每个storm都使用同一个group,但不会重复消费数据。
1.5 offset
在 kafka 中,读写topic,都涉及offset,关于 kafka 的 offset 主要有以下几个概念,如下图:
其中,Last Committed Offset 和 Current Position 是与 Consumer Client 有关,High Watermark 和 Log End Offset 与 Producer Client 数据写入和 replica 之间的数据同步有关。
- Last Committed Offset:这是 group 最新一次 commit 的 offset,表示这个 group 已经把 Last Committed Offset 之前的数据都消费成功了;
- Current Position:group 当前消费数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据已经拉取成功,可能正在处理,但是还未 commit;
- Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset;
- High Watermark:已经成功备份到其他 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未成功备份到其他的 replicas 中,这部分数据被认为是不安全的,是不允许 Consumer 消费的(这里说得不是很准确,可以参考:Kafka水位(high watermark)与leader epoch的讨论 这篇文章)。
1.6 允许外网访问
需要设置 advertised.listeners 的 EXTERNAL 为主机IP地址,如 192.168.1.230。
1.7 Kafka分区数是否越多越好?
1.8 FAQ
- Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
解决:1. 删除 /tmp/kafka-logs; 2. netstat -lnp|grep 9092;并且 kill pid; 3. 重启
2 kafka-manager
2.1 下载zip文件方式安装
在第一节,我们使用docker compose文件安装了 kafka-manager,这里我们再说说手动方式安装。
2.1.1 下载编译
cd kafka-manager ./sbt clean dist
完成后,在target目录下我们可以看到 kafka-manager kafka-manager-1.3.0.8.zip
注:java不能只安装jre,否则执行sbt命令会有问题。centos7上默认仅装了jre,我重新装了oracle jdk1.8才执行成功。
2.1.2 安装与配置
cd target/universal/ unzip kafka-manager-1.3.3.7.zip cd kafka-manager-1.3.3.7 vi conf/application.conf
在conf/application.conf
中将kafka-manager.zkhosts
的值设置为你的Kafka的 zookeeper 服务器,你还可以通过环境变量ZK_HOSTS配置这个参数值:
————————————————————————
# kafka-manager.zkhosts=”kafka-manager-zookeeper:2181″
# kafka-manager.zkhosts=${?ZK_HOSTS}
kafka-manager.zkhosts="192.168.1.220:2181,192.168.1.230:2181"
————————————————————————
2.1.3 启动
$ /home/admin/dev/kafka/kafka-manager-1.3.3.18/target/universal/kafka-manager-1.3.3.18/bin/kafka-manager
默认情况下端口为9000
,你还可以通过下面的命令指定配置文件和端口:
$ /home/admin/dev/kafka/kafka-manager-1.3.3.18/target/universal/kafka-manager-1.3.3.18/bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080
运行界面:http://192.168.1.220:9000/
第一次进入web UI要进行kafka cluster的配置,都是一些比较简单的操作,这里就不多说了。
2.2 cluster配置
安装好后,首先要做的就是配置 cluster,这其实很简答,设置下面两个地方就可以跑起来了。
2.2.1 创建cluster
添加的cluster信息存储在 zoomkeeper 的 /data
数据目录下,如果将此目录删除,cluster需要重新配置。
注:由于我的 kafka和 kafka-manager使用的swarm docker部署,在同一个 overlay网络下,因此直接用docker的名称falzookeeper
作为host。
2.2.2 启用 Poll consumer information
这一步的目的是用来实时查看kafka接收到的消息数量,方便调试。
从下面这张图可以看到,当前发往kafka的消息总数为1200,而且还在不断增加中。
2.2.3 删除Topic
在 2.2.3 节的图片中,有个 “Delete Topic” 的按钮,点击后,我发现 kafka存储这个topic的文件夹被标记为delete
,过一段时间后就自动删除了。