使用CompletableFuture结合ExecutorService按组并行执行任务,先将任务按groupId分组到map中,再对每组提交异步任务并等待全部完成。

在Java中实现并发任务分组执行,关键在于将任务按组划分,并确保每组任务可以并行执行,同时组与组之间可控制执行顺序或并发级别。常用的方式是结合ExecutorService、CompletableFuture和集合分组逻辑来完成。
使用CompletableFuture进行任务分组并行处理
通过CompletableFuture可以方便地对每组任务进行异步执行,并等待整组完成。
示例场景:假设有多个任务,需要按类别(如用户ID分组)分别执行,每组内部任务并行,组间也可并行。
- 先将任务按某个键(如groupId)分组到Map中
- 对每个组创建一个
CompletableFuture,内部使用线程池并行处理该组所有任务 - 收集所有组的future,调用
join()等待全部完成
代码示例:
立即学习“Java免费学习笔记(深入)”;
Map<String, List<Runnable>> groupedTasks = // 按组分类的任务 ExecutorService groupPool = Executors.newFixedThreadPool(4); List<CompletableFuture<Void>> groupFutures = new ArrayList<>(); for (Map.Entry<String, List<Runnable>> entry : groupedTasks.entrySet()) { String groupId = entry.getKey(); List<Runnable> tasks = entry.getValue(); CompletableFuture<Void> groupFuture = CompletableFuture.runAsync(() -> { ExecutorService taskPool = Executors.newFixedThreadPool(2); // 每组内并发度 List<CompletableFuture<Void>> taskFutures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, taskPool)) .collect(Collectors.toList()); // 等待本组所有任务完成 CompletableFuture.allOf(taskFutures.toArray(new CompletableFuture[0])).join(); taskPool.shutdown(); }, groupPool); groupFutures.add(groupFuture); } // 等待所有组执行完毕 CompletableFuture.allOf(groupFutures.toArray(new CompletableFuture[0])).join(); groupPool.shutdown();
控制组间执行顺序(可选串行)
如果要求组之间按顺序执行,可以不用并行流,而是逐个处理每组:
- 遍历分组map(保持顺序可用
LinkedHashMap) - 每组提交到线程池后调用
future.get()同步等待
这样就能实现“组内并行、组间串行”。
使用CountDownLatch协调任务组
若不使用CompletableFuture,也可以用CountDownLatch手动控制同步。
为每组创建一个latch,初始值为任务数,每个任务完成后countDown,主线程await。
适合更底层控制场景,但代码较繁琐,推荐优先使用CompletableFuture。
注意事项
线程池管理:避免频繁创建过多线程池,建议复用或使用共享池。
异常处理:CompletableFuture中任务抛异常不会中断主流程,需调用exceptionally()或检查get()结果。
资源释放:务必调用shutdown()关闭线程池,防止内存泄漏。
基本上就这些。根据实际需求选择组内并行、组间并行或串行策略,CompletableFuture配合分组map是最清晰实用的方式。


