本文介绍了如何使用 reactor kafka 从指定 Topic 的起始位置开始消费消息,直到达到该 Topic Partition 的最新 Offset,并在消费完成后优雅地停止 Consumer。通过结合 seekToBeginning、endOffsets 和 takeUntil 等 Reactor Kafka 的特性,可以实现精确的消息消费控制。
在某些场景下,我们需要消费 Kafka Topic 中的全部或部分消息,并在消费完成后停止 Consumer,例如数据迁移、历史数据分析等。Reactor Kafka 提供了强大的 API 来实现这种需求。以下是一个示例,展示了如何使用 Reactor Kafka 从 Topic 的起始位置消费到最新 Offset,然后停止 Consumer。
代码示例
import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate; import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.kafka.support.Acknowledgment; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverPartition; import reactor.kafka.receiver.ReceiverRecord; import Java.time.Duration; import java.util.Collections; import java.util.map; public class KafkaConsumerExample { public Disposable consumeMessages(String topic, String groupId, String bootstrapServers) { TopicPartition topicPartition = new TopicPartition(topic, 0); // 假设只有一个 Partition // 配置 Consumer 属性 Map<String, Object> consumerProps = Map.of( "bootstrap.servers", bootstrapServers, "group.id", groupId, "key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class, "value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class, "auto.offset.reset", "earliest" // 从最早的 Offset 开始消费 ); // 创建 ReceiverOptions ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps) .subscription(Collections.singleton(topic)) .addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning)); // 创建 ReactiveKafkaConsumerTemplate ReactiveKafkaConsumerTemplate<String, String> kafkaConsumer = new ReactiveKafkaConsumerTemplate<>(receiverOptions); // 消费消息并停止 Consumer return kafkaConsumer .receive() .flatMap(record -> { // 获取当前 Partition 的最新 Offset Mono<Map<TopicPartition, Long>> endOffsetsMono = kafkaConsumer.doOnConsumer(consumer -> consumer.endOffsets(Collections.singleton(topicPartition))); return endOffsetsMono.map(topicPartitionToLastOffset -> { long lastOffset = topicPartitionToLastOffset.get(topicPartition); return new RecordWithLastOffset(record, lastOffset); }); }) .takeUntil(recordWithLastOffset -> recordWithLastOffset.record.offset() >= (recordWithLastOffset.lastOffset - 1)) .subscribe(recordWithLastOffset -> { ReceiverRecord<String, String> record = recordWithLastOffset.record; Acknowledgment acknowledgment = record.receiverOffset(); System.out.printf("Received message: topic-partition=%s offset=%d key=%s value=%sn", acknowledgment.topicPartition(), acknowledgment.offset(), record.key(), record.value()); acknowledgment.acknowledge(); }); } private static class RecordWithLastOffset { private final ReceiverRecord<String, String> record; private final long lastOffset; public RecordWithLastOffset(ReceiverRecord<String, String> record, long lastOffset) { this.record = record; this.lastOffset = lastOffset; } } public static void main(String[] args) { String topic = "your-topic-name"; String groupId = "your-group-id"; String bootstrapServers = "localhost:9092"; KafkaConsumerExample example = new KafkaConsumerExample(); Disposable disposable = example.consumeMessages(topic, groupId, bootstrapServers); // 保持程序运行一段时间,以便消费消息 try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } // 取消订阅,停止消费 disposable.dispose(); } }
代码解释
- 配置 Consumer 属性: 设置 Kafka Consumer 的连接信息、序列化方式、GroupId 以及 Offset 重置策略。auto.offset.reset = earliest 确保从 Topic 的起始位置开始消费。
- 创建 ReceiverOptions: 使用配置的 Consumer 属性创建 ReceiverOptions,并通过 subscription 指定要消费的 Topic。addAssignListener 用于在 Partition 分配后,通过 seekToBeginning 将 Consumer 的 Offset 重置到起始位置。
- 创建 ReactiveKafkaConsumerTemplate: 使用 ReceiverOptions 创建 ReactiveKafkaConsumerTemplate,用于消费 Kafka 消息。
- 消费消息并停止 Consumer:
- kafkaConsumer.receive(): 从 Kafka Topic 接收消息,返回一个 Flux<ReceiverRecord<String, String>>。
- flatMap: 对于每个接收到的消息,使用kafkaConsumer.doOnConsumer来获取当前TopicPartition的最新Offset。doOnConsumer允许你访问底层的KafkaConsumer对象,从而可以调用consumer.endOffsets方法。
- map: 将ReceiverRecord和获取到的最新Offset封装到一个自定义的RecordWithLastOffset对象中。
- takeUntil: 使用 takeUntil 操作符,当消费到最新 Offset 的前一个位置时,停止消费。record.offset() >= (lastOffset – 1) 判断当前消息的 Offset 是否已经达到或超过了最新 Offset 的前一个位置。
- subscribe: 订阅 Flux,处理接收到的消息。在 subscribe 方法中,可以执行消息处理逻辑,并使用 record.receiverOffset().acknowledge() 提交 Offset。
- 取消订阅: 使用 disposable.dispose() 取消订阅,停止 Consumer。
注意事项
- 示例代码假设 Topic 只有一个 Partition。如果 Topic 有多个 Partition,需要根据实际情况进行调整。
- endOffsets 方法返回的是一个 Map<TopicPartition, Long>,其中 Long 值是每个 Partition 的最新 Offset。
- Offset 的提交方式有多种,示例代码中使用的是手动提交,即在 subscribe 方法中调用 record.receiverOffset().acknowledge() 提交 Offset。也可以使用自动提交,通过设置 Consumer 的 enable.auto.commit 属性来实现。
- 在实际应用中,需要处理可能出现的异常情况,例如 Kafka 连接失败、消息处理失败等。
总结
通过结合 Reactor Kafka 的 seekToBeginning、endOffsets 和 takeUntil 等特性,可以实现精确的消息消费控制,并在消费完成后优雅地停止 Consumer。这种方式适用于需要消费指定范围消息的场景,例如数据迁移、历史数据分析等。在实际应用中,需要根据具体的需求进行调整和优化。
评论(已关闭)
评论已关闭