boxmoe_header_banner_img

Hello! 欢迎来到悠悠畅享网!

文章导读

使用 Reactor Kafka 消费指定范围消息后停止 Consumer


avatar
作者 2025年9月14日 9

使用 Reactor Kafka 消费指定范围消息后停止 Consumer

本文介绍了如何使用 reactor kafka 从指定 Topic 的起始位置开始消费消息,直到达到该 Topic Partition 的最新 Offset,并在消费完成后优雅地停止 Consumer。通过结合 seekToBeginning、endOffsets 和 takeUntil 等 Reactor Kafka 的特性,可以实现精确的消息消费控制。

在某些场景下,我们需要消费 Kafka Topic 中的全部或部分消息,并在消费完成后停止 Consumer,例如数据迁移、历史数据分析等。Reactor Kafka 提供了强大的 API 来实现这种需求。以下是一个示例,展示了如何使用 Reactor Kafka 从 Topic 的起始位置消费到最新 Offset,然后停止 Consumer。

代码示例

import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate; import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.kafka.support.Acknowledgment; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverPartition; import reactor.kafka.receiver.ReceiverRecord;  import Java.time.Duration; import java.util.Collections; import java.util.map;  public class KafkaConsumerExample {      public Disposable consumeMessages(String topic, String groupId, String bootstrapServers) {         TopicPartition topicPartition = new TopicPartition(topic, 0); // 假设只有一个 Partition          // 配置 Consumer 属性         Map<String, Object> consumerProps = Map.of(                 "bootstrap.servers", bootstrapServers,                 "group.id", groupId,                 "key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,                 "value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,                 "auto.offset.reset", "earliest" // 从最早的 Offset 开始消费         );          // 创建 ReceiverOptions         ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)                 .subscription(Collections.singleton(topic))                 .addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning));          // 创建 ReactiveKafkaConsumerTemplate         ReactiveKafkaConsumerTemplate<String, String> kafkaConsumer = new ReactiveKafkaConsumerTemplate<>(receiverOptions);          // 消费消息并停止 Consumer         return kafkaConsumer                 .receive()                 .flatMap(record -> {                     // 获取当前 Partition 的最新 Offset                     Mono<Map<TopicPartition, Long>> endOffsetsMono = kafkaConsumer.doOnConsumer(consumer -> consumer.endOffsets(Collections.singleton(topicPartition)));                      return endOffsetsMono.map(topicPartitionToLastOffset -> {                         long lastOffset = topicPartitionToLastOffset.get(topicPartition);                         return new RecordWithLastOffset(record, lastOffset);                     });                 })                 .takeUntil(recordWithLastOffset -> recordWithLastOffset.record.offset() >= (recordWithLastOffset.lastOffset - 1))                 .subscribe(recordWithLastOffset -> {                     ReceiverRecord<String, String> record = recordWithLastOffset.record;                     Acknowledgment acknowledgment = record.receiverOffset();                      System.out.printf("Received message: topic-partition=%s offset=%d key=%s value=%sn",                             acknowledgment.topicPartition(),                             acknowledgment.offset(),                             record.key(),                             record.value());                      acknowledgment.acknowledge();                 });     }      private static class RecordWithLastOffset {         private final ReceiverRecord<String, String> record;         private final long lastOffset;          public RecordWithLastOffset(ReceiverRecord<String, String> record, long lastOffset) {             this.record = record;             this.lastOffset = lastOffset;         }     }       public static void main(String[] args) {         String topic = "your-topic-name";         String groupId = "your-group-id";         String bootstrapServers = "localhost:9092";          KafkaConsumerExample example = new KafkaConsumerExample();         Disposable disposable = example.consumeMessages(topic, groupId, bootstrapServers);          // 保持程序运行一段时间,以便消费消息         try {             Thread.sleep(10000);         } catch (InterruptedException e) {             e.printStackTrace();         }          // 取消订阅,停止消费         disposable.dispose();     } }

代码解释

使用 Reactor Kafka 消费指定范围消息后停止 Consumer

Ajelix

处理Excel和GoogleSheets表格的AI工具

使用 Reactor Kafka 消费指定范围消息后停止 Consumer44

查看详情 使用 Reactor Kafka 消费指定范围消息后停止 Consumer

  1. 配置 Consumer 属性: 设置 Kafka Consumer 的连接信息、序列化方式、GroupId 以及 Offset 重置策略。auto.offset.reset = earliest 确保从 Topic 的起始位置开始消费。
  2. 创建 ReceiverOptions: 使用配置的 Consumer 属性创建 ReceiverOptions,并通过 subscription 指定要消费的 Topic。addAssignListener 用于在 Partition 分配后,通过 seekToBeginning 将 Consumer 的 Offset 重置到起始位置。
  3. 创建 ReactiveKafkaConsumerTemplate: 使用 ReceiverOptions 创建 ReactiveKafkaConsumerTemplate,用于消费 Kafka 消息。
  4. 消费消息并停止 Consumer:
    • kafkaConsumer.receive(): 从 Kafka Topic 接收消息,返回一个 Flux<ReceiverRecord<String, String>>。
    • flatMap: 对于每个接收到的消息,使用kafkaConsumer.doOnConsumer来获取当前TopicPartition的最新Offset。doOnConsumer允许你访问底层的KafkaConsumer对象,从而可以调用consumer.endOffsets方法。
    • map: 将ReceiverRecord和获取到的最新Offset封装到一个自定义的RecordWithLastOffset对象中。
    • takeUntil: 使用 takeUntil 操作符,当消费到最新 Offset 的前一个位置时,停止消费。record.offset() >= (lastOffset – 1) 判断当前消息的 Offset 是否已经达到或超过了最新 Offset 的前一个位置。
    • subscribe: 订阅 Flux,处理接收到的消息。在 subscribe 方法中,可以执行消息处理逻辑,并使用 record.receiverOffset().acknowledge() 提交 Offset。
  5. 取消订阅: 使用 disposable.dispose() 取消订阅,停止 Consumer。

注意事项

  • 示例代码假设 Topic 只有一个 Partition。如果 Topic 有多个 Partition,需要根据实际情况进行调整。
  • endOffsets 方法返回的是一个 Map<TopicPartition, Long>,其中 Long 值是每个 Partition 的最新 Offset。
  • Offset 的提交方式有多种,示例代码中使用的是手动提交,即在 subscribe 方法中调用 record.receiverOffset().acknowledge() 提交 Offset。也可以使用自动提交,通过设置 Consumer 的 enable.auto.commit 属性来实现。
  • 在实际应用中,需要处理可能出现的异常情况,例如 Kafka 连接失败、消息处理失败等。

总结

通过结合 Reactor Kafka 的 seekToBeginning、endOffsets 和 takeUntil 等特性,可以实现精确的消息消费控制,并在消费完成后优雅地停止 Consumer。这种方式适用于需要消费指定范围消息的场景,例如数据迁移、历史数据分析等。在实际应用中,需要根据具体的需求进行调整和优化。



评论(已关闭)

评论已关闭