kafka 安全

By | 2023年2月11日

Broker 是什么?

Kafka 控制器组件(Controller) 即Broker, 是Kafka 的核心组件。 它的主要作用是在ZooKeeper 的帮助下管理和协调整个Kafka 集群。集群中任意一台Broker 都能充当控制器的角色,但是在运行过程中,只能有一个Broker 成为控制器,来执行管理和协调的职责。

配置解释

  • listeners:监听器列表,就是建立监听一个通道,别人能够通过这个通道跟你沟通,所以需要设置 IP:Port。
  • advertised.listeners:公布监听器,就是让Brokers和Clients们都能够知道的监听器。你想想看,listeners是Broker用来监听网络请求的,那么,其他Broker或者客户端想要与它通信,则需要知道具体的IP:PORT吧?所以,为了让别人知道自己的监听器,那么就需要公开出去,这个公开的形式是通过zk来共享数据。这类需要注意几点:
    (1)默认情况下,advertised.listeners不设置会自动使用listeners属性。
    (2)advertised.listeners不支持0.0.0.0这种形式, 所以如果listeners属性设置成0.0.0.0,则必须设置advertised.listeners属性。因为0.0.0.0是表示的是监听Broker上任意的网卡的, 你将这个发布出去,那么别的Broker和客户端怎么知道你具体的ip和端口呢?
    (3)可以同时配置多个, 并且用逗号隔开。
  • listener.security.protocol.map:监听器名称和安全协议之间的映射关系集合。现有的安全协议有4种, 分别如下,
    (1)PLAINTEXT => PLAINTEXT 不需要授权,非加密通道
    (2)SSL => SSL 使用SSL加密通道
    (3)SASL_PLAINTEXT => SASL_PLAINTEXT 使用SASL认证非加密通道
    (4)SASL_SSL => SASL_SSL 使用SASL认证并且SSL加密通道
    然你也可以自己重新映射监听器名称和安全协议, 格式:监听名称1:安全协议1,监听名称2:安全协议2
  • inter.broker.listener.name:用于Broker之间通信的listener的名称。如果未设置,则listener名称由 security.inter.broker.protocol定义(security.inter.broker.protocol默认值是PLAINTEXT)。同时设置 这个和 security.inter.broker.protocol 属性是错误的。

基础安全

没有安全防护的基础设施容易沦为肉鸡或者矿机,近几年爆出 Redis、MongoDB 等中间件被入侵事件,皆是在公共网络上没有设置密码防护所致。

我们先来设置一般安全的防护机制——SASL_PLAINTEXT,在客户端连接 Kafka 节点前需要密码验证,但是整个通讯链路上的数据传输是明文,不加密。

先看看 docker-compose.yaml:

version: '3'

services:
  zookeeper:
    image: 'bitnami/zookeeper:3.7.1'
    # 无需映射出来
    #ports:
    #  - '2181:2181'
    environment:
      - ZOO_ENABLE_AUTH=yes
      - ZOO_SERVER_USERS=user
      - ZOO_SERVER_PASSWORDS=pass123
      - ZOO_CLIENT_USER=user
      - ZOO_CLIENT_PASSWORD=pass123
  kafka:
    image: 'bitnami/kafka:2.8.1'
    ports:
      - '9093:9093'
    environment:
      # 监听器的 CLIENT 上不要设置 9092,否则日志一直输出下面信息:
      # INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Failed authentication with /10.0.0.2 (SSL handshake failed) 
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,CLIENT://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,CLIENT://49.99.213.220:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT
      # 用于 broker 间通信的 SASL 机制
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      # 允许使用明文监听,出于安全原因,Bitnami Apache Kafka docker 镜像禁用了 PLAINTEXT 侦听器,但可以通过下面方式开启
      - ALLOW_PLAINTEXT_LISTENER=no
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper3:2181
      #Zookeeper credentials
      - KAFKA_ZOOKEEPER_PROTOCOL=SASL     # 可选值有:PLAINTEXT, SASL, SSL, and SASL_SSL. 默认值: PLAINTEXT
      - KAFKA_ZOOKEEPER_USER=user
      - KAFKA_ZOOKEEPER_PASSWORD=pass123
      #Inter broker credentials
      - KAFKA_INTER_BROKER_USER=interbrokeruser        # Kafka 内部节点通信的用户名,默认值:user
      - KAFKA_INTER_BROKER_PASSWORD=interbrokerpass    # Kafka 内部节点通信的密码,默认值:bitnami
      #Client credentials(配置 SASL 认证时,使用下面两个变量来配置用户名和密码)
      - KAFKA_CLIENT_USERS=clientuser1       # 使用 SASL 模式处理客户端通信时创建的用户名,用逗号隔开。
      - KAFKA_CLIENT_PASSWORDS=pass123       # 使用 SASL 模式处理客户端通信时创建的密码,用逗号隔开。
    depends_on:
      - zookeeper

ZooKeeper 启用身份认证,配置服务端认证和客户端认证;Kafka 指定内部和外部监听者使用 SASL_PLAINTEXT模式,设置密码。

成功运行后,生产者或消费者与 Kafka 连接时,都必须设置 SASL_PLAINTEXT 模式,同时配置客户端用户密码,来看看 SpringBoot 中的生产者和消费者是如何设置的。

从 application.properties 摘录的关键配置:

spring.kafka.bootstrap-servers=49.99.213.220:9093

# 如果代码中启用了生产者,则必须配置 SASL_PLAINTEXT,否则 producer 会一直打印错误日志
spring.kafka.producer.properties.sasl.mechanism=PLAIN
spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"clientuser1\" password=\"pass123\";

# 如果代码中启用了消费者,则必须配置 SASL_PLAINTEXT,否则 consumer 会一直打印错误日志
spring.kafka.consumer.properties.sasl.mechanism=PLAIN
spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"clientuser1\" password=\"pass123\";

安全提升

SASL_PLAINTEXT 模式虽然有身份验证这一关,但是传输不加密,在传输过程中容易被人窃取密码等关键信息,Kafka 为此创造更安全的 SASL_SSL 模式。其实就是对传输用 TLS/SSL 协议加密。

首先要制作安全证书,bitnami kafka docker 官网提供了一个脚本便于制作(搜 kafka-generate-ssl.sh ),不过得先安装好 keytool,没有的话,在 ubuntu 上可执行 apt install openjdk-8-jre-headless 一件安装。在看这个脚本前请查阅数字证书与证书工具的使用,熟悉安全证书概念和证书工具的基本用法。

这个脚本分为两个环节:

  1. 生成信任库和私钥;
  2. 生成密钥库,导入签名证书

这个脚本可以多次运行,第一次运行遇到提示“Do you need to generate a trust store and associated private key?”,选“y”,完成1和2环节;其他时候运行,选“n”,完成2环节。

遇到提示设置密码时,使用相同密码,对应于 YAML 文件中的 KAFKA_CERTIFICATE_PASSWORD。

脚本也注释说明不同 Kafka 节点或者客户端需要不同的密钥库。

脚本最终用来生成2个密钥库 keystore/kafka.keystore.jks 和 truststore/kafka.truststore.jks。

编写 YAML 文件:

version: '3'

services:
  zookeeper:
    image: 'bitnami/zookeeper:3.7.1'
    # 无需映射出来
    #ports:
    #  - '2181:2181'
    environment:
      - ZOO_ENABLE_AUTH=yes
      - ZOO_SERVER_USERS=user
      - ZOO_SERVER_PASSWORDS=pass123
      - ZOO_CLIENT_USER=user
      - ZOO_CLIENT_PASSWORD=pass123
  kafka:
    image: 'bitnami/kafka:2.8.1'
    ports:
      - '9093:9093'
    environment:
      # 监听器的 CLIENT 上不要设置 9092,否则日志一直输出下面信息:
      # INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Failed authentication with /10.0.0.2 (SSL handshake failed) 
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,CLIENT://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka2:9092,CLIENT://47.100.218.220:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_SSL,CLIENT:SASL_SSL
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      # 用于 broker 间通信的 SASL 机制
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-256
      # 允许使用明文监听,出于安全原因,Bitnami Apache Kafka docker 镜像禁用了 PLAINTEXT 侦听器,但可以通过下面方式开启
      - ALLOW_PLAINTEXT_LISTENER=no
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper3:2181
      #- KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD=123456
      #- KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD=123456
      # 禁用主机名验证,这样我们就不用去设置各个容器的主机名。变量 ALGORITHM 是个空值,目的就是设置一个空字符串。
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=${ALGORITHM}
      #Zookeeper credentials
      - KAFKA_ZOOKEEPER_PROTOCOL=SASL     # 可选值有:PLAINTEXT, SASL, SSL, and SASL_SSL. 默认值: PLAINTEXT
      - KAFKA_ZOOKEEPER_USER=user
      - KAFKA_ZOOKEEPER_PASSWORD=pass123
      #Inter broker credentials
      - KAFKA_INTER_BROKER_USER=interbrokeruser        # Kafka 内部节点通信的用户名,默认值:user
      - KAFKA_INTER_BROKER_PASSWORD=interbrokerpass    # Kafka 内部节点通信的密码,默认值:bitnami
      #Client credentials(配置 SASL 认证时,使用下面两个变量来配置用户名和密码)
      - KAFKA_CLIENT_USERS=clientuser1         # 使用 SASL 模式处理客户端通信时创建的用户名,用逗号隔开。
      - KAFKA_CLIENT_PASSWORDS=pass123         # 使用 SASL 模式处理客户端通信时创建的密码,用逗号隔开。
      - KAFKA_CERTIFICATE_PASSWORD=123456
      - KAFKA_TLS_TYPE=JKS  # 或者 PEM
    volumes:
      - "/root/keystore/kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks"
      - "/root/truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks"
    depends_on:
      - zookeeper

此配置 kafka 容器可以正常起来不报错,但 SpringBoot 中的生产、消费端始终无法连上去,暂时还未解决。

KRaft 模式

KRaft 模式摆脱了 Zookeeper 的束缚,让 Kafka 更轻量级、资源更高效利用。Kafka 3.3 版本正式宣布 KRaft 处于生产可用状态,但是安全模块还未完善,所以还不推荐生产使用。后续版本的 Kafka 可能将弃用 Zookeeper 模式。下面介绍开发环境如何使用 KRaft 模式,后续等 KRaft 的安全模块成熟后更新本文。

version: "3"
services:
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
      - ALLOW_PLAINTEXT_LISTENER=yes

与 Zookeeper 模式对比,KRaft 模式的设置简单很多,只需要 Kafka,因为它既可以当节点,又可以当控制器。这是更先进的模式。

KRaft 模式的配置文件参考 kraft/server.properties

参考

Kafka 的 Docker 部署
Use of SASL/PLAIN in production

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注