本文探讨了在reactor框架中,如何向一个由外部库提供的现有Flux动态注入数据,以及如何将自定义数据流与外部Flux进行有效合并。文章将详细介绍如何利用Sinks创建可控的发射器,并通过Flux.merge()等操作符将多个数据源整合,同时会针对UnicastProcessor等一次性订阅源的特殊情况提供解决方案和注意事项。
在响应式编程中,Flux代表一个0到N个元素的异步序列。当面对由外部库提供、我们无法直接控制其数据发射机制的Flux时,如何将我们自己的数据注入其中,或将其与我们自己的数据流结合,是一个常见的挑战。通常,Flux本身不提供直接的emit方法,因为它的设计理念是作为数据流的消费者而非直接的生产者(对于外部数据源而言)。
挑战:向现有Flux注入数据
假设我们有一个外部库方法,它返回一个Flux<MappedType>:
Flux<MappedType> aFluxMap = Library.createMappingToMappedType();
我们希望能够将自己的原始对象(例如myObj)发送到这个aFluxMap中,让它们被处理并转换为MappedType,然后继续后续操作。直观上,我们可能期望有一个类似aFluxMap.emit(myObj)的方法,但这样的方法在Flux或Mono中并不存在。
一种常见的误解是尝试使用FluxProcessor和FluxSink来解决:
FluxProcessor p = UnicastProcessor.create().serialize(); FluxSink sink = p.sink(); sink.next(mess); // 这会将消息发送到新创建的Flux 'p'
这种方法的问题在于,它创建了一个新的Flux (p) 并向其发送消息,而不是将消息发送到我们已有的aFluxMap。我们需要的是将我们自己的数据流与aFluxMap关联起来。
解决方案:合并数据流
Reactor框架提供了强大的操作符来组合和合并不同的数据流。解决上述问题的核心思路是:
- 创建一个我们自己可以控制的Flux,用于发射我们的自定义数据。
- 使用合并操作符(如merge、concat、zip等)将这个自定义Flux与外部库提供的aFluxMap结合起来。
1. 创建可控的自定义数据流
在Reactor 3.4及更高版本中,推荐使用Sinks API来创建可控的发射器。Sinks.many()可以创建一个多值发射器,它提供了tryEmitNext、tryEmitComplete等方法来安全地发射数据。
import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; // 假设 MappedType 是一个已定义的类型 class MappedType { private String value; // 构造函数、getter等 public MappedType(String value) { this.value = value; } public String getValue() { return value; } @Override public String toString() { return "MappedType(" + value + ")"; } } public class CustomEmitter { private final Sinks.Many<String> myDataSink = Sinks.many().multicast().onBackpressureBuffer(); private final Flux<String> myDataFlux = myDataSink.asFlux(); // 模拟外部库的Flux public Flux<MappedType> createExternalFlux() { return Flux.just("External1", "External2") .map(s -> new MappedType("External-" + s)); } // 发射自定义数据的方法 public void emitMyData(String data) { myDataSink.tryEmitNext(data); } public Flux<String> getMyDataFlux() { return myDataFlux; } public static void main(String[] args) { CustomEmitter emitter = new CustomEmitter(); // 1. 获取外部库的Flux Flux<MappedType> aFluxMap = emitter.createExternalFlux(); // 2. 创建我们自己的Flux(这里我们直接使用原始字符串,稍后进行转换) Flux<String> customRawDataFlux = emitter.getMyDataFlux(); // 3. 将自定义原始数据转换为 MappedType Flux<MappedType> customMappedDataFlux = customRawDataFlux .map(raw -> { System.out.println("Converting custom raw data: " + raw); // 模拟转换逻辑 return new MappedType("Custom-" + raw); }); // 4. 合并两个Flux Flux<MappedType> combinedFlux = Flux.merge(aFluxMap, customMappedDataFlux); // 订阅合并后的Flux并处理数据 combinedFlux.doOnNext(converted -> System.out.println("Received: " + converted)) .subscribe( null, // onNext error -> System.err.println("Error: " + error), () -> System.out.println("Combined Flux completed.") ); // 动态发射自定义数据 emitter.emitMyData("MyDataA"); emitter.emitMyData("MyDataB"); // 模拟外部Flux完成后,手动完成自定义Flux // 如果不手动complete,程序会一直运行等待更多数据 // myDataSink.tryEmitComplete(); // 实际应用中根据业务逻辑决定何时完成 } }
在上述示例中:
- 我们创建了一个Sinks.Many<String>来作为自定义数据源。
- emitMyData方法允许我们随时向这个Sink发射数据。
- customMappedDataFlux负责将我们发射的原始String数据转换为MappedType。
- Flux.merge(aFluxMap, customMappedDataFlux)将外部Flux和我们自己的Flux合并成一个单一的Flux。merge操作符会并发地从两个源接收元素,并按照它们到达的顺序发射。
2. 常用合并操作符
- Flux.merge(Publisher… sources): 并发地合并多个Publisher的元素,元素到达的顺序决定了它们在输出Flux中的顺序。适用于对顺序不敏感但需要快速处理所有数据的场景。
- Flux.concat(Publisher… sources): 顺序地连接多个Publisher。它会等待前一个Publisher完成,然后才订阅下一个Publisher。适用于需要严格保持数据源顺序的场景。
- Flux.zip(Publisher… sources, function<Object[], O> combinator): 将多个Publisher的元素两两配对,并使用提供的combinator函数将它们组合成一个新的元素。它会等待所有源都发出一个元素后才进行组合。适用于需要将不同类型或不同来源的数据关联起来的场景。
根据具体需求选择合适的合并操作符。在大多数动态注入数据的场景中,merge是最常用的,因为它允许并发处理来自不同源的数据。
注意事项与常见问题
1. UnicastProcessor的订阅限制
在原始问题中提到了一个重要的更新: Library.createMappingToMappedType() 返回的aFluxMap的内部源可能是一个UnicastProcessor,并且该UnicastProcessor可能已经被库内部订阅。当尝试通过p.flatMap(raw -> aFluxMap).subscribe()再次订阅aFluxMap时,会遇到”UnicastProcessor can be subscribe once”的异常。
解释: UnicastProcessor是一个单播处理器,它只允许一个订阅者。一旦被订阅,它就会开始发射数据。如果尝试第二次订阅,就会抛出异常。
解决方案:
- 理解merge的工作方式: Flux.merge(aFluxMap, customMappedDataFlux)操作符通常只会对aFluxMap进行一次订阅。这意味着如果aFluxMap本身是一个有效的Flux(即使其内部使用了UnicastProcessor,但它对外暴露的接口允许一次订阅),那么merge操作应该能够成功。
- 避免重复订阅: 原始问题中的p.flatMap(raw -> aFluxMap)会导致对aFluxMap进行多次订阅(p每发射一个raw,就会尝试订阅一次aFluxMap),这正是导致UnicastProcessor报错的原因。务必避免在flatMap等操作中对单播源进行重复订阅。
- 如果aFluxMap本身就是已订阅的UnicastProcessor: 如果Library.createMappingToMappedType()返回的Flux本身就是一个已经启动并被订阅过的UnicastProcessor,那么任何对其的再次订阅都会失败。在这种极端情况下,你可能无法直接“合并”它,而只能作为它的一个消费者(即,只订阅一次)。
- 应对策略: 如果aFluxMap是一个已订阅且不能再次订阅的单播源,你无法通过merge来将你的数据“注入”到它的上游。你只能将你的数据与aFluxMap的输出进行合并。这意味着aFluxMap的数据流是独立的,你的数据流也是独立的,它们在下游汇合。上述Flux.merge(aFluxMap, customMappedDataFlux)的方案正是这样做的,它将两个独立的、可订阅的Flux合并。
2. 背压(Backpressure)
在使用Sinks.many().multicast().onBackpressureBuffer()时,我们选择了onBackpressureBuffer策略,这意味着如果下游消费者处理速度慢于上游发射速度,Sink会缓冲元素。根据实际需求,你可能需要选择其他背压策略,例如onBackpressuredrop(丢弃元素)或onBackpressureError(发出错误)。
3. 资源管理与完成信号
当你的自定义数据流不再有数据需要发射时,记得调用myDataSink.tryEmitComplete()来发出完成信号。这对于下游的Flux操作符(如merge)来说很重要,它们需要知道何时所有上游源都已完成,以便自己也能完成。
总结
在Reactor中,向一个由外部库提供的现有Flux动态注入数据,并非通过直接的emit方法实现,而是通过创建一个可控的自定义Flux,然后利用Flux.merge()等操作符将其与外部Flux合并。这种模式遵循了响应式编程的原则,即通过组合和转换数据流来构建复杂的业务逻辑。同时,需要特别注意UnicastProcessor等单订阅源的特性,避免不当的重复订阅操作。
评论(已关闭)
评论已关闭