内容纲要
什么是 ScheduledExecutorService
ScheduledExecutorService
是 Java 并发包(java.util.concurrent
)中的一个核心接口,它扩展了 ExecutorService
,专门用于执行定时任务和周期性任务。它提供了比传统 Timer
类更强大、更灵活的任务调度能力。
继承关系
Executor
↓
ExecutorService
↓
ScheduledExecutorService
↓
ScheduledThreadPoolExecutor (主要实现类)
核心方法详解
1. 延迟执行方法
// 延迟执行一次,无返回值
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 延迟执行一次,有返回值
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
示例:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
// 3秒后执行一次
scheduler.schedule(() -> {
System.out.println("延迟执行任务: " + LocalDateTime.now());
}, 3, TimeUnit.SECONDS);
// 5秒后执行并返回结果
ScheduledFuture<String> future = scheduler.schedule(() -> {
return "任务完成时间: " + LocalDateTime.now();
}, 5, TimeUnit.SECONDS);
try {
String result = future.get(); // 阻塞等待结果
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
2. 周期性执行方法
scheduleAtFixedRate - 固定速率执行
ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit)
特点: 按照固定的时间间隔执行,不受任务执行时间影响。如果任务执行时间超过间隔时间,下次执行会立即开始。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 1秒后开始,每2秒执行一次
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
System.out.println("固定速率执行: " + LocalDateTime.now());
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 1, 2, TimeUnit.SECONDS);
scheduleWithFixedDelay - 固定延迟执行
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit)
特点: 在上一次任务执行完成后,等待固定时间再执行下一次任务。
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟执行: " + LocalDateTime.now());
try {
Thread.sleep(1000); // 模拟任务耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 1, 2, TimeUnit.SECONDS);
// 实际执行间隔 = 任务执行时间(1秒) + 固定延迟(2秒) = 3秒
创建方式详解
1. Executors 工厂方法
// 单线程调度器
ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
// 多线程调度器
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
// 带线程工厂的调度器
ScheduledExecutorService custom = Executors.newScheduledThreadPool(3, r -> {
Thread t = new Thread(r);
t.setName("自定义调度线程-" + t.getId());
t.setDaemon(true); // 设置为守护线程
return t;
});
2. 直接创建 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
5, // 核心线程数
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "调度器线程-" + counter.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 设置是否在关闭时取消定期任务
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
// 设置是否在关闭时取消延迟任务
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
实际应用场景
1. 心跳检测系统
public class HeartbeatMonitor {
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
public void startMonitoring() {
// 每30秒检查一次心跳
scheduler.scheduleAtFixedRate(() -> {
long currentTime = System.currentTimeMillis();
lastHeartbeat.entrySet().removeIf(entry -> {
if (currentTime - entry.getValue() > 90000) { // 90秒超时
System.out.println("节点 " + entry.getKey() + " 心跳超时");
return true;
}
return false;
});
}, 30, 30, TimeUnit.SECONDS);
}
public void updateHeartbeat(String nodeId) {
lastHeartbeat.put(nodeId, System.currentTimeMillis());
}
public void shutdown() {
scheduler.shutdown();
}
}
2. 缓存清理系统
public class CacheManager<K, V> {
private final Map<K, CacheItem<V>> cache = new ConcurrentHashMap<>();
private final ScheduledExecutorService cleaner =
Executors.newSingleThreadScheduledExecutor();
private static class CacheItem<V> {
final V value;
final long expireTime;
CacheItem(V value, long ttl) {
this.value = value;
this.expireTime = System.currentTimeMillis() + ttl;
}
boolean isExpired() {
return System.currentTimeMillis() > expireTime;
}
}
public CacheManager() {
// 每分钟清理一次过期缓存
cleaner.scheduleWithFixedDelay(() -> {
cache.entrySet().removeIf(entry -> {
if (entry.getValue().isExpired()) {
System.out.println("清理过期缓存: " + entry.getKey());
return true;
}
return false;
});
}, 1, 1, TimeUnit.MINUTES);
}
public void put(K key, V value, long ttlMs) {
cache.put(key, new CacheItem<>(value, ttlMs));
}
public V get(K key) {
CacheItem<V> item = cache.get(key);
if (item != null && !item.isExpired()) {
return item.value;
}
cache.remove(key); // 立即清理过期项
return null;
}
public void shutdown() {
cleaner.shutdown();
}
}
3. 数据统计系统
public class MetricsCollector {
private final AtomicLong requestCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(2);
public void startCollection() {
// 每分钟输出统计信息
scheduler.scheduleAtFixedRate(() -> {
long requests = requestCount.getAndSet(0);
long errors = errorCount.getAndSet(0);
double errorRate = requests > 0 ? (double) errors / requests * 100 : 0;
System.out.printf("[%s] 请求数: %d, 错误数: %d, 错误率: %.2f%%\n",
LocalDateTime.now(), requests, errors, errorRate);
}, 1, 1, TimeUnit.MINUTES);
// 每小时生成详细报告
scheduler.scheduleAtFixedRate(() -> {
generateDetailedReport();
}, 1, 1, TimeUnit.HOURS);
}
public void recordRequest() {
requestCount.incrementAndGet();
}
public void recordError() {
errorCount.incrementAndGet();
}
private void generateDetailedReport() {
// 生成详细报告的逻辑
System.out.println("生成小时报告: " + LocalDateTime.now());
}
public void shutdown() {
scheduler.shutdown();
}
}
4. 重试机制实现
public class RetryableTask {
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
public <T> CompletableFuture<T> executeWithRetry(
Callable<T> task,
int maxRetries,
long delayMs) {
CompletableFuture<T> future = new CompletableFuture<>();
executeWithRetry(task, maxRetries, delayMs, future, 0);
return future;
}
private <T> void executeWithRetry(
Callable<T> task,
int maxRetries,
long delayMs,
CompletableFuture<T> future,
int currentAttempt) {
try {
T result = task.call();
future.complete(result);
} catch (Exception e) {
if (currentAttempt >= maxRetries) {
future.completeExceptionally(e);
} else {
System.out.println("任务失败," + delayMs + "ms后重试 (第" +
(currentAttempt + 1) + "次)");
scheduler.schedule(() -> {
executeWithRetry(task, maxRetries, delayMs * 2, // 指数退避
future, currentAttempt + 1);
}, delayMs, TimeUnit.MILLISECONDS);
}
}
}
public void shutdown() {
scheduler.shutdown();
}
}
// 使用示例
RetryableTask retryTask = new RetryableTask();
CompletableFuture<String> result = retryTask.executeWithRetry(() -> {
if (Math.random() < 0.7) { // 70%概率失败
throw new RuntimeException("模拟失败");
}
return "成功执行";
}, 3, 1000);
result.whenComplete((r, ex) -> {
if (ex != null) {
System.out.println("最终失败: " + ex.getMessage());
} else {
System.out.println("最终成功: " + r);
}
});
异常处理机制
public class SafeScheduledExecutor {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(5);
public ScheduledFuture<?> safeScheduleAtFixedRate(
Runnable task,
long initialDelay,
long period,
TimeUnit unit) {
return scheduler.scheduleAtFixedRate(() -> {
try {
task.run();
} catch (Exception e) {
System.err.println("定时任务执行异常: " + e.getMessage());
e.printStackTrace();
// 可以添加报警逻辑
// alertService.sendAlert("定时任务异常", e);
}
}, initialDelay, period, unit);
}
public void shutdown() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池无法正常关闭");
}
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
最佳实践
1. 资源管理
// 使用 try-with-resources(需要自定义包装类)
public class AutoCloseableScheduler implements AutoCloseable {
private final ScheduledExecutorService scheduler;
public AutoCloseableScheduler(int poolSize) {
this.scheduler = Executors.newScheduledThreadPool(poolSize);
}
public ScheduledExecutorService getScheduler() {
return scheduler;
}
@Override
public void close() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// 使用方式
try (AutoCloseableScheduler scheduler = new AutoCloseableScheduler(5)) {
scheduler.getScheduler().scheduleAtFixedRate(() -> {
// 任务逻辑
}, 0, 1, TimeUnit.SECONDS);
// 其他业务逻辑
Thread.sleep(10000);
} // 自动关闭
2. 线程池监控
public class MonitoredScheduler {
private final ScheduledThreadPoolExecutor executor;
private final ScheduledExecutorService monitor =
Executors.newSingleThreadScheduledExecutor();
public MonitoredScheduler(int corePoolSize) {
this.executor = new ScheduledThreadPoolExecutor(corePoolSize);
startMonitoring();
}
private void startMonitoring() {
monitor.scheduleAtFixedRate(() -> {
System.out.printf(
"线程池状态 - 活跃线程: %d, 队列大小: %d, 完成任务: %d\n",
executor.getActiveCount(),
executor.getQueue().size(),
executor.getCompletedTaskCount()
);
}, 30, 30, TimeUnit.SECONDS);
}
public ScheduledExecutorService getExecutor() {
return executor;
}
public void shutdown() {
monitor.shutdown();
executor.shutdown();
}
}
3. 配置化调度器
@Configuration
public class SchedulerConfig {
@Bean
@Primary
public ScheduledExecutorService primaryScheduler() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
10, // 核心线程数
new ThreadFactoryBuilder()
.setNameFormat("primary-scheduler-%d")
.setDaemon(false)
.build()
);
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
return executor;
}
@Bean("heartbeatScheduler")
public ScheduledExecutorService heartbeatScheduler() {
return Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("heartbeat-%d")
.setDaemon(true)
.build()
);
}
}
常见陷阱和注意事项
1. 异常处理陷阱
// ❌ 错误:异常会导致后续调度停止
scheduler.scheduleAtFixedRate(() -> {
if (Math.random() < 0.1) {
throw new RuntimeException("随机异常"); // 这会终止后续执行
}
System.out.println("正常执行");
}, 0, 1, TimeUnit.SECONDS);
// ✅ 正确:捕获所有异常
scheduler.scheduleAtFixedRate(() -> {
try {
if (Math.random() < 0.1) {
throw new RuntimeException("随机异常");
}
System.out.println("正常执行");
} catch (Exception e) {
logger.error("定时任务异常", e);
}
}, 0, 1, TimeUnit.SECONDS);
2. 内存泄漏风险
// ❌ 错误:忘记关闭会导致内存泄漏
public class BadService {
private ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
public void startTask() {
scheduler.scheduleAtFixedRate(() -> {
// 任务逻辑
}, 0, 1, TimeUnit.SECONDS);
}
// 没有关闭方法!
}
// ✅ 正确:提供关闭方法
public class GoodService {
private ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
@PreDestroy
public void shutdown() {
if (scheduler != null && !scheduler.isShutdown()) {
scheduler.shutdown();
}
}
}
3. 时间精度问题
// 系统时间调整可能影响调度
// 对于对时间敏感的任务,考虑使用 System.nanoTime()
scheduler.scheduleAtFixedRate(() -> {
long startTime = System.nanoTime();
// 任务执行
doWork();
long duration = System.nanoTime() - startTime;
System.out.println("任务耗时: " + duration / 1_000_000 + "ms");
}, 0, 5, TimeUnit.SECONDS);
性能优化建议
1. 线程池大小调优
// CPU 密集型任务
int cpuCores = Runtime.getRuntime().availableProcessors();
ScheduledExecutorService cpuIntensiveScheduler =
Executors.newScheduledThreadPool(cpuCores);
// I/O 密集型任务
ScheduledExecutorService ioIntensiveScheduler =
Executors.newScheduledThreadPool(cpuCores * 2);
2. 任务分组
public class TaskGroupScheduler {
private final ScheduledExecutorService fastTaskScheduler =
Executors.newScheduledThreadPool(5);
private final ScheduledExecutorService slowTaskScheduler =
Executors.newScheduledThreadPool(2);
public void scheduleFastTask(Runnable task, long period) {
fastTaskScheduler.scheduleAtFixedRate(task, 0, period, TimeUnit.SECONDS);
}
public void scheduleSlowTask(Runnable task, long period) {
slowTaskScheduler.scheduleAtFixedRate(task, 0, period, TimeUnit.MINUTES);
}
}
总结
ScheduledExecutorService
是 Java 中处理定时任务的首选工具,相比传统的 Timer
,它提供了更好的异常处理、更灵活的线程管理和更强大的功能。掌握其使用要点:
- 选择合适的方法:
schedule
用于一次性延迟执行,scheduleAtFixedRate
用于固定速率执行,scheduleWithFixedDelay
用于固定间隔执行 - 异常处理:始终在任务中捕获异常,防止调度中断
- 资源管理:及时关闭线程池,避免内存泄漏
- 监控和调优:根据任务特性选择合适的线程池大小和配置
通过合理使用 ScheduledExecutorService
,可以构建出高效、稳定的定时任务系统,满足各种业务场景的需求。