如何结合 ThreadFactory、CompletionService 做任务分组与监控
结合 ThreadFactory 和 CompletionService,你可以为线程池任务执行提供更精细的线程命名、分组、日志追踪、结果收集与监控。这在复杂系统(如大规模并发任务调度、ETL、Web 爬虫、多租户任务处理)中非常实用。
一、核心组件简介
| 组件 | 作用 |
|---|---|
ThreadFactory | 自定义线程创建逻辑(如命名、设置守护线程、分组) |
CompletionService | 提供任务执行后的结果优先队列,可按完成顺序处理任务 |
ExecutorService | 实际执行任务 |
二、结构图理解
+---------------------+
| CustomThreadFactory | → 创建线程并自定义命名、组
+---------------------+
↓
+--------------------------+
| ThreadPoolExecutor | ← 使用上方工厂创建线程
+--------------------------+
↓ submit(task)
+-----------------------------+
| CompletionService | ← 提交任务、结果按完成顺序可取出
+-----------------------------+
↓ take()
Future<Result>
三、完整实战示例:分组命名 + CompletionService 监控
1. 自定义线程工厂(ThreadFactory)
public class NamedThreadFactory implements ThreadFactory {
private final String groupName;
private final AtomicInteger threadNumber = new AtomicInteger(1);
public NamedThreadFactory(String groupName) {
this.groupName = groupName;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(groupName + "-thread-" + threadNumber.getAndIncrement());
return t;
}
}
2. 使用 CompletionService 提交任务并监控结果
public class CompletionServiceExample {
public static void main(String[] args) throws Exception {
// 线程池 + 自定义线程工厂
ExecutorService executor = Executors.newFixedThreadPool(
4, new NamedThreadFactory("fetcher")
);
// 结合 CompletionService:结果队列按完成顺序
CompletionService<String> cs = new ExecutorCompletionService<>(executor);
// 提交任务(模拟耗时不同的任务)
for (int i = 1; i <= 5; i++) {
final int taskId = i;
cs.submit(() -> {
long sleep = (long)(Math.random() * 3000);
Thread.sleep(sleep);
return Thread.currentThread().getName() + " 完成任务 #" + taskId + " 用时:" + sleep + "ms";
});
}
// 监控执行结果(按完成顺序)
for (int i = 0; i < 5; i++) {
Future<String> result = cs.take(); // 阻塞直到有任务完成
System.out.println("✅ 已完成任务:" + result.get());
}
executor.shutdown();
}
}
四、进阶功能建议(任务分组 + 多租户场景)
你可以用多个线程池+CompletionService分别处理不同业务:
Map<String, ExecutorService> tenantPools = new HashMap<>();
Map<String, CompletionService<?>> tenantCompletions = new HashMap<>();
for (String tenantId : tenantList) {
ThreadFactory factory = new NamedThreadFactory("tenant-" + tenantId);
ExecutorService pool = Executors.newFixedThreadPool(3, factory);
tenantPools.put(tenantId, pool);
tenantCompletions.put(tenantId, new ExecutorCompletionService<>(pool));
}
这样就可以为每个租户隔离任务线程和结果收集队列,增强可观察性与调度灵活性。
五、如何做任务监控(耗时 / 异常 / 追踪)
你可以封装一个任务包装器,用于统一日志记录、异常捕获、耗时分析:
class MonitoredTask<V> implements Callable<V> {
private final Callable<V> task;
private final String taskName;
public MonitoredTask(Callable<V> task, String taskName) {
this.task = task;
this.taskName = taskName;
}
@Override
public V call() throws Exception {
long start = System.currentTimeMillis();
try {
V result = task.call();
long time = System.currentTimeMillis() - start;
System.out.println("✅ [" + taskName + "] 执行完成,耗时:" + time + "ms");
return result;
} catch (Exception ex) {
System.err.println("❌ [" + taskName + "] 执行失败:" + ex.getMessage());
throw ex;
}
}
}
使用时包一层:
cs.submit(new MonitoredTask<>(() -> doBusinessLogic(), "task-42"));
六、监控建议(生产场景)
| 监控维度 | 方案 |
|---|---|
| 线程名称 / 来源 | 使用 ThreadFactory 命名 |
| 任务完成耗时 | 包装任务 / System.nanoTime() |
| 异常收集 | 包装任务内部 try-catch 日志记录 |
| 任务完成数量 | AtomicInteger 或 cs.take() 计数 |
| 状态暴露 | 集成到 Actuator / Prometheus Exporter |
官方文档与链接
总结
| 功能 | ThreadFactory | CompletionService |
|---|---|---|
| 自定义线程名/组 | ✅ 是 | ❌ 否 |
| 区分不同业务类型线程 | ✅ 是 | ❌ 否 |
| 按完成顺序获取结果 | ❌ 否 | ✅ 是 |
| 自动统计完成任务 | ❌ 否 | ✅ 是 |
| 可组合构建多租户调度系统 | ✅ 是 | ✅ 是 |
更多详细内容请关注其他相关文章!