Kafka 笔记

By | 2021年12月31日

1 kafka

1.1 kafka安装

首先去 Apache kafka 官网http://kafka.apache.org下载 kafka,这里我下载的是 kafka_2.12-2.1.0.tgz。
然后解压文件,修改 config/server.properties 文件,将其中的 advertised.listeners 改为你主机IP,例如:

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.1.220:9092

改好后,执行下面两条语句就可以启动kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties &

说明:advertised.listeners 是 broker 给 producer 和 consumer 连接用的,如果没有设置,就使用 listeners,而如果主机IP没有设置的话,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主机名。

1.2 kafka docker 安装

这里涉及 zookeeper/kafka/kafkamanager 三个服务的安装。

falkafka-compose.yml

version: '3.6'

services:
  falzookeeper:
    image: 192.168.1.166:8083/zookeeper-falsec
    volumes:
      - "zoodata:/data"
      - "zoodatalog:/datalog"
      - "zoologs:/logs"
    networks:
      - falnet
    deploy:
      placement:
        constraints:
          - node.role == worker

  falkafka:
    image: 192.168.1.166:8083/kafka-falsec
    volumes:
      - "kafka:/kafka"
    depends_on:
      - falzookeeper
    networks:
      - falnet
    ports:
      - "9092:9092"    
    deploy:
      placement:
        constraints:
          - node.role == worker

  falkafkamanager:
    image: hlebalbau/kafka-manager:latest
    depends_on:
      - falzookeeper
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "falzookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null
    networks:
      - falnet
    deploy:
      placement:
        constraints:
          - node.role == worker

volumes:
  zoodata:
  zoodatalog:
  zoologs:
  kafka:

networks:
  falnet:
    external: true
    name: falpolicy_falnet

1.3 kafka的使用

# 创建topic
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic zipkin

# 查看topic列表
./kafka-topics.sh -list -zookeeper 127.0.0.1:2181

# 创建生产者
./kafka-console-producer.sh --broker-list 192.168.30.60:9092 --topic test

# 创建消费
./kafka-console-consumer.sh --bootstrap-server 192.168.30.60:9092 --topic test --from-beginning

# 有了生产者和消费者,就可以模拟生产者发送消息,消费者接收消息了,试试吧。

数据存储位置
默认存储在:/tmp/kafka-logs/
注:如果由原来的新版本降级到老版本,但新版本上 kafka-logs 目录里的数据没删除,可能会报错导致无法启动kafka。

除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic。

查看主题(topic)详细信息
$ bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe

发送消息
Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行producer(生产者),然后在控制台输入几条消息到服务器。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>This is a message
>This is another message

消费消息
Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

如果你发送消息,就可以在这里接收到了。
提示:没有 my-replicated-topic,脚本居然会自动创建出来这个topic。

1.4 发布/订阅、点对点的使用

  1. 与ActiveMQ不同,kafka使用 group 来解决这两个功能。同一个group里的多个消费者,只有一个会消费数据;如果要多次消费同一个数据,类似与发布/订阅模式,
    消费者需要再创建不同的group。
  2. 一个group里可以包含多个消费者进程,而不是单个进程。也就是说我可以创建多个storm实例,每个storm都使用同一个group,但不会重复消费数据。

1.5 offset

在 kafka 中,读写topic,都涉及offset,关于 kafka 的 offset 主要有以下几个概念,如下图:

其中,Last Committed Offset 和 Current Position 是与 Consumer Client 有关,High Watermark 和 Log End Offset 与 Producer Client 数据写入和 replica 之间的数据同步有关。

  • Last Committed Offset:这是 group 最新一次 commit 的 offset,表示这个 group 已经把 Last Committed Offset 之前的数据都消费成功了;
  • Current Position:group 当前消费数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据已经拉取成功,可能正在处理,但是还未 commit;
  • Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset;
  • High Watermark:已经成功备份到其他 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未成功备份到其他的 replicas 中,这部分数据被认为是不安全的,是不允许 Consumer 消费的(这里说得不是很准确,可以参考:Kafka水位(high watermark)与leader epoch的讨论 这篇文章)。

1.6 允许外网访问

需要设置 advertised.listeners 的 EXTERNAL 为主机IP地址,如 192.168.1.230。

1.7 Kafka分区数是否越多越好?

分区越多则并发消费吞吐量越大,也意味着更多内存消耗、打开更多文件句柄、更长恢复期和端对端的延迟,前3个可接受,第4个端对端延迟指从 producer 生产消息到 customer 接收消息之间的时间段,这是由于每条数据发布到副本需要时间,在保证所有副本复制成功之后Kafka才会暴露数据,默认情况下kafka没有设置副本,因此应该不受影响。

1.8 FAQ

  •  Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    解决:

    1. 删除 /tmp/kafka-logs;
    2. netstat -lnp|grep 9092;并且 kill pid;
    3. 重启

2 kafka-manager

2.1 下载zip文件方式安装

在第一节,我们使用docker compose文件安装了 kafka-manager,这里我们再说说手动方式安装。

2.1.1 下载编译

cd kafka-manager
./sbt clean dist

完成后,在target目录下我们可以看到 kafka-manager kafka-manager-1.3.0.8.zip
注:java不能只安装jre,否则执行sbt命令会有问题。centos7上默认仅装了jre,我重新装了oracle jdk1.8才执行成功。

2.1.2 安装与配置

cd target/universal/
unzip kafka-manager-1.3.3.7.zip
cd kafka-manager-1.3.3.7
vi conf/application.conf

conf/application.conf中将kafka-manager.zkhosts的值设置为你的Kafka的 zookeeper 服务器,你还可以通过环境变量ZK_HOSTS配置这个参数值:
————————————————————————
# kafka-manager.zkhosts=”kafka-manager-zookeeper:2181″
# kafka-manager.zkhosts=${?ZK_HOSTS}
kafka-manager.zkhosts="192.168.1.220:2181,192.168.1.230:2181"
————————————————————————

2.1.3 启动

$ /home/admin/dev/kafka/kafka-manager-1.3.3.18/target/universal/kafka-manager-1.3.3.18/bin/kafka-manager默认情况下端口为9000,你还可以通过下面的命令指定配置文件和端口:
$ /home/admin/dev/kafka/kafka-manager-1.3.3.18/target/universal/kafka-manager-1.3.3.18/bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080

运行界面:http://192.168.1.220:9000/

第一次进入web UI要进行kafka cluster的配置,都是一些比较简单的操作,这里就不多说了。

2.2 cluster配置

安装好后,首先要做的就是配置 cluster,这其实很简答,设置下面两个地方就可以跑起来了。

2.2.1 创建cluster

添加的cluster信息存储在 zoomkeeper 的 /data数据目录下,如果将此目录删除,cluster需要重新配置。

注:由于我的 kafka和 kafka-manager使用的swarm docker部署,在同一个 overlay网络下,因此直接用docker的名称falzookeeper作为host。

2.2.2 启用 Poll consumer information

这一步的目的是用来实时查看kafka接收到的消息数量,方便调试。


从下面这张图可以看到,当前发往kafka的消息总数为1200,而且还在不断增加中。

2.2.3 删除Topic

在 2.2.3 节的图片中,有个 “Delete Topic” 的按钮,点击后,我发现 kafka存储这个topic的文件夹被标记为delete,过一段时间后就自动删除了。

发表评论

您的电子邮箱地址不会被公开。