boxmoe_header_banner_img

Hello! 欢迎来到悠悠畅享网!

文章导读

如何在 Reactor 中向现有 Flux 引入数据并合并流


avatar
作者 2025年9月2日 8

如何在 Reactor 中向现有 Flux 引入数据并合并流

本文旨在深入探讨如何在 reactor 框架中,特别是面对由外部库提供的现有 Flux 时,有效地引入新数据并将其与现有流合并。文章将阐明直接“发射”到 Flux 的局限性,重点讲解通过创建新的数据流并使用 Flux.merge 等操作符进行合并的策略,同时强调了处理一次性订阅 Flux 的关键注意事项与解决方案。

1. 理解 Reactor Flux 的发布者特性

在 Reactor 编程模型中,Flux 和 Mono 是数据发布者(Publisher),它们负责按照 Reactive Streams 规范将数据序列发布给订阅者(Subscriber)。与传统的命令式编程中的队列或列表不同,Flux 并非一个可以直接“写入”或“发射”数据进去的容器。因此,像 aFluxMap.emit(myObj) 这样的方法在 Flux 或 Mono 接口中是不存在的。

如果你希望将自定义数据引入到响应式流中,你需要做的是创建一个 新的 发布者,由这个发布者来产生你的数据。

2. 创建自定义数据源

为了动态地向响应式流中注入数据,Reactor 提供了多种机制来创建可控制的发布者。其中最常用且灵活的方式是使用 Sinks API 或 FluxProcessor。

2.1 使用 Sinks.many() (推荐)

Sinks 是 Reactor 3.4 引入的更现代、更安全的 API,用于创建多值(Sinks.many())或单值(Sinks.one())的发布者,并提供了一个 FluxSink 类似的接口来发射数据。

import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks;  public class CustomDataSource {      // 定义一个 Sinks.Many 对象,用于发射 MyRawType 类型的数据     // 这里使用 unicast() 模式,表示只有一个订阅者     private final Sinks.Many<MyRawType> rawTypeSink = Sinks.many().unicast().onBackpressureBuffer();      // 暴露一个 Flux 供外部订阅     public Flux<MyRawType> getRawTypeFlux() {         return rawTypeSink.asFlux();     }      // 外部调用此方法来发射数据     public void emitRawType(MyRawType data) {         rawTypeSink.tryEmitNext(data).orThrow(); // 尝试发射数据,如果失败则抛出异常     }      // 示例:MyRawType 是你的原始数据类型     static class MyRawType {         String id;         // ... constructor, getters, etc.     }      public static void main(String[] args) {         CustomDataSource dataSource = new CustomDataSource();         Flux<MyRawType> myRawFlux = dataSource.getRawTypeFlux();          myRawFlux.map(raw -> {             // 模拟将 MyRawType 转换为 MappedType             System.out.println("Converting raw: " + raw.id);             return new MappedType("Mapped-" + raw.id);         }).subscribe(mapped -> System.out.println("Received MappedType: " + mapped.name));          // 动态发射数据         dataSource.emitRawType(new MyRawType("A"));         dataSource.emitRawType(new MyRawType("B"));         // ...     }      // 示例:MappedType 是外部库期望的类型     static class MappedType {         String name;         // ... constructor, getters, etc.         public MappedType(String name) { this.name = name; }     } }

2.2 使用 FluxProcessor (传统方式)

FluxProcessor 是一类特殊的 Flux,它同时实现了 Subscriber 和 Publisher 接口,可以作为数据处理链中的桥梁。UnicastProcessor 是一个常见的选择,但它有“一次性订阅”的限制(详见后续章节)。

import reactor.core.publisher.Flux; import reactor.core.publisher.UnicastProcessor; import reactor.core.publisher.FluxSink;  public class CustomDataSourceProcessor {      private final UnicastProcessor<MyRawType> myProcessor = UnicastProcessor.create();     private final FluxSink<MyRawType> mySink = myProcessor.sink();      public Flux<MyRawType> getRawTypeFlux() {         return myProcessor;     }      public void emitRawType(MyRawType data) {         mySink.next(data);     }      // MyRawType 和 MappedType 定义同上     static class MyRawType { String id; public MyRawType(String id) { this.id = id; } }     static class MappedType { String name; public MappedType(String name) { this.name = name; } }      public static void main(String[] args) {         CustomDataSourceProcessor dataSource = new CustomDataSourceProcessor();         Flux<MyRawType> myRawFlux = dataSource.getRawTypeFlux();          myRawFlux.map(raw -> {             System.out.println("Converting raw: " + raw.id);             return new MappedType("Mapped-" + raw.id);         }).subscribe(mapped -> System.out.println("Received MappedType: " + mapped.name));          dataSource.emitRawType(new MyRawType("X"));         dataSource.emitRawType(new MyRawType("Y"));     } }

3. 合并现有 Flux 与新数据流

一旦你创建了自己的数据源(例如 myRawFlux),下一步就是将其与外部库提供的 Flux<MappedType> 进行整合。这里的关键是,你的自定义数据在与外部库的 Flux<MappedType> 合并之前,通常需要先转换为相同的类型 (MappedType)。

假设外部库提供的方法如下:

public class Library {     public static Flux<MappedType> createMappingToMappedType() {         // 模拟一个持续产生 MappedType 的 Flux         return Flux.just(new MappedType("Lib-1"), new MappedType("Lib-2"))                    .delayElements(Java.time.Duration.ofMillis(100));     } }

现在,我们将你的自定义数据流(经过转换后)与 Library.createMappingToMappedType() 返回的 Flux 进行合并。

import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.time.Duration;  public class FluxMergingExample {      // 假设这是你的原始数据类型和目标映射类型     static class MyRawType { String id; public MyRawType(String id) { this.id = id; } }     static class MappedType { String name; public MappedType(String name) { this.name = name; } }      // 模拟外部库     static class Library {         public static Flux<MappedType> createMappingToMappedType() {             System.out.println("Library.createMappingToMappedType() called.");             return Flux.interval(Duration.ofMillis(200)) // 每200ms产生一个元素                        .map(i -> new MappedType("Lib-Item-" + i))                        .take(3); // 只取3个元素         }     }      // 模拟将原始类型转换为映射类型的方法     private static MappedType convertRawToMappedType(MyRawType raw) {         System.out.println("Converting raw: " + raw.id);         return new MappedType("My-Converted-" + raw.id);     }      public static void main(String[] args) throws InterruptedException {         // 1. 创建你的自定义数据源         Sinks.Many<MyRawType> myRawSink = Sinks.many().unicast().onBackpressureBuffer();         Flux<MyRawType> myRawFlux = myRawSink.asFlux();          // 2. 将你的原始数据流转换为 MappedType         Flux<MappedType> myConvertedFlux = myRawFlux.map(FluxMergingExample::convertRawToMappedType);          // 3. 获取外部库的 Flux         Flux<MappedType> aFluxMap = Library.createMappingToMappedType();          // 4. 合并两个 MappedType 流         // Flux.merge 用于并行合并,元素会根据到达时间交叉输出         Flux<MappedType> combinedFlux = Flux.merge(aFluxMap, myConvertedFlux);          // 5. 订阅并处理合并后的流         combinedFlux.doOnNext(converted -> System.out.println("Received combined MappedType: " + converted.name))                     .doOnComplete(() -> System.out.println("Combined Flux completed!"))                     .subscribe();          // 6. 动态发射你的数据         System.out.println("Emitting custom data...");         myRawSink.tryEmitNext(new MyRawType("A")).orThrow();         Thread.sleep(100); // 稍作等待         myRawSink.tryEmitNext(new MyRawType("B")).orThrow();         Thread.sleep(300); // 稍作等待,让库的Flux也能发射一些         myRawSink.tryEmitNext(new MyRawType("C")).orThrow();         myRawSink.tryEmitComplete(); // 完成你的数据源          // 等待所有异步操作完成         Thread.sleep(1000);     } }

在 Reactor 中,有几个常用的操作符用于合并流:

  • **`Flux



评论(已关闭)

评论已关闭