Broker 是什么?
Kafka 控制器组件(Controller) 即Broker, 是Kafka 的核心组件。 它的主要作用是在ZooKeeper 的帮助下管理和协调整个Kafka 集群。集群中任意一台Broker 都能充当控制器的角色,但是在运行过程中,只能有一个Broker 成为控制器,来执行管理和协调的职责。
配置解释
- listeners:监听器列表,就是建立监听一个通道,别人能够通过这个通道跟你沟通,所以需要设置 IP:Port。
- advertised.listeners:公布监听器,就是让Brokers和Clients们都能够知道的监听器。你想想看,listeners是Broker用来监听网络请求的,那么,其他Broker或者客户端想要与它通信,则需要知道具体的IP:PORT吧?所以,为了让别人知道自己的监听器,那么就需要公开出去,这个公开的形式是通过zk来共享数据。这类需要注意几点:
(1)默认情况下,advertised.listeners不设置会自动使用listeners属性。
(2)advertised.listeners不支持0.0.0.0这种形式, 所以如果listeners属性设置成0.0.0.0,则必须设置advertised.listeners属性。因为0.0.0.0是表示的是监听Broker上任意的网卡的, 你将这个发布出去,那么别的Broker和客户端怎么知道你具体的ip和端口呢?
(3)可以同时配置多个, 并且用逗号隔开。 - listener.security.protocol.map:监听器名称和安全协议之间的映射关系集合。现有的安全协议有4种, 分别如下,
(1)PLAINTEXT => PLAINTEXT 不需要授权,非加密通道
(2)SSL => SSL 使用SSL加密通道
(3)SASL_PLAINTEXT => SASL_PLAINTEXT 使用SASL认证非加密通道
(4)SASL_SSL => SASL_SSL 使用SASL认证并且SSL加密通道
然你也可以自己重新映射监听器名称和安全协议, 格式:监听名称1:安全协议1,监听名称2:安全协议2
- inter.broker.listener.name:用于Broker之间通信的listener的名称。如果未设置,则listener名称由 security.inter.broker.protocol定义(security.inter.broker.protocol默认值是PLAINTEXT)。同时设置 这个和 security.inter.broker.protocol 属性是错误的。
基础安全
没有安全防护的基础设施容易沦为肉鸡或者矿机,近几年爆出 Redis、MongoDB 等中间件被入侵事件,皆是在公共网络上没有设置密码防护所致。
我们先来设置一般安全的防护机制——SASL_PLAINTEXT,在客户端连接 Kafka 节点前需要密码验证,但是整个通讯链路上的数据传输是明文,不加密。
先看看 docker-compose.yaml:
version: '3' services: zookeeper: image: 'bitnami/zookeeper:3.7.1' # 无需映射出来 #ports: # - '2181:2181' environment: - ZOO_ENABLE_AUTH=yes - ZOO_SERVER_USERS=user - ZOO_SERVER_PASSWORDS=pass123 - ZOO_CLIENT_USER=user - ZOO_CLIENT_PASSWORD=pass123 kafka: image: 'bitnami/kafka:2.8.1' ports: - '9093:9093' environment: # 监听器的 CLIENT 上不要设置 9092,否则日志一直输出下面信息: # INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Failed authentication with /10.0.0.2 (SSL handshake failed) - KAFKA_CFG_LISTENERS=INTERNAL://:9092,CLIENT://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,CLIENT://49.99.213.220:9093 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT # 用于 broker 间通信的 SASL 机制 - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN # 允许使用明文监听,出于安全原因,Bitnami Apache Kafka docker 镜像禁用了 PLAINTEXT 侦听器,但可以通过下面方式开启 - ALLOW_PLAINTEXT_LISTENER=no - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper3:2181 #Zookeeper credentials - KAFKA_ZOOKEEPER_PROTOCOL=SASL # 可选值有:PLAINTEXT, SASL, SSL, and SASL_SSL. 默认值: PLAINTEXT - KAFKA_ZOOKEEPER_USER=user - KAFKA_ZOOKEEPER_PASSWORD=pass123 #Inter broker credentials - KAFKA_INTER_BROKER_USER=interbrokeruser # Kafka 内部节点通信的用户名,默认值:user - KAFKA_INTER_BROKER_PASSWORD=interbrokerpass # Kafka 内部节点通信的密码,默认值:bitnami #Client credentials(配置 SASL 认证时,使用下面两个变量来配置用户名和密码) - KAFKA_CLIENT_USERS=clientuser1 # 使用 SASL 模式处理客户端通信时创建的用户名,用逗号隔开。 - KAFKA_CLIENT_PASSWORDS=pass123 # 使用 SASL 模式处理客户端通信时创建的密码,用逗号隔开。 depends_on: - zookeeper
ZooKeeper 启用身份认证,配置服务端认证和客户端认证;Kafka 指定内部和外部监听者使用 SASL_PLAINTEXT模式,设置密码。
成功运行后,生产者或消费者与 Kafka 连接时,都必须设置 SASL_PLAINTEXT 模式,同时配置客户端用户密码,来看看 SpringBoot 中的生产者和消费者是如何设置的。
从 application.properties 摘录的关键配置:
spring.kafka.bootstrap-servers=49.99.213.220:9093 # 如果代码中启用了生产者,则必须配置 SASL_PLAINTEXT,否则 producer 会一直打印错误日志 spring.kafka.producer.properties.sasl.mechanism=PLAIN spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"clientuser1\" password=\"pass123\"; # 如果代码中启用了消费者,则必须配置 SASL_PLAINTEXT,否则 consumer 会一直打印错误日志 spring.kafka.consumer.properties.sasl.mechanism=PLAIN spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"clientuser1\" password=\"pass123\";
安全提升
SASL_PLAINTEXT 模式虽然有身份验证这一关,但是传输不加密,在传输过程中容易被人窃取密码等关键信息,Kafka 为此创造更安全的 SASL_SSL 模式。其实就是对传输用 TLS/SSL 协议加密。
首先要制作安全证书,bitnami kafka docker 官网提供了一个脚本便于制作(搜 kafka-generate-ssl.sh ),不过得先安装好 keytool,没有的话,在 ubuntu 上可执行 apt install openjdk-8-jre-headless
一件安装。在看这个脚本前请查阅数字证书与证书工具的使用,熟悉安全证书概念和证书工具的基本用法。
这个脚本分为两个环节:
- 生成信任库和私钥;
- 生成密钥库,导入签名证书
这个脚本可以多次运行,第一次运行遇到提示“Do you need to generate a trust store and associated private key?”,选“y”,完成1和2环节;其他时候运行,选“n”,完成2环节。
遇到提示设置密码时,使用相同密码,对应于 YAML 文件中的 KAFKA_CERTIFICATE_PASSWORD。
脚本也注释说明不同 Kafka 节点或者客户端需要不同的密钥库。
脚本最终用来生成2个密钥库 keystore/kafka.keystore.jks 和 truststore/kafka.truststore.jks。
编写 YAML 文件:
version: '3' services: zookeeper: image: 'bitnami/zookeeper:3.7.1' # 无需映射出来 #ports: # - '2181:2181' environment: - ZOO_ENABLE_AUTH=yes - ZOO_SERVER_USERS=user - ZOO_SERVER_PASSWORDS=pass123 - ZOO_CLIENT_USER=user - ZOO_CLIENT_PASSWORD=pass123 kafka: image: 'bitnami/kafka:2.8.1' ports: - '9093:9093' environment: # 监听器的 CLIENT 上不要设置 9092,否则日志一直输出下面信息: # INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Failed authentication with /10.0.0.2 (SSL handshake failed) - KAFKA_CFG_LISTENERS=INTERNAL://:9092,CLIENT://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka2:9092,CLIENT://47.100.218.220:9093 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_SSL,CLIENT:SASL_SSL - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL # 用于 broker 间通信的 SASL 机制 - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-256 # 允许使用明文监听,出于安全原因,Bitnami Apache Kafka docker 镜像禁用了 PLAINTEXT 侦听器,但可以通过下面方式开启 - ALLOW_PLAINTEXT_LISTENER=no - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper3:2181 #- KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD=123456 #- KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD=123456 # 禁用主机名验证,这样我们就不用去设置各个容器的主机名。变量 ALGORITHM 是个空值,目的就是设置一个空字符串。 - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=${ALGORITHM} #Zookeeper credentials - KAFKA_ZOOKEEPER_PROTOCOL=SASL # 可选值有:PLAINTEXT, SASL, SSL, and SASL_SSL. 默认值: PLAINTEXT - KAFKA_ZOOKEEPER_USER=user - KAFKA_ZOOKEEPER_PASSWORD=pass123 #Inter broker credentials - KAFKA_INTER_BROKER_USER=interbrokeruser # Kafka 内部节点通信的用户名,默认值:user - KAFKA_INTER_BROKER_PASSWORD=interbrokerpass # Kafka 内部节点通信的密码,默认值:bitnami #Client credentials(配置 SASL 认证时,使用下面两个变量来配置用户名和密码) - KAFKA_CLIENT_USERS=clientuser1 # 使用 SASL 模式处理客户端通信时创建的用户名,用逗号隔开。 - KAFKA_CLIENT_PASSWORDS=pass123 # 使用 SASL 模式处理客户端通信时创建的密码,用逗号隔开。 - KAFKA_CERTIFICATE_PASSWORD=123456 - KAFKA_TLS_TYPE=JKS # 或者 PEM volumes: - "/root/keystore/kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks" - "/root/truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks" depends_on: - zookeeper
此配置 kafka 容器可以正常起来不报错,但 SpringBoot 中的生产、消费端始终无法连上去,暂时还未解决。
KRaft 模式
KRaft 模式摆脱了 Zookeeper 的束缚,让 Kafka 更轻量级、资源更高效利用。Kafka 3.3 版本正式宣布 KRaft 处于生产可用状态,但是安全模块还未完善,所以还不推荐生产使用。后续版本的 Kafka 可能将弃用 Zookeeper 模式。下面介绍开发环境如何使用 KRaft 模式,后续等 KRaft 的安全模块成熟后更新本文。
version: "3" services: kafka: image: 'bitnami/kafka:latest' ports: - '9092:9092' environment: - KAFKA_ENABLE_KRAFT=yes - KAFKA_CFG_PROCESS_ROLES=broker,controller - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 - KAFKA_BROKER_ID=1 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 - ALLOW_PLAINTEXT_LISTENER=yes
与 Zookeeper 模式对比,KRaft 模式的设置简单很多,只需要 Kafka,因为它既可以当节点,又可以当控制器。这是更先进的模式。
KRaft 模式的配置文件参考 kraft/server.properties