如何结合 ThreadFactory、CompletionService 做任务分组与监控
                           
天天向上
发布: 2025-07-12 12:53:25

原创
423 人浏览过

结合 ThreadFactoryCompletionService,你可以为线程池任务执行提供更精细的线程命名、分组、日志追踪、结果收集与监控。这在复杂系统(如大规模并发任务调度、ETL、Web 爬虫、多租户任务处理)中非常实用。


一、核心组件简介

组件作用
ThreadFactory自定义线程创建逻辑(如命名、设置守护线程、分组)
CompletionService提供任务执行后的结果优先队列,可按完成顺序处理任务
ExecutorService实际执行任务

二、结构图理解

+---------------------+
| CustomThreadFactory |  → 创建线程并自定义命名、组
+---------------------+
          ↓
+--------------------------+
| ThreadPoolExecutor       |  ← 使用上方工厂创建线程
+--------------------------+
          ↓ submit(task)
+-----------------------------+
| CompletionService           | ← 提交任务、结果按完成顺序可取出
+-----------------------------+
          ↓ take()
      Future<Result>

三、完整实战示例:分组命名 + CompletionService 监控

1. 自定义线程工厂(ThreadFactory)

public class NamedThreadFactory implements ThreadFactory {
    private final String groupName;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public NamedThreadFactory(String groupName) {
        this.groupName = groupName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(groupName + "-thread-" + threadNumber.getAndIncrement());
        return t;
    }
}

2. 使用 CompletionService 提交任务并监控结果

public class CompletionServiceExample {
    public static void main(String[] args) throws Exception {
        // 线程池 + 自定义线程工厂
        ExecutorService executor = Executors.newFixedThreadPool(
            4, new NamedThreadFactory("fetcher")
        );

        // 结合 CompletionService:结果队列按完成顺序
        CompletionService<String> cs = new ExecutorCompletionService<>(executor);

        // 提交任务(模拟耗时不同的任务)
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            cs.submit(() -> {
                long sleep = (long)(Math.random() * 3000);
                Thread.sleep(sleep);
                return Thread.currentThread().getName() + " 完成任务 #" + taskId + " 用时:" + sleep + "ms";
            });
        }

        // 监控执行结果(按完成顺序)
        for (int i = 0; i < 5; i++) {
            Future<String> result = cs.take(); // 阻塞直到有任务完成
            System.out.println("✅ 已完成任务:" + result.get());
        }

        executor.shutdown();
    }
}

四、进阶功能建议(任务分组 + 多租户场景)

你可以用多个线程池+CompletionService分别处理不同业务:

Map<String, ExecutorService> tenantPools = new HashMap<>();
Map<String, CompletionService<?>> tenantCompletions = new HashMap<>();

for (String tenantId : tenantList) {
    ThreadFactory factory = new NamedThreadFactory("tenant-" + tenantId);
    ExecutorService pool = Executors.newFixedThreadPool(3, factory);
    tenantPools.put(tenantId, pool);
    tenantCompletions.put(tenantId, new ExecutorCompletionService<>(pool));
}

这样就可以为每个租户隔离任务线程和结果收集队列,增强可观察性与调度灵活性。


五、如何做任务监控(耗时 / 异常 / 追踪)

你可以封装一个任务包装器,用于统一日志记录、异常捕获、耗时分析:

class MonitoredTask<V> implements Callable<V> {
    private final Callable<V> task;
    private final String taskName;

    public MonitoredTask(Callable<V> task, String taskName) {
        this.task = task;
        this.taskName = taskName;
    }

    @Override
    public V call() throws Exception {
        long start = System.currentTimeMillis();
        try {
            V result = task.call();
            long time = System.currentTimeMillis() - start;
            System.out.println("✅ [" + taskName + "] 执行完成,耗时:" + time + "ms");
            return result;
        } catch (Exception ex) {
            System.err.println("❌ [" + taskName + "] 执行失败:" + ex.getMessage());
            throw ex;
        }
    }
}

使用时包一层:

cs.submit(new MonitoredTask<>(() -> doBusinessLogic(), "task-42"));

六、监控建议(生产场景)

监控维度方案
线程名称 / 来源使用 ThreadFactory 命名
任务完成耗时包装任务 / System.nanoTime()
异常收集包装任务内部 try-catch 日志记录
任务完成数量AtomicIntegercs.take() 计数
状态暴露集成到 Actuator / Prometheus Exporter

官方文档与链接


总结

功能ThreadFactoryCompletionService
自定义线程名/组✅ 是❌ 否
区分不同业务类型线程✅ 是❌ 否
按完成顺序获取结果❌ 否✅ 是
自动统计完成任务❌ 否✅ 是
可组合构建多租户调度系统✅ 是✅ 是

更多详细内容请关注其他相关文章!

发表回复 0

Your email address will not be published. Required fields are marked *