kafka 负载均衡(分区)

By | 2024年1月16日

Topic 指定分区

每个主题的分区数可以在创建主题时手动指定,若不指定则由 Kafka 配置文件中的配置参数 num.partitions 决定(默认为 1)。主题的分区数一旦确定便只可增不可减。

我们也可以不手动创建topic,在执行代码 kafkaTemplate.send(“topic1”, message) 发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。

SpringBoot 手动创建多个分区的 Topic:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaInitialConfiguration {

    // 创建8分区1副本的 topic
    @Bean
    public NewTopic mytopic1() {
        return new NewTopic("mytopic1",8, (short) 1 );
    }

    // 创建11分区1副本的 topic
    @Bean
    public NewTopic mytopic2() {
        return new NewTopic("mytopic2",11, (short) 1 );
    }
}

使用 NewTopic 创建 Topic 一直 KafkaAdmin 上报错,搞了一天,最终醒悟:没有配 spring.kafka.admin 导致的,原来 Kafka 客户端不仅有 producter、consumer 角色,还有 admin 角色,admin 角色才可以创建 topic,SASL_PLAINTEXT 配置参考:

spring.kafka.admin.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.admin.properties.sasl.mechanism=PLAIN
spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"yyy\";
spring.kafka.producer.properties.sasl.mechanism=PLAIN
spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"yyy\";
spring.kafka.consumer.properties.sasl.mechanism=PLAIN
spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"yyy\";

生产者层面的负载均衡

生产者发送消息时,代码上可以指定 partition 和 key:

  • 若指定 partition 值,则不会走分区器;
  • 在没有指明 partition 值,但有key的情况下,它将 key 的 hash 值与 topic 的 partition 数进行取余得到partition值,相同 key 的消息会在同一个分区里,这保证了消息的顺序性。
  • 在既没有分区值也没有key的情况下,Kafka采用粘性分区器,随机选择一个分区,并尽可能一直使用该分区,并且每一个分区都有一个batch,待该分区的batch(16K)已满或者时间范围到了,默认0ms,Kafka再随机一个分区进行使用。

若一个 topic 设置了2个分区,要均匀的投递消息到这两个分区上,那么可以设一个计数器变量 counter,然后计算 counter%2 值,值只能是 0 或 1,不会大于分区数,因此可以直接将值作为 partition 值,发送消息时带上此值。

消费者层面的负载均衡

Kafka 中主题订阅者的基本单位是消费者组,每个分区只能由消费者组中的一个消费者进行消费,多个消费者组之间对于分区的消费互不影响。Kafka 在消费者层面的负载均衡(再平衡)是以消费者组为单位展开的,消费者组中消费者数量的增减都会使得 Kafka 将分区副本按照一定规则重新分配到消费者组中。这里需要注意的是,不同消费者组之前的分区重分配是互不影响,独立进行的。

若消费者组中有一个挂掉了,其他几个2、3秒内就会自动再平衡。

Kafka 提供了三个开箱即用的分区分配器:RangeAssignor(默认)、RoundRobinAssignor 以及 StickyAssignor。下面我将逐一介绍这三种分区分配器的负载均衡策略。

RangeAssignor分配策略

原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个topic,RangeAssignor策略会将消费组内所有订阅这个topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。

为了更加通俗的讲解RangeAssignor策略,我们不妨再举一些示例。假设消费组内有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有4个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最终的分配结果为:

消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t0p3、t1p2、t1p3

这样分配的很均匀,那么此种分配策略能够一直保持这种良好的特性呢?我们再来看下另外一种情况。假设上面例子中2个主题都只有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t1p2

可以明显的看到这样的分配并不均匀,如果将类似的情形扩大,有可能会出现部分消费者过载的情况。对此我们再来看下另一种RoundRobinAssignor策略的分配效果如何。

RoundRobinAssignor分配策略

将消费者组内所有消费者订阅的所有主题的分区按照字典序排序,之后以轮询的方式分配到每个消费者中,如果某个消费者没有订阅对应的主题,那么则不会将分区分配给它。

可以注意到,RoundRobinAssignor 在一个消费者组内所有消费者都订阅了同样主题的情况下是均匀的,在这种条件下避免了 RangeAssignor 的分配不均的问题。若组内有订阅其他主题的消费者,为了保证均匀,可以直接踢出(即换一个组名)

kafka-python 中可以试用 partition_assignment_strategy 来指定分配器,参考:Python KafkaConsumer 参数

其他分配器的详细说明请看:Kafka 是如何实现负载均衡的?

分区数与消费者数

如果分区数大于或等于组中的消费者数,那么一个消费者会负责多个分区;如果分区数小于消费者数,有些消费者就处于空闲状态。

当分区数大于消费者数时,在工作中出现了丢失数据或kafka分区中数据晚消费情况:工作中配置了kafka分区100个,storm spout(消费者) 配了30个,此时发现日志会丢失,或者很晚很晚才才被消费(应该是触发再平衡了)。将 kafka分区数与消费者数改成一样后,问题解决

参考

Kafka 是如何实现负载均衡的?
SpringBoot整合kafka之kafka分区实战
SpringBoot集成kafka全面实战

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注