Java ScheduledExecutorService 完全掌握指南

内容纲要

什么是 ScheduledExecutorService

ScheduledExecutorService 是 Java 并发包(java.util.concurrent)中的一个核心接口,它扩展了 ExecutorService,专门用于执行定时任务和周期性任务。它提供了比传统 Timer 类更强大、更灵活的任务调度能力。

继承关系

Executor 
  ↓
ExecutorService 
  ↓
ScheduledExecutorService
  ↓
ScheduledThreadPoolExecutor (主要实现类)

核心方法详解

1. 延迟执行方法

// 延迟执行一次,无返回值
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

// 延迟执行一次,有返回值
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

示例:

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

// 3秒后执行一次
scheduler.schedule(() -> {
    System.out.println("延迟执行任务: " + LocalDateTime.now());
}, 3, TimeUnit.SECONDS);

// 5秒后执行并返回结果
ScheduledFuture<String> future = scheduler.schedule(() -> {
    return "任务完成时间: " + LocalDateTime.now();
}, 5, TimeUnit.SECONDS);

try {
    String result = future.get(); // 阻塞等待结果
    System.out.println(result);
} catch (Exception e) {
    e.printStackTrace();
}

2. 周期性执行方法

scheduleAtFixedRate - 固定速率执行

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
                                      long initialDelay, 
                                      long period, 
                                      TimeUnit unit)

特点: 按照固定的时间间隔执行,不受任务执行时间影响。如果任务执行时间超过间隔时间,下次执行会立即开始。

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

// 1秒后开始,每2秒执行一次
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
    System.out.println("固定速率执行: " + LocalDateTime.now());
    // 模拟耗时操作
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}, 1, 2, TimeUnit.SECONDS);

scheduleWithFixedDelay - 固定延迟执行

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
                                         long initialDelay, 
                                         long delay, 
                                         TimeUnit unit)

特点: 在上一次任务执行完成后,等待固定时间再执行下一次任务。

scheduler.scheduleWithFixedDelay(() -> {
    System.out.println("固定延迟执行: " + LocalDateTime.now());
    try {
        Thread.sleep(1000); // 模拟任务耗时
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}, 1, 2, TimeUnit.SECONDS);
// 实际执行间隔 = 任务执行时间(1秒) + 固定延迟(2秒) = 3秒

创建方式详解

1. Executors 工厂方法

// 单线程调度器
ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();

// 多线程调度器
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);

// 带线程工厂的调度器
ScheduledExecutorService custom = Executors.newScheduledThreadPool(3, r -> {
    Thread t = new Thread(r);
    t.setName("自定义调度线程-" + t.getId());
    t.setDaemon(true); // 设置为守护线程
    return t;
});

2. 直接创建 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
    5,  // 核心线程数
    new ThreadFactory() {
        private final AtomicInteger counter = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "调度器线程-" + counter.getAndIncrement());
            t.setDaemon(false);
            return t;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

// 设置是否在关闭时取消定期任务
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
// 设置是否在关闭时取消延迟任务
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

实际应用场景

1. 心跳检测系统

public class HeartbeatMonitor {
    private final ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor();
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    public void startMonitoring() {
        // 每30秒检查一次心跳
        scheduler.scheduleAtFixedRate(() -> {
            long currentTime = System.currentTimeMillis();
            lastHeartbeat.entrySet().removeIf(entry -> {
                if (currentTime - entry.getValue() > 90000) { // 90秒超时
                    System.out.println("节点 " + entry.getKey() + " 心跳超时");
                    return true;
                }
                return false;
            });
        }, 30, 30, TimeUnit.SECONDS);
    }

    public void updateHeartbeat(String nodeId) {
        lastHeartbeat.put(nodeId, System.currentTimeMillis());
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}

2. 缓存清理系统

public class CacheManager<K, V> {
    private final Map<K, CacheItem<V>> cache = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleaner = 
        Executors.newSingleThreadScheduledExecutor();

    private static class CacheItem<V> {
        final V value;
        final long expireTime;

        CacheItem(V value, long ttl) {
            this.value = value;
            this.expireTime = System.currentTimeMillis() + ttl;
        }

        boolean isExpired() {
            return System.currentTimeMillis() > expireTime;
        }
    }

    public CacheManager() {
        // 每分钟清理一次过期缓存
        cleaner.scheduleWithFixedDelay(() -> {
            cache.entrySet().removeIf(entry -> {
                if (entry.getValue().isExpired()) {
                    System.out.println("清理过期缓存: " + entry.getKey());
                    return true;
                }
                return false;
            });
        }, 1, 1, TimeUnit.MINUTES);
    }

    public void put(K key, V value, long ttlMs) {
        cache.put(key, new CacheItem<>(value, ttlMs));
    }

    public V get(K key) {
        CacheItem<V> item = cache.get(key);
        if (item != null && !item.isExpired()) {
            return item.value;
        }
        cache.remove(key); // 立即清理过期项
        return null;
    }

    public void shutdown() {
        cleaner.shutdown();
    }
}

3. 数据统计系统

public class MetricsCollector {
    private final AtomicLong requestCount = new AtomicLong(0);
    private final AtomicLong errorCount = new AtomicLong(0);
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(2);

    public void startCollection() {
        // 每分钟输出统计信息
        scheduler.scheduleAtFixedRate(() -> {
            long requests = requestCount.getAndSet(0);
            long errors = errorCount.getAndSet(0);
            double errorRate = requests > 0 ? (double) errors / requests * 100 : 0;

            System.out.printf("[%s] 请求数: %d, 错误数: %d, 错误率: %.2f%%\n", 
                LocalDateTime.now(), requests, errors, errorRate);
        }, 1, 1, TimeUnit.MINUTES);

        // 每小时生成详细报告
        scheduler.scheduleAtFixedRate(() -> {
            generateDetailedReport();
        }, 1, 1, TimeUnit.HOURS);
    }

    public void recordRequest() {
        requestCount.incrementAndGet();
    }

    public void recordError() {
        errorCount.incrementAndGet();
    }

    private void generateDetailedReport() {
        // 生成详细报告的逻辑
        System.out.println("生成小时报告: " + LocalDateTime.now());
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}

4. 重试机制实现

public class RetryableTask {
    private final ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor();

    public <T> CompletableFuture<T> executeWithRetry(
            Callable<T> task, 
            int maxRetries, 
            long delayMs) {

        CompletableFuture<T> future = new CompletableFuture<>();
        executeWithRetry(task, maxRetries, delayMs, future, 0);
        return future;
    }

    private <T> void executeWithRetry(
            Callable<T> task, 
            int maxRetries, 
            long delayMs,
            CompletableFuture<T> future, 
            int currentAttempt) {

        try {
            T result = task.call();
            future.complete(result);
        } catch (Exception e) {
            if (currentAttempt >= maxRetries) {
                future.completeExceptionally(e);
            } else {
                System.out.println("任务失败," + delayMs + "ms后重试 (第" + 
                    (currentAttempt + 1) + "次)");

                scheduler.schedule(() -> {
                    executeWithRetry(task, maxRetries, delayMs * 2, // 指数退避
                        future, currentAttempt + 1);
                }, delayMs, TimeUnit.MILLISECONDS);
            }
        }
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}

// 使用示例
RetryableTask retryTask = new RetryableTask();
CompletableFuture<String> result = retryTask.executeWithRetry(() -> {
    if (Math.random() < 0.7) { // 70%概率失败
        throw new RuntimeException("模拟失败");
    }
    return "成功执行";
}, 3, 1000);

result.whenComplete((r, ex) -> {
    if (ex != null) {
        System.out.println("最终失败: " + ex.getMessage());
    } else {
        System.out.println("最终成功: " + r);
    }
});

异常处理机制

public class SafeScheduledExecutor {
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(5);

    public ScheduledFuture<?> safeScheduleAtFixedRate(
            Runnable task, 
            long initialDelay, 
            long period, 
            TimeUnit unit) {

        return scheduler.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (Exception e) {
                System.err.println("定时任务执行异常: " + e.getMessage());
                e.printStackTrace();
                // 可以添加报警逻辑
                // alertService.sendAlert("定时任务异常", e);
            }
        }, initialDelay, period, unit);
    }

    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
                if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("线程池无法正常关闭");
                }
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

最佳实践

1. 资源管理

// 使用 try-with-resources(需要自定义包装类)
public class AutoCloseableScheduler implements AutoCloseable {
    private final ScheduledExecutorService scheduler;

    public AutoCloseableScheduler(int poolSize) {
        this.scheduler = Executors.newScheduledThreadPool(poolSize);
    }

    public ScheduledExecutorService getScheduler() {
        return scheduler;
    }

    @Override
    public void close() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

// 使用方式
try (AutoCloseableScheduler scheduler = new AutoCloseableScheduler(5)) {
    scheduler.getScheduler().scheduleAtFixedRate(() -> {
        // 任务逻辑
    }, 0, 1, TimeUnit.SECONDS);

    // 其他业务逻辑
    Thread.sleep(10000);
} // 自动关闭

2. 线程池监控

public class MonitoredScheduler {
    private final ScheduledThreadPoolExecutor executor;
    private final ScheduledExecutorService monitor = 
        Executors.newSingleThreadScheduledExecutor();

    public MonitoredScheduler(int corePoolSize) {
        this.executor = new ScheduledThreadPoolExecutor(corePoolSize);
        startMonitoring();
    }

    private void startMonitoring() {
        monitor.scheduleAtFixedRate(() -> {
            System.out.printf(
                "线程池状态 - 活跃线程: %d, 队列大小: %d, 完成任务: %d\n",
                executor.getActiveCount(),
                executor.getQueue().size(),
                executor.getCompletedTaskCount()
            );
        }, 30, 30, TimeUnit.SECONDS);
    }

    public ScheduledExecutorService getExecutor() {
        return executor;
    }

    public void shutdown() {
        monitor.shutdown();
        executor.shutdown();
    }
}

3. 配置化调度器

@Configuration
public class SchedulerConfig {

    @Bean
    @Primary
    public ScheduledExecutorService primaryScheduler() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
            10, // 核心线程数
            new ThreadFactoryBuilder()
                .setNameFormat("primary-scheduler-%d")
                .setDaemon(false)
                .build()
        );

        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        return executor;
    }

    @Bean("heartbeatScheduler")
    public ScheduledExecutorService heartbeatScheduler() {
        return Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
                .setNameFormat("heartbeat-%d")
                .setDaemon(true)
                .build()
        );
    }
}

常见陷阱和注意事项

1. 异常处理陷阱

// ❌ 错误:异常会导致后续调度停止
scheduler.scheduleAtFixedRate(() -> {
    if (Math.random() < 0.1) {
        throw new RuntimeException("随机异常"); // 这会终止后续执行
    }
    System.out.println("正常执行");
}, 0, 1, TimeUnit.SECONDS);

// ✅ 正确:捕获所有异常
scheduler.scheduleAtFixedRate(() -> {
    try {
        if (Math.random() < 0.1) {
            throw new RuntimeException("随机异常");
        }
        System.out.println("正常执行");
    } catch (Exception e) {
        logger.error("定时任务异常", e);
    }
}, 0, 1, TimeUnit.SECONDS);

2. 内存泄漏风险

// ❌ 错误:忘记关闭会导致内存泄漏
public class BadService {
    private ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor();

    public void startTask() {
        scheduler.scheduleAtFixedRate(() -> {
            // 任务逻辑
        }, 0, 1, TimeUnit.SECONDS);
    }
    // 没有关闭方法!
}

// ✅ 正确:提供关闭方法
public class GoodService {
    private ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor();

    @PreDestroy
    public void shutdown() {
        if (scheduler != null && !scheduler.isShutdown()) {
            scheduler.shutdown();
        }
    }
}

3. 时间精度问题

// 系统时间调整可能影响调度
// 对于对时间敏感的任务,考虑使用 System.nanoTime()
scheduler.scheduleAtFixedRate(() -> {
    long startTime = System.nanoTime();

    // 任务执行
    doWork();

    long duration = System.nanoTime() - startTime;
    System.out.println("任务耗时: " + duration / 1_000_000 + "ms");
}, 0, 5, TimeUnit.SECONDS);

性能优化建议

1. 线程池大小调优

// CPU 密集型任务
int cpuCores = Runtime.getRuntime().availableProcessors();
ScheduledExecutorService cpuIntensiveScheduler = 
    Executors.newScheduledThreadPool(cpuCores);

// I/O 密集型任务
ScheduledExecutorService ioIntensiveScheduler = 
    Executors.newScheduledThreadPool(cpuCores * 2);

2. 任务分组

public class TaskGroupScheduler {
    private final ScheduledExecutorService fastTaskScheduler = 
        Executors.newScheduledThreadPool(5);
    private final ScheduledExecutorService slowTaskScheduler = 
        Executors.newScheduledThreadPool(2);

    public void scheduleFastTask(Runnable task, long period) {
        fastTaskScheduler.scheduleAtFixedRate(task, 0, period, TimeUnit.SECONDS);
    }

    public void scheduleSlowTask(Runnable task, long period) {
        slowTaskScheduler.scheduleAtFixedRate(task, 0, period, TimeUnit.MINUTES);
    }
}

总结

ScheduledExecutorService 是 Java 中处理定时任务的首选工具,相比传统的 Timer,它提供了更好的异常处理、更灵活的线程管理和更强大的功能。掌握其使用要点:

  1. 选择合适的方法schedule 用于一次性延迟执行,scheduleAtFixedRate 用于固定速率执行,scheduleWithFixedDelay 用于固定间隔执行
  2. 异常处理:始终在任务中捕获异常,防止调度中断
  3. 资源管理:及时关闭线程池,避免内存泄漏
  4. 监控和调优:根据任务特性选择合适的线程池大小和配置

通过合理使用 ScheduledExecutorService,可以构建出高效、稳定的定时任务系统,满足各种业务场景的需求。

Leave a Comment

您的电子邮箱地址不会被公开。 必填项已用*标注

close
arrow_upward