Kafka Offset

By | 2024年2月20日

latest 和 earliest 区别

  1. earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
  2. latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。参考:kafka auto.offset.reset latest earliest 详解

因此,若消费端程序关闭了,而生产端在不停的生产数据,即使消费端使用了 latest 策略,但由于 offset 还是关闭前的,因此程序一开就会从 关闭前的 offset开始消费。

若要重启程序后从最新的开始消费,不处理前面因关闭程序导致堆积的消息,启动程序后消费端可以立即执行 seek_to_end 方法,python3参考:

from kafka3 import KafkaConsumer

consumer = KafkaConsumer(
   'test', # 指定topic
    bootstrap_servers = "127.0.0.1:9092", # kafka集群地址
    group_id = "newConsumerTest1", # 消费组id
    client_id = '8eaa8c81edfd41f28a50f9121ad14572',
    auto_offset_reset="latest"
    max_poll_records=10, # 每次最大消费数量
    enable_auto_commit = True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
    auto_commit_interval_ms = 5000, # 自动提交的周期(毫秒)
)

# 使用seek_to_end函数,seek_to_end会直接将位置定位到最新数据。
# 但是在之前需要poll一次数据,不然会报没有分配partition的错误:No partitions are currently assigned。
# 这说明我们的框架也是懒加载的,只有在具体poll数据的时候才会分配partition。
# seek_to_end 后自动提交的 offset 也会跟上来!
res = consumer.poll(10)
consumer.seek_to_end()

for msg in consumer: # 迭代器,等待下一条消息,没有消息时会阻塞
    offset, value = msg.offset, msg.value.decode('utf-8')
    print value

offset 的作用和意义

offset 是 Kafka 为每条消息分配的一个唯一的编号,它表示消息在分区中的顺序位置。offset 是从 0 开始的,每当有新的消息写入分区时,offset 就会加 1。offset 是不可变的,即使消息被删除或过期,offset 也不会改变或重用。

offset 的作用主要有两个:

  • 一是用来定位消息。通过指定 offset,消费者可以准确地找到分区中的某条消息,或者从某个位置开始消费消息。
  • 二是用来记录消费进度。消费者在消费完一条消息后,需要提交 offset 来告诉 Kafka broker 自己消费到哪里了。这样,如果消费者发生故障或重启,它可以根据保存的 offset 来恢复消费状态。

推荐参考:一文教你理解Kafka offset

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注