latest 和 earliest 区别
- earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
- latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。参考:kafka auto.offset.reset latest earliest 详解
因此,若消费端程序关闭了,而生产端在不停的生产数据,即使消费端使用了 latest 策略,但由于 offset 还是关闭前的,因此程序一开就会从 关闭前的 offset开始消费。
若要重启程序后从最新的开始消费,不处理前面因关闭程序导致堆积的消息,启动程序后消费端可以立即执行 seek_to_end 方法,python3参考:
from kafka3 import KafkaConsumer consumer = KafkaConsumer( 'test', # 指定topic bootstrap_servers = "127.0.0.1:9092", # kafka集群地址 group_id = "newConsumerTest1", # 消费组id client_id = '8eaa8c81edfd41f28a50f9121ad14572', auto_offset_reset="latest" max_poll_records=10, # 每次最大消费数量 enable_auto_commit = True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交) auto_commit_interval_ms = 5000, # 自动提交的周期(毫秒) ) # 使用seek_to_end函数,seek_to_end会直接将位置定位到最新数据。 # 但是在之前需要poll一次数据,不然会报没有分配partition的错误:No partitions are currently assigned。 # 这说明我们的框架也是懒加载的,只有在具体poll数据的时候才会分配partition。 # seek_to_end 后自动提交的 offset 也会跟上来! res = consumer.poll(10) consumer.seek_to_end() for msg in consumer: # 迭代器,等待下一条消息,没有消息时会阻塞 offset, value = msg.offset, msg.value.decode('utf-8') print value
offset 的作用和意义
offset 是 Kafka 为每条消息分配的一个唯一的编号,它表示消息在分区中的顺序位置。offset 是从 0 开始的,每当有新的消息写入分区时,offset 就会加 1。offset 是不可变的,即使消息被删除或过期,offset 也不会改变或重用。
offset 的作用主要有两个:
- 一是用来定位消息。通过指定 offset,消费者可以准确地找到分区中的某条消息,或者从某个位置开始消费消息。
- 二是用来记录消费进度。消费者在消费完一条消息后,需要提交 offset 来告诉 Kafka broker 自己消费到哪里了。这样,如果消费者发生故障或重启,它可以根据保存的 offset 来恢复消费状态。
推荐参考:一文教你理解Kafka offset