
本文旨在深入探讨 webflux 中 `repeat` 和 `then` 操作符的独特行为及其相互作用。`repeat` 操作符会使其上游序列重新订阅并重复执行,而 `then` 操作符则在等待其上游流(包括所有重复)完全完成后,才执行一次并转换为一个新的 `mono` 流。理解这两个操作符的相对位置对流的执行顺序和结果至关重要。
在 Webflux 响应式编程中,操作符的组合和顺序对流的行为有着决定性的影响。repeat 和 then 是两个功能强大但有时行为可能出乎意料的操作符。本教程将通过具体示例,详细解析它们各自的特性以及在不同组合下的交互逻辑。
repeat 操作符的基础行为
repeat 操作符的作用是使其上游的响应式序列在完成后重新订阅,从而重复发出元素。它会将一个 Mono 转换为一个 Flux,或者使一个 Flux 重复其元素序列。
考虑以下示例:
import reactor.core.publisher.Mono; public class RepeatExample { public static void main(String[] args) { // 示例 1: repeat 在所有 doOnNext 之后 System.out.println("--- 示例 1 ---"); Mono.just(5) .doOnNext(i -> System.out.println("next 1: " + i)) .doOnNext(i -> System.out.println("next 2: " + i)) .doOnNext(i -> System.out.println("next 3: " + i)) .repeat(2) // 原始序列重复 2 次,总共执行 3 次 .subscribe(); // 示例 2: repeat 在 doOnNext 之间 System.out.println("n--- 示例 2 ---"); Mono.just(5) .doOnNext(i -> System.out.println("next 1: " + i)) .repeat(2) // 原始序列重复 2 次,总共执行 3 次 .doOnNext(i -> System.out.println("next 2: " + i)) .doOnNext(i -> System.out.println("next 3: " + i)) .subscribe(); } }
输出:
--- 示例 1 --- next 1: 5 next 2: 5 next 3: 5 next 1: 5 next 2: 5 next 3: 5 next 1: 5 next 2: 5 next 3: 5 --- 示例 2 --- next 1: 5 next 2: 5 next 3: 5 next 1: 5 next 2: 5 next 3: 5 next 1: 5 next 2: 5 next 3: 5
分析: 从这两个示例可以看出,repeat(2) 表示在原始序列完成之后,会再重新订阅并执行两次,总共执行三次。无论 repeat 操作符放在 doOnNext 链的哪个位置,它都会使其上游的整个序列重新订阅。这意味着,一旦 repeat 被应用,它会将其上游的 Mono 或 Flux 转换为一个重复序列的 Flux,而后续的 doOnNext 操作符则会作用于这个重复序列的每个元素。因此,即使 repeat 位于 doOnNext(“next 1”) 之后,doOnNext(“next 2”) 和 doOnNext(“next 3”) 依然会在每次重复时被执行。
then 操作符的特性
then 操作符用于在当前响应式序列完成时,切换到一个新的 Mono 流。它会等待上游序列发出所有元素并成功完成(onComplete)后,才订阅并执行 then 中传入的 Mono。then 的一个关键特点是它会把流的类型从 Flux 或 Mono 转换为一个 Mono,并且这个 Mono 只会发出一个元素(或不发出任何元素)。
repeat 与 then 的交互行为分析
当 repeat 和 then 结合使用时,它们的相对位置会极大地影响流的执行逻辑。
场景一:then 在 repeat 之前
import reactor.core.publisher.Mono; public class ThenBeforeRepeatExample { public static void main(String[] args) { System.out.println("--- 场景一: then 在 repeat 之前 ---"); Mono.just(5) .doOnNext(i -> System.out.println("next 1: " + i)) .doOnNext(i -> System.out.println("next 2: " + i)) .then(Mono.just("hello")) // 转换流,等待上游 Mono<Integer> 完成后发出 "hello" .doOnNext(s -> System.out.println("next 3: " + s)) // 作用于 "hello" .repeat(2) // 重复整个 then 后的 Mono<String> 序列 .subscribe(); } }
输出:
--- 场景一: then 在 repeat 之前 --- next 1: 5 next 2: 5 next 3: hello next 1: 5 next 2: 5 next 3: hello next 1: 5 next 2: 5 next 3: hello
分析: 在此场景中,then(Mono.just(“hello”)) 会在上游的 Mono.just(5) 完成(即 doOnNext(“next 1”) 和 doOnNext(“next 2”) 执行完毕)后,发出一个 “hello” 字符串。此时,整个流的类型已经从 Mono<Integer> 转换为 Mono<String>。doOnNext(“next 3”) 接着处理这个 “hello”。最后,repeat(2) 作用于这个已经转换并发出 “hello” 的 Mono<String> 序列。因此,整个 Mono.just(5) -> doOnNext1 -> doOnNext2 -> then -> doOnNext3 的逻辑单元被重复了三次。
场景二:then 在 repeat 之后
import reactor.core.publisher.Mono; public class ThenAfterRepeatExample { public static void main(String[] args) { System.out.println("--- 场景二: then 在 repeat 之后 ---"); Mono.just(5) .doOnNext(i -> System.out.println("next 1: " + i)) // Mono<Integer> .repeat(2) // 转换为 Flux<Integer>,重复上游 Mono 3 次 .doOnNext(i -> System.out.println("next 2: " + i)) // 作用于 Flux<Integer> 的每个元素 .then(Mono.just("hello")) // 等待上游 Flux<Integer> (包括所有重复) 完成后,发出 "hello" .doOnNext(s -> System.out.println("next 3: " + s)) // 作用于 then 发出的 "hello" .subscribe(); } }
输出:
--- 场景二: then 在 repeat 之后 --- next 1: 5 next 2: 5 next 1: 5 next 2: 5 next 1: 5 next 2: 5 next 3: hello
分析: 这是最能体现 repeat 和 then 交互特性的场景。
- Mono.just(5).doOnNext(“next 1”) 构成了一个初始的 Mono<Integer>。
- repeat(2) 将这个 Mono<Integer> 转换为一个 Flux<Integer>,使其上游序列(即 Mono.just(5).doOnNext(“next 1”))重新订阅并执行三次。
- 紧随其后的 doOnNext(“next 2”) 作用于这个 Flux<Integer>,因此在每次重复时,next 1 和 next 2 都会被打印。
- 关键在于 then(Mono.just(“hello”))。根据 then 的定义,它会等待其上游的 Flux<Integer> 完全完成(包括所有的重复)后,才订阅并发出 “hello”。由于 then 将流转换为 Mono,它只会被触发一次。
- 最后,doOnNext(“next 3”) 作用于 then 发出的这个单次 “hello”,因此 next 3: hello 只会出现一次。
关键点与注意事项
- repeat 的作用范围: repeat 总是作用于其上游的整个序列,使其重新订阅。它将 Mono 转换为 Flux,或者使 Flux 重复其所有元素。
- then 的执行时机与类型转换: then 操作符会等待其上游流(无论是 Mono 还是 Flux,包括所有重复)完全发出元素并完成之后,才执行一次并发出其自身的 Mono 元素。它会将整个流转换为一个 Mono。
- 操作符的顺序至关重要: repeat 和 then 的相对位置决定了流的类型转换和执行边界。
- then 在 repeat 之前:repeat 作用于一个已经被 then 转换过的 Mono,导致整个转换后的序列重复。
- then 在 repeat 之后:repeat 先执行,生成一个重复的 Flux。then 等待这个 Flux 完全结束后,才执行一次。
- 调试辅助: 使用 doOnNext 进行调试时,务必清楚它所作用的上下文是 Mono 还是 Flux,以及它是否处于 repeat 的影响范围内。
总结
Webflux 的 repeat 和 then 操作符虽然功能强大,但在组合使用时需要对其内部机制有清晰的理解。repeat 负责上游序列的重新订阅,而 then 则在等待上游流(包括所有重复)完成后进行一次性转换。掌握它们各自的特性以及它们如何改变流的类型和执行边界,是编写高效、可预测的响应式应用程序的关键。通过本文的示例和分析,希望能够帮助开发者更深入地理解这两个操作符的精妙之处。


