内容纲要
Java高并发完整指南
从入门到专家 - 完全掌握Java高并发开发
学习路线图
[[高并发学习路线图.svg]]
目录
核心概念
什么是高并发
高并发是指系统能够在短时间内处理大量并发请求的能力,涉及以下几个维度:
| 维度 | 说明 | 衡量指标 |
|---|---|---|
| QPS | 每秒查询率 | 请求/秒 |
| TPS | 每秒事务数 | 事务/秒 |
| RT | 响应时间 | 毫秒 |
| 并发数 | 同时处理的请求数 | 个 |
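这些指标可用利特尔法则(Little's Law)粗略换算:并发数 ≈ QPS × 平均RT(秒)。下面是一个最小估算示例,QPS与RT取值为假设:
public class CapacityEstimator {
    public static void main(String[] args) {
        double qps = 5000;           // 假设每秒5000个请求
        double avgRtMillis = 50;     // 假设平均响应时间50毫秒
        double concurrency = qps * (avgRtMillis / 1000.0);
        System.out.printf("同一时刻处理中的请求数 ≈ %.0f%n", concurrency); // 约250
    }
}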
CAP理论
Consistency (一致性)
/ \
/ \
Availability Partition Tolerance
(可用性) (分区容错性)
- CP: 牺牲可用性,保证一致性(Zookeeper、HBase)
- AP: 牺牲一致性,保证可用性(Cassandra、DNS)
- CA: 理论上存在,实际网络分区不可避免
高并发三驾马车
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 缓存 │ │ 异步 │ │ 队列 │
│ Redis/MC │────│ MQ/EventBus │────│ Kafka/RocketMQ │
└─────────────┘ └─────────────┘ └─────────────┘
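以"缓存"为例,最常见的读路径是Cache-Aside(旁路缓存)。下面是一个最小示意,cache与loadFromDb都是假设的本地实现,实际场景通常替换为Redis与数据库:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheAsideDemo {
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    public String get(String key) {
        String value = cache.get(key);      // 1. 先查缓存
        if (value == null) {
            value = loadFromDb(key);        // 2. 未命中则回源
            if (value != null) {
                cache.put(key, value);      // 3. 回填缓存(实际使用应设置过期时间)
            }
        }
        return value;
    }

    private String loadFromDb(String key) { // 模拟数据库查询
        return "value-of-" + key;
    }
}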
Java并发基础
1. 线程创建与生命周期
// 方式1: 继承Thread(不推荐)
class MyThread extends Thread {
@Override
public void run() {
System.out.println("线程运行");
}
}
// 方式2: 实现Runnable(推荐)
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("线程运行");
}
}
// 方式3: 使用Lambda(推荐)
Thread thread = new Thread(() -> {
System.out.println("线程运行");
});
// 方式4: 实现Callable(可获取返回结果)
Callable<String> task = () -> "返回结果";
FutureTask<String> future = new FutureTask<>(task);
new Thread(future).start();
String result = future.get();
线程状态转换图(Java线程共6种状态,"运行中"属于RUNNABLE,没有单独的RUNNING状态):
        NEW
         │ start()
         ▼
      RUNNABLE ◀──────────────────────────────┐
         │                                    │
         ├── 等待synchronized锁 ──▶ BLOCKED ──获得锁──┤
         │                                    │
         ├── wait()/join()/park() ──▶ WAITING ──notify()/notifyAll()/unpark()──┤
         │                                    │
         ├── sleep(t)/wait(t)/join(t)/parkNanos(t) ──▶ TIMED_WAITING ──超时或被唤醒──┘
         │
         ▼ run()执行结束
     TERMINATED
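下面这个可运行的小例子通过Thread.getState()观察其中几种状态(时序依赖sleep,数值为演示用的假设):
public class ThreadStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(500);                 // 让子线程进入TIMED_WAITING
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(t.getState());          // NEW
        t.start();
        System.out.println(t.getState());          // 通常是RUNNABLE(状态转瞬即变)
        Thread.sleep(100);
        System.out.println(t.getState());          // TIMED_WAITING
        t.join();
        System.out.println(t.getState());          // TERMINATED
    }
}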
2. 线程通信 - wait/notify
public class WaitNotifyExample {
private final Object lock = new Object();
private boolean condition = false;
// 等待线程
public void waitMethod() throws InterruptedException {
synchronized (lock) {
while (!condition) { // 必须用while,防止虚假唤醒
lock.wait();
}
// 执行业务逻辑
}
}
// 通知线程
public void notifyMethod() {
synchronized (lock) {
condition = true;
lock.notifyAll(); // 推荐用notifyAll,避免信号丢失
}
}
}
3. volatile关键字
// volatile特性:可见性、有序性、不保证原子性
public class VolatileExample {
private volatile boolean running = true;
private volatile int counter = 0;
// ✅ 正确:布尔值的读写是原子的
public void stop() {
running = false;
}
public void run() {
while (running) {
// ...
}
}
// ❌ 错误:volatile不保证复合操作的原子性
public void increment() {
counter++; // 不是线程安全的!
}
// ✅ 正确:使用Atomic类
private AtomicInteger atomicCounter = new AtomicInteger(0);
public void safeIncrement() {
atomicCounter.incrementAndGet();
}
}
volatile vs synchronized:
| 特性 | volatile | synchronized |
|---|---|---|
| 可见性 | ✓ | ✓ |
| 原子性 | ×(复合操作) | ✓ |
| 有序性 | ✓ | ✓ |
| 阻塞 | 不阻塞 | 阻塞 |
| 性能 | 高 | 较低 |
线程池深入
1. ThreadPoolExecutor完整参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // corePoolSize: 核心线程数(常驻)
20, // maximumPoolSize: 最大线程数
60L, // keepAliveTime: 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 工作队列
new ThreadFactory() { // 线程工厂
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("worker-" + counter.incrementAndGet());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
2. 工作队列类型对比
// 1. 无界队列(不推荐,可能导致OOM)
new LinkedBlockingQueue<>() // 默认无界
// 2. 有界队列(推荐)
new LinkedBlockingQueue<>(100) // 队列容量100
new ArrayBlockingQueue<>(100)
// 3. 同步队列(不存储任务,直接移交给线程执行)
new SynchronousQueue<>() // 常配合较大的maximumPoolSize使用,如CachedThreadPool
// 4. 优先级队列
new PriorityBlockingQueue<>(11, Comparator.comparing(Task::getPriority))
线程池工作流程:
提交任务
│
▼
┌────────────────┐
│ 核心线程数满? │
└─────┬──────────┘
│ 否 │ 是
▼ ▼
创建核心线程 ┌────────────────┐
│ 工作队列满? │
└─────┬──────────┘
│ 否 │ 是
▼ ▼
放入队列 ┌────────────────┐
│ 最大线程数满? │
└─────┬──────────┘
│ 否 │ 是
▼ ▼
创建非核心线程 拒绝策略
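上面的流程可以用一个小实验直观验证(核心2、最大4、队列2为假设参数):前2个任务占满核心线程,第3、4个进入队列,第5、6个触发创建非核心线程,第7个被拒绝:
import java.util.concurrent.*;

public class PoolFlowDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),                 // 队列容量2
                new ThreadPoolExecutor.AbortPolicy());
        for (int i = 1; i <= 7; i++) {                       // 第7个任务会被拒绝
            final int id = i;
            try {
                pool.execute(() -> {
                    try {
                        Thread.sleep(1000);                  // 模拟耗时任务
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                System.out.printf("任务%d 提交成功, poolSize=%d, queue=%d%n",
                        id, pool.getPoolSize(), pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                System.out.println("任务" + id + " 被拒绝");
            }
        }
        pool.shutdown();
    }
}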
3. 拒绝策略详解
// 1. AbortPolicy(默认):抛出RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy()
// 2. CallerRunsPolicy:由提交任务的线程执行
new ThreadPoolExecutor.CallerRunsPolicy()
// 3. DiscardPolicy:直接丢弃任务
new ThreadPoolExecutor.DiscardPolicy()
// 4. DiscardOldestPolicy:丢弃队列中最老的任务,再尝试提交
new ThreadPoolExecutor.DiscardOldestPolicy()
// 5. 自定义拒绝策略
new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录日志
        log.warn("任务被拒绝: {}", r);
        // 尝试放入备用队列,失败则交给降级线程池执行
        if (!fallbackQueue.offer(r)) {
            fallbackExecutor.execute(() -> {
                try {
                    r.run();
                } catch (Exception e) {
                    log.error("降级执行失败", e);
                }
            });
        }
    }
}
4. 合理的线程池配置
public class ThreadPoolConfig {
/**
* CPU密集型线程池
* 线程数 = CPU核心数 + 1
*/
public static ExecutorService cpuIntensivePool() {
int cores = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cores,
cores + 1,
0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
/**
* IO密集型线程池
* 线程数 ≈ CPU核心数 * (1 + 平均等待时间/平均计算时间),经验值常取核心数的2倍左右
*/
public static ExecutorService ioIntensivePool() {
int cores = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cores * 2,
cores * 4,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
/**
* 混合型线程池
* 根据任务类型动态调整
*/
public static ExecutorService mixedPool() {
int cores = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cores,
cores * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
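更通用的IO密集型估算公式是:线程数 = CPU核心数 × (1 + 平均等待时间 / 平均计算时间)。下面的小工具按该公式估算,等待/计算耗时为假设值:
public class PoolSizeCalculator {
    /**
     * 线程数 = CPU核心数 * (1 + 等待时间 / 计算时间)
     * 例如:等待90ms、计算10ms,8核 => 8 * (1 + 9) = 80
     */
    public static int estimate(double waitMillis, double computeMillis) {
        int cores = Runtime.getRuntime().availableProcessors();
        return (int) (cores * (1 + waitMillis / computeMillis));
    }

    public static void main(String[] args) {
        System.out.println("建议线程数: " + estimate(90, 10)); // 假设的耗时比例
    }
}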
5. 线程池监控
public class ThreadPoolMonitor {
public static void monitor(ThreadPoolExecutor executor) {
// 当前线程数
int poolSize = executor.getPoolSize();
// 活跃线程数
int activeCount = executor.getActiveCount();
// 队列大小
int queueSize = executor.getQueue().size();
// 已完成任务数
long completedTaskCount = executor.getCompletedTaskCount();
// 拒绝任务数(需要自定义计数器)
long rejectedTaskCount = getRejectedCount(executor);
System.out.printf(
"PoolSize=%d, Active=%d, Queue=%d, Completed=%d, Rejected=%d%n",
poolSize, activeCount, queueSize, completedTaskCount, rejectedTaskCount
);
}
// 使用Micrometer监控
public static ThreadPoolExecutor createMonitoredPool(MeterRegistry registry) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
Gauge.builder("threadpool.pool.size", executor, ThreadPoolExecutor::getPoolSize)
.register(registry);
Gauge.builder("threadpool.active.count", executor, ThreadPoolExecutor::getActiveCount)
.register(registry);
Gauge.builder("threadpool.queue.size", executor, e -> e.getQueue().size())
.register(registry);
return executor;
}
}
6. ForkJoinPool(工作窃取)
public class ForkJoinExample {
// 并行计算数组元素和
public static long sum(long[] array) {
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
return pool.invoke(task);
}
static class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000;
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 任务足够小,直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 任务太大,拆分
int mid = (start + end) >>> 1;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步执行左半部分
long rightResult = right.compute(); // 同步执行右半部分
long leftResult = left.join(); // 等待左半部分结果
return leftResult + rightResult;
}
}
}
锁机制详解
1. synchronized关键字
// 对象锁
public class ObjectLock {
public synchronized void method1() {
// 实例方法锁,等价于 synchronized(this)
}
public void method2() {
synchronized (this) {
// 代码块锁
}
}
private final Object lock = new Object();
public void method3() {
synchronized (lock) {
// 使用独立对象锁,减少锁竞争
}
}
}
// 类锁
public class ClassLock {
public static synchronized void staticMethod() {
// 静态方法锁,等价于 synchronized(ClassLock.class)
}
public void method() {
synchronized (ClassLock.class) {
// 类级别锁
}
}
}
synchronized优化(JVM层次):
无锁 ──▶ 偏向锁 ──▶ 轻量级锁 ──▶ 重量级锁
(无竞争)  (始终只有一个   (有竞争但CAS自旋    (竞争激烈或自旋失败,
          线程获取锁)     即可获得,持有时间短)  膨胀为OS互斥量,持有时间长)
相关JVM参数:
-XX:+UseBiasedLocking           // 启用偏向锁(JDK 15起已废弃并默认关闭)
-XX:BiasedLockingStartupDelay=0 // 偏向锁启动延迟
-XX:AutoBoxCacheMax=20000       // 扩大Integer自动装箱缓存(常规参数,与锁优化无直接关系)
2. ReentrantLock(可重入锁)
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
// 基本使用
public void method() {
lock.lock();
try {
// 临界区代码
} finally {
lock.unlock(); // 必须在finally中释放
}
}
// 可中断的锁获取
public void interruptibleMethod() throws InterruptedException {
    lock.lockInterruptibly(); // 等待锁的过程中可以响应中断
    try {
        // 临界区
    } finally {
        lock.unlock();
    }
}
// 带超时的锁获取
public void timedMethod() throws InterruptedException {
    if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
        try {
            // 临界区
        } finally {
            lock.unlock();
        }
    }
}
// 公平锁(按请求顺序获取)
private final ReentrantLock fairLock = new ReentrantLock(true);
// Condition实现等待/通知
public void await() throws InterruptedException {
lock.lock();
try {
while (!conditionMet()) {
condition.await(); // 释放锁并等待
}
// 执行业务逻辑
} finally {
lock.unlock();
}
}
public void signal() {
lock.lock();
try {
updateCondition();
condition.signal(); // 唤醒一个等待线程
// 或 condition.signalAll(); // 唤醒所有
} finally {
lock.unlock();
}
}
// 读写锁
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private String data; // 被读写锁保护的共享数据
public String read() {
readLock.lock();
try {
return data;
} finally {
readLock.unlock();
}
}
public void write(String newData) {
writeLock.lock();
try {
this.data = newData;
} finally {
writeLock.unlock();
}
}
}
锁选择指南:
| 场景 | 推荐锁 | 原因 |
|---|---|---|
| 简单同步 | synchronized | JVM自动优化,代码简洁 |
| 需要中断 | ReentrantLock | lockInterruptibly() |
| 公平锁 | ReentrantLock(true) | synchronized不支持 |
| 多条件 | ReentrantLock + Condition | wait/notify只有一个条件 |
| 读多写少 | ReentrantReadWriteLock | 读写分离提升性能 |
| 读远多于写且追求极致性能 | StampedLock | 乐观读,性能最优 |
3. StampedLock(读写锁的升级版)
public class StampedLockExample {
private final StampedLock lock = new StampedLock();
private double x, y;
// 乐观读(完全无锁)
public double distanceFromOrigin() {
long stamp = lock.tryOptimisticRead();
double currentX = x, currentY = y;
if (!lock.validate(stamp)) { // 验证戳是否有效
stamp = lock.readLock();
try {
currentX = x;
currentY = y;
} finally {
lock.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
// 写锁(独占)
public void move(double deltaX, double deltaY) {
long stamp = lock.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
lock.unlockWrite(stamp);
}
}
// 读锁升级为写锁
public void moveIfAtOrigin(double newX, double newY) {
long stamp = lock.readLock();
try {
while (x == 0 && y == 0) {
long ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0L) { // 升级成功
stamp = ws;
x = newX;
y = newY;
return;
}
// 升级失败,先释放读锁再获取写锁
lock.unlockRead(stamp);
stamp = lock.writeLock();
}
} finally {
lock.unlock(stamp);
}
}
}
4. 锁优化技巧
// 1. 锁粒度细化
public class FineGrainedLock {
private final Object lockA = new Object();
private final Object lockB = new Object();
private int countA = 0;
private int countB = 0;
public void incrementA() {
synchronized (lockA) { // 只锁A相关的操作
countA++;
}
}
public void incrementB() {
synchronized (lockB) { // 只锁B相关的操作
countB++;
}
}
}
// 2. 锁分离
public class ReadWriteSeparation {
private volatile String value;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 读操作用读锁
public String getValue() {
lock.readLock().lock();
try {
return value;
} finally {
lock.readLock().unlock();
}
}
// 写操作用写锁
public void setValue(String newValue) {
lock.writeLock().lock();
try {
this.value = newValue;
} finally {
lock.writeLock().unlock();
}
}
}
// 3. 锁消除(JVM自动)
// 局部变量不需要锁
public void lockElimination() {
StringBuffer sb = new StringBuffer(); // 局部变量,JVM自动消除锁
sb.append("a").append("b");
}
// 4. 锁粗化(JVM自动)
// 对同一对象连续的加锁/解锁操作会被合并为一次范围更大的锁
public void lockCoarsening() {
    StringBuffer sb = new StringBuffer();
    sb.append("a"); // 每次append都要对sb加锁
    sb.append("b"); // JVM检测到连续对同一对象加锁
    sb.append("c"); // 会粗化为只加一次锁
}
// 5. 减少锁持有时间
public class ReduceLockHoldTime {
private final Lock lock = new ReentrantLock();
public void process() {
lock.lock();
try {
// 只在必要时持有锁
updateSharedData();
} finally {
lock.unlock();
}
// 不需要锁的操作放在外面
doIndependentWork();
}
}
并发集合
1. ConcurrentHashMap详解
public class ConcurrentHashMapExample {
// Java 8+ 实现原理
// 1. Node数组 + 链表/红黑树
// 2. 只锁一个槽位(分段锁的升级)
// 3. CAS + synchronized
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 基本操作
public void basicOps() {
map.put("key1", 1);
Integer value = map.get("key1");
map.remove("key1");
// 原子操作
map.putIfAbsent("key", 1); // 不存在才put
map.computeIfAbsent("key", k -> 0); // 不存在才计算
map.computeIfPresent("key", (k, v) -> v + 1); // 存在才计算
map.compute("key", (k, v) -> v == null ? 1 : v + 1); // 总是计算
map.merge("key", 1, Integer::sum); // 合并值
map.replace("key", 1, 2); // CAS更新
}
// 批量操作
public void batchOps() {
Map<String, Integer> updates = Map.of("a", 1, "b", 2, "c", 3);
// 遍历
map.forEach((k, v) -> System.out.println(k + "=" + v));
// 搜索
String result = map.search(1, (k, v) -> v > 10 ? k : null);
// 归约
Integer sum = map.reduceValues(1, Integer::sum);
// 批量插入
map.putAll(updates);
}
}
ConcurrentHashMap演进:
| 版本 | 实现方式 | 锁粒度 |
|---|---|---|
| Java 7 | Segment分段锁 | 16个段 |
| Java 8+ | Node数组 + CAS | 单个Node |
2. 其他并发集合
// 1. ConcurrentSkipListMap - 跳表实现,有序
ConcurrentSkipListMap<String, Integer> skipMap = new ConcurrentSkipListMap<>();
skipMap.put("c", 3);
skipMap.put("a", 1);
skipMap.put("b", 2);
// 自动排序: a=1, b=2, c=3
// 2. CopyOnWriteArrayList - 写时复制
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("a"); // 写操作复制数组
String first = list.get(0); // 读操作无锁
// 适用场景:读远多于写
// 不适用:频繁写操作
// 3. ConcurrentLinkedQueue - 无界无锁队列
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item");
String item = queue.poll();
queue.peek();
// 4. BlockingQueue - 阻塞队列
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(100);
// 生产者
blockingQueue.put("item"); // 队列满时阻塞
// 消费者
String item = blockingQueue.take(); // 队列空时阻塞
String timedItem = blockingQueue.poll(1, TimeUnit.SECONDS); // 超时等待
// 5. ArrayBlockingQueue - 有界阻塞队列
BlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(10);
// 6. PriorityBlockingQueue - 优先级队列
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>(
11, Comparator.comparingInt(Task::getPriority).reversed()
);
// 7. DelayQueue - 延迟队列
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask(5000, "task1")); // 5秒后执行
// 8. SynchronousQueue - 同步队列(直接传递)
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
// 生产者put会阻塞,直到有消费者take
// 9. ConcurrentSkipListSet - 有序并发集合
ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<>();
set.add("c");
set.add("a");
set.add("b");
// 自动排序: a, b, c
// 10. 并发Set(JDK 8+ 可直接用 ConcurrentHashMap.newKeySet())
Set<String> concurrentSet = ConcurrentHashMap.newKeySet();
// 等价写法:基于ConcurrentHashMap包装
Set<String> wrappedSet = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
无锁编程
1. Atomic类家族
// 1. 基本类型原子类
AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.incrementAndGet(); // 等价于 ++i,返回自增后的值
atomicInt.getAndIncrement(); // 等价于 i++,返回自增前的值
atomicInt.getAndAdd(10);     // 先返回旧值,再加10
atomicInt.compareAndSet(0, 1); // CAS操作
// 2. 引用类型原子类
AtomicReference<String> atomicRef = new AtomicReference<>("initial");
atomicRef.compareAndSet("initial", "updated");
// 3. 数组类型原子类
AtomicIntegerArray atomicArray = new AtomicIntegerArray(10);
atomicArray.getAndIncrement(0);
// 4. 对象属性原子类
public class AtomicFieldExample {
private volatile int value;
private static final AtomicIntegerFieldUpdater<AtomicFieldExample>
UPDATER = AtomicIntegerFieldUpdater.newUpdater(
AtomicFieldExample.class, "value"
);
public void increment() {
UPDATER.getAndIncrement(this);
}
}
// 5. ABA问题及解决
// ABA问题:A -> B -> A,CAS认为没变化
AtomicStampedReference<String> stampedRef = new AtomicStampedReference<>("A", 0);
int stamp = stampedRef.getStamp();
boolean success = stampedRef.compareAndSet("A", "B", stamp, stamp + 1);
// 6. 累加器(LongAdder替代AtomicLong)
LongAdder adder = new LongAdder();
adder.increment(); // 高并发下性能更好
long sum = adder.sum(); // 获取值时合并
// 7. 通用累加器(可自定义累加函数)
LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
accumulator.accumulate(10);
2. CAS原理与实现
// CAS(Compare-And-Swap)底层实现
public class CAS {
// 简化的CAS实现逻辑
public final boolean compareAndSwap(Object obj, long offset,
int expected, int update) {
// 原子操作:如果obj的offset处值==expected,则设为update
// 返回是否成功
return unsafe.compareAndSwapInt(obj, offset, expected, update);
}
}
// 自旋锁实现
public class SpinLock {
private final AtomicReference<Thread> owner = new AtomicReference<>();
public void lock() {
Thread current = Thread.currentThread();
// CAS自旋
while (!owner.compareAndSet(null, current)) {
// 可以加入yield或park避免CPU空转
Thread.yield();
// 或 LockSupport.parkNanos(1000);
}
}
public void unlock() {
Thread current = Thread.currentThread();
owner.compareAndSet(current, null);
}
}
3. 并发工具类
// 1. CountDownLatch - 倒计时门闩
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); // 等待3个任务
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
// 执行任务
doWork();
} finally {
latch.countDown(); // 完成一个任务
}
}).start();
}
latch.await(); // 等待所有任务完成
System.out.println("所有任务完成");
}
}
// 2. CyclicBarrier - 循环栅栏
public class CyclicBarrierExample {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("所有线程到达,开始下一轮");
});
for (int i = 0; i < parties; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("线程" + id + "到达");
barrier.await(); // 等待其他线程
// 执行并行任务
doParallelWork();
barrier.await(); // 第二次汇聚
System.out.println("线程" + id + "完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
// 3. Semaphore - 信号量
public class SemaphoreExample {
private final Semaphore semaphore = new Semaphore(5); // 最多5个并发
public void accessResource() throws InterruptedException {
semaphore.acquire(); // 获取许可
try {
// 访问受限资源
doWork();
} finally {
semaphore.release(); // 释放许可
}
}
// 超时获取
public boolean accessWithTimeout() throws InterruptedException {
if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
try {
doWork();
return true;
} finally {
semaphore.release();
}
}
return false;
}
}
// 4. Exchanger - 交换数据
public Exchanger<String> exchanger = new Exchanger<>();
// 线程1
String data1 = "from-thread-1";
String received1 = exchanger.exchange(data1);
// 线程2
String data2 = "from-thread-2";
String received2 = exchanger.exchange(data2); // 得到data1
// 5. Phaser - 阶段同步(CyclicBarrier升级版)
public class PhaserExample {
public static void main(String[] args) {
Phaser phaser = new Phaser();
// 动态注册参与方
phaser.register();
new Thread(() -> {
int phase = phaser.arriveAndAwaitAdvance(); // 到达并等待
System.out.println("阶段" + phase + "完成");
}).start();
}
}
// 6. CompletableFuture - 异步编程
public class CompletableFutureExample {
public static void main(String[] args) {
// 1. 创建异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "World";
});
// 2. 组合任务
CompletableFuture<String> combined = future1.thenCombine(future2, (a, b) -> {
return a + " " + b;
});
// 3. 等待结果
combined.thenAccept(result -> System.out.println(result));
// 4. 异常处理
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Error");
}).exceptionally(ex -> {
System.out.println("Error: " + ex.getMessage());
return "default";
});
// 5. 超时控制
try {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done";
    }).get(3, TimeUnit.SECONDS); // 3秒超时
} catch (TimeoutException e) {
    System.out.println("超时");
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
// 6. 批量任务
List<CompletableFuture<String>> futures = List.of(
CompletableFuture.supplyAsync(() -> "a"),
CompletableFuture.supplyAsync(() -> "b"),
CompletableFuture.supplyAsync(() -> "c")
);
CompletableFuture<Void> allOf = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allOf.join(); // 等待所有完成
// 7. 任一完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(
futures.toArray(new CompletableFuture[0])
);
Object firstResult = anyOf.join();
}
}
JMM内存模型
1. JMM核心概念
┌─────────────┐ ┌─────────────┐
│ Thread 1 │ │ Thread 2 │
│ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │
│ │ Local │ │ │ │ Local │ │
│ │ Memory │ │ │ │ Memory │ │
│ └────┬────┘ │ │ └────┬────┘ │
└──────┼──────┘ └──────┼──────┘
│ │
Read/Write Read/Write
│ │
▼ ▼
┌─────────────────────────────────────┐
│ Main Memory (堆) │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ 共享 │ │ 共享 │ │ 共享 │ │
│ │ 变量 │ │ 变量 │ │ 变量 │ │
│ └──────┘ └──────┘ └──────┘ │
└─────────────────────────────────────┘
2. happens-before规则
/**
* happens-before规则:
* 1. 程序顺序规则:单线程内,代码顺序在前的happens-before在后的
* 2. 监视器锁规则:unlock happens before 同一个锁的lock
* 3. volatile规则:volatile写 happens before volatile读
* 4. 线程启动规则:Thread.start() happens before 子线程第一条指令
* 5. 线程终止规则:子线程结束 happens before 父线程Thread.join()
* 6. 线程中断规则:Thread.interrupt() happens before 检测到中断
* 7. 对象终结规则:构造函数结束 happens before finalize()
* 8. 传递性:A happens before B, B happens before C => A happens before C
*/
// 示例1:volatile保证可见性
public class VolatileVisibility {
private volatile boolean flag = false;
public void writer() {
flag = true; // volatile写
}
public void reader() {
while (!flag) { // volatile读,能保证看到writer的修改
// ...
}
}
}
// 示例2:final域保证可见性
public class FinalFieldExample {
    private final int x;
    private final int y;
    public FinalFieldExample(int x, int y) {
        this.x = x; // final域在构造函数中完成初始化
        this.y = y; // 构造函数正常结束后,其他线程通过引用读到的final域一定是初始化后的值
    }
    // 注意:final域的可见性保证有一个前提——this引用不能在构造函数中逃逸
    // 错误示范(仅注释说明):
    // public FinalFieldExample(int x, int y, List<FinalFieldExample> registry) {
    //     this.x = x;
    //     this.y = y;
    //     registry.add(this); // this逃逸,其他线程可能看到尚未构造完成的对象
    // }
}
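规则4、5(线程启动/终止规则)也值得补一个例子:start()之前的写对子线程可见,join()返回后子线程的写对主线程可见,全程不需要锁或volatile。
// 示例3:线程start/join规则
public class StartJoinVisibility {
    private static int data = 0; // 普通变量,不加volatile

    public static void main(String[] args) throws InterruptedException {
        data = 42;                                    // start()之前的写
        Thread t = new Thread(() -> {
            System.out.println("子线程看到: " + data); // 规则4:必定看到42
            data = 100;                               // 子线程内的写
        });
        t.start();
        t.join();                                     // 规则5:join()返回后子线程的写可见
        System.out.println("主线程看到: " + data);      // 必定是100
    }
}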
3. 内存屏障
// JVM在volatile读写前后插入内存屏障来禁止重排序(以下为示意,非可编译代码)
// 1. StoreStore屏障:插在volatile写之前,禁止前面的普通写与volatile写重排序
y = 2;              // 普通写
// StoreStore屏障
volatileX = 1;      // volatile写
// 2. StoreLoad屏障:插在volatile写之后,禁止volatile写与后面的读重排序(开销最大)
volatileX = 1;      // volatile写
// StoreLoad屏障
int a = y;          // 普通读
// 3. LoadLoad屏障:插在volatile读之后,禁止volatile读与后面的普通读重排序
int b = volatileX;  // volatile读
// LoadLoad屏障
int c = y;          // 普通读
// 4. LoadStore屏障:插在volatile读之后,禁止volatile读与后面的普通写重排序
int d = volatileX;  // volatile读
// LoadStore屏障
y = 1;              // 普通写
4. 双重检查锁(DCL)
// 正确的DCL实现
public class Singleton {
private static volatile Singleton instance; // 必须用volatile
private Singleton() {}
public static Singleton getInstance() {
if (instance == null) { // 第一次检查
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查
instance = new Singleton();
}
}
}
return instance;
}
/**
* 为什么需要volatile?
* 1. new Singleton()分为三步:
* a. 分配内存
* b. 初始化对象
* c. 引用指向内存
* 2. 可能重排序为: a -> c -> b
* 3. 线程1执行a->c,线程2看到instance!=null,拿到未初始化对象
* 4. volatile禁止重排序,保证b在c之前执行
*/
}
// Java 5+可以使用内部类Holder(推荐)
public class Singleton {
private Singleton() {}
private static class Holder {
static final Singleton INSTANCE = new Singleton();
}
public static Singleton getInstance() {
return Holder.INSTANCE;
}
}
异步编程
1. CompletableFuture深度使用
public class AsyncProgramming {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
// 1. 链式调用
public CompletableFuture<String> chainExample() {
return CompletableFuture.supplyAsync(() -> fetchUser(), executor)
.thenApply(user -> transformUser(user)) // 同步转换
.thenCompose(user -> fetchOrders(user)) // 异步转换
.thenApply(orders -> formatResult(orders))
.whenComplete((result, ex) -> { // 完成回调
if (ex != null) {
log.error("Error", ex);
} else {
log.info("Success: {}", result);
}
});
}
// 2. 并行执行
public CompletableFuture<Result> parallelExample() {
CompletableFuture<A> futureA = CompletableFuture.supplyAsync(() -> doA());
CompletableFuture<B> futureB = CompletableFuture.supplyAsync(() -> doB());
CompletableFuture<C> futureC = CompletableFuture.supplyAsync(() -> doC());
return CompletableFuture.allOf(futureA, futureB, futureC)
.thenApply(v -> combine(
futureA.join(),
futureB.join(),
futureC.join()
));
}
// 3. 超时控制(Java 9+ 可直接用 orTimeout / completeOnTimeout;以下为手动实现,生产中scheduler应复用并最终关闭)
public CompletableFuture<String> withTimeout(CompletableFuture<String> future,
long timeout, TimeUnit unit) {
CompletableFuture<String> result = new CompletableFuture<>();
future.whenComplete((value, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
} else {
result.complete(value);
}
});
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() -> {
result.completeExceptionally(new TimeoutException());
}, timeout, unit);
return result;
}
// 4. 重试机制
public CompletableFuture<String> withRetry(Supplier<String> task,
int maxRetries) {
CompletableFuture<String> future = new CompletableFuture<>();
retry(task, 0, maxRetries, future);
return future;
}
private void retry(Supplier<String> task, int attempt, int maxRetries,
CompletableFuture<String> future) {
CompletableFuture.supplyAsync(task)
.whenComplete((result, ex) -> {
if (ex == null) {
future.complete(result);
} else if (attempt < maxRetries) {
retry(task, attempt + 1, maxRetries, future);
} else {
future.completeExceptionally(ex);
}
});
}
// 5. 批量处理
public CompletableFuture<Void> batchProcess(List<String> items,
Function<String, String> processor) {
List<CompletableFuture<String>> futures = items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> processor.apply(item), executor))
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
// 6. 节流(限流)
public CompletableFuture<String> throttledExecute(Runnable task,
RateLimiter limiter) {
if (limiter.tryAcquire()) {
return CompletableFuture.runAsync(task, executor)
.thenApply(v -> "success");
} else {
return CompletableFuture.failedFuture(
new RateLimitExceededException()
);
}
}
// 7. 断路器
public CompletableFuture<String> circuitBreakerExecute(
Supplier<String> supplier, CircuitBreaker cb) {
if (cb.isOpen()) {
return CompletableFuture.failedFuture(
new CircuitBreakerOpenException()
);
}
return CompletableFuture.supplyAsync(supplier, executor)
.whenComplete((result, ex) -> {
if (ex != null) {
cb.recordFailure();
} else {
cb.recordSuccess();
}
});
}
}
2. 响应式编程(Reactor)
// 使用Project Reactor
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactiveExample {
// 1. 创建Mono
Mono<String> mono = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException());
// 2. 创建Flux
Flux<String> flux = Flux.just("a", "b", "c");
Flux<Integer> range = Flux.range(1, 10);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
// 3. 操作符
Flux<String> processed = flux
.filter(s -> s.startsWith("a")) // 过滤
.map(String::toUpperCase) // 转换
.flatMap(this::asyncOperation) // 异步转换
.distinct() // 去重
.take(5) // 限制数量
.timeout(Duration.ofSeconds(5)); // 超时
    // 4. 订阅(以下语句需放在方法体内执行)
    public void demo() {
        processed.subscribe(
            value -> System.out.println(value),   // onNext
            error -> error.printStackTrace(),     // onError
            () -> System.out.println("Done")      // onComplete
        );
        // 5. 背压(以下三种策略按需选其一)
        Flux<Integer> source = Flux.range(1, 1000);
        source.onBackpressureBuffer(100)          // 策略1:缓冲100个
              .subscribe(value -> process(value));
        // source.onBackpressureDrop()            // 策略2:丢弃多余元素
        // source.onBackpressureLatest()          // 策略3:只保留最新元素
        // 6. 调度器(线程切换)
        Flux.just(1, 2, 3)
            .subscribeOn(Schedulers.parallel())   // 订阅发生在并行调度器
            .publishOn(Schedulers.single())       // 之后的操作切换到单线程调度器
            .subscribe(System.out::println);
    }
}
性能优化
1. JVM调优
# JVM参数参考
# 1. 内存设置
-Xms4g # 初始堆大小
-Xmx4g # 最大堆大小(建议Xms=Xmx)
-Xmn1g # 新生代大小
-XX:NewRatio=2 # 新生代/老年代比例
-XX:SurvivorRatio=8 # Eden/Survivor比例
# 2. GC选择
-XX:+UseG1GC # 使用G1GC(推荐)
-XX:MaxGCPauseMillis=200 # 最大GC停顿时间
-XX:G1HeapRegionSize=16m # G1 Region大小
# 或使用ZGC(Java 15+)
-XX:+UseZGC
# 或使用ShenandoahGC
-XX:+UseShenandoahGC
# 3. 线程相关
-XX:+UseBiasedLocking # 偏向锁(JDK 15起已废弃并默认关闭)
-XX:AutoBoxCacheMax=20000 # Integer缓存范围
# 4. JIT编译
-XX:+TieredCompilation # 分层编译(默认)
-XX:CompileThreshold=1000 # C1编译阈值
-XX:CICompilerCount=4 # 编译线程数
# 5. 其他优化
-XX:+UseStringDeduplication # 字符串去重
-XX:+UseCompressedOops # 压缩指针(64位)
-XX:+UseCompressedClassPointers # 压缩类指针
-Djava.awt.headless=true # 无头模式
2. 性能分析工具
# 1. JProfiler
java -agentpath:/path/libjprofilerti.so=port=8849 -jar app.jar
# 2. VisualVM
jvisualvm
# 3. Arthas(阿里开源,推荐)
# 下载
wget https://arthas.aliyun.com/arthas-boot.jar
# 启动
java -jar arthas-boot.jar
# 常用命令
trace com.example.Service method # 追踪方法
watch com.example.Service method returnObj # 监控返回值
thread # 查看线程
dump # dump堆
sysprop # 查看系统属性
# 4. JFR(JDK自带)
# 开始录制
jcmd <pid> JFR.start name=recording duration=60s filename=recording.jfr
# 分析
jfr print --events gc.heap.summary recording.jfr
# 5. Async-profiler(火焰图)
# Linux下
./profiler.sh -d 30 -f flame.svg <pid>
3. 性能优化清单
public class PerformanceOptimization {
// 1. 避免频繁创建对象
public class ObjectPool {
private final Queue<MyObject> pool = new ConcurrentLinkedQueue<>();
public MyObject borrow() {
MyObject obj = pool.poll();
return obj != null ? obj : new MyObject();
}
public void returnObject(MyObject obj) {
obj.reset(); // 重置对象状态
pool.offer(obj);
}
}
// 2. 使用基本类型替代包装类
// Bad
List<Integer> list = new ArrayList<>();
// Good
int[] array = new int[100];
// 3. 预分配集合大小
// Bad
List<String> list = new ArrayList<>(); // 默认10
// Good
List<String> list = new ArrayList<>(1000);
// 4. 使用StringBuilder拼接字符串
// Bad
String result = "";
for (String s : strings) {
result += s; // 每次创建新对象
}
// Good
StringBuilder sb = new StringBuilder();
for (String s : strings) {
sb.append(s);
}
String result = sb.toString();
// 5. 减少方法调用
// Bad
public int sum(int[] arr) {
int sum = 0;
for (int i = 0; i < arr.length; i++) {
sum += arr[i];
}
return sum;
}
// Good(循环展开)
public int sumOptimized(int[] arr) {
int sum = 0;
int i = 0;
for (; i <= arr.length - 4; i += 4) {
sum += arr[i] + arr[i + 1] + arr[i + 2] + arr[i + 3];
}
for (; i < arr.length; i++) {
sum += arr[i];
}
return sum;
}
// 6. 使用缓存
private static final Map<String, Integer> CACHE = new ConcurrentHashMap<>();
public Integer compute(String key) {
return CACHE.computeIfAbsent(key, this::expensiveComputation);
}
// 7. 批量操作
// Bad
for (String id : ids) {
db.update(id); // N次IO
}
// Good
db.batchUpdate(ids); // 1次IO
// 8. 使用无锁数据结构
// Bad
private int counter;
public void increment() {
synchronized (this) {
counter++;
}
}
// Good
private AtomicInteger counter = new AtomicInteger();
public void increment() {
counter.incrementAndGet();
}
// 9. 异步非阻塞
// Bad
public void process() {
String result1 = remoteCall1(); // 阻塞
String result2 = remoteCall2(); // 阻塞
process(result1, result2);
}
// Good
public CompletableFuture<Void> processAsync() {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(this::remoteCall1);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::remoteCall2);
return CompletableFuture.allOf(f1, f2)
.thenRun(() -> process(f1.join(), f2.join()));
}
// 10. 连接池
// Bad
public void doQuery() {
Connection conn = DriverManager.getConnection(url); // 每次新建
// ...
conn.close();
}
// Good
private final DataSource dataSource = createDataSource();
public void doQuery() {
try (Connection conn = dataSource.getConnection()) {
// ...
}
}
}
分布式高并发
1. 分布式锁
// Redis分布式锁(Redisson)
import org.redisson.Redisson;
import org.redisson.api.RLock;
public class DistributedLockExample {
private final RedissonClient redisson = Redisson.create(config);
public void doWithLock() {
RLock lock = redisson.getLock("myLock");
try {
// 尝试获取锁,最多等待10秒,锁30秒后自动释放
boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (acquired) {
try {
// 执行业务逻辑
doWork();
} finally {
lock.unlock();
}
} else {
// 获取锁失败
handleFailure();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 读写锁
public void readWriteLock() {
RReadWriteLock rwLock = redisson.getReadWriteLock("rwLock");
RLock rLock = rwLock.readLock();
RLock wLock = rwLock.writeLock();
// 读锁
rLock.lock();
try {
// 读操作
} finally {
rLock.unlock();
}
// 写锁
wLock.lock();
try {
// 写操作
} finally {
wLock.unlock();
}
}
}
2. 分布式缓存
// Spring Cache + Redis
@Service
public class CacheService {
@Cacheable(value = "user", key = "#id")
public User getUser(Long id) {
return userRepository.findById(id);
}
@CachePut(value = "user", key = "#user.id")
public User updateUser(User user) {
return userRepository.save(user);
}
@CacheEvict(value = "user", key = "#id")
public void deleteUser(Long id) {
userRepository.deleteById(id);
}
// 批量清除
@CacheEvict(value = "user", allEntries = true)
public void clearCache() {}
}
// Redis配置
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// Key序列化
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
// Value序列化
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30)) // 30分钟过期
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
}
}
3. 消息队列
// Kafka生产者
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String key, String message) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(
topic, key, message
);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("发送失败", ex);
} else {
log.info("发送成功: offset={}", result.getRecordMetadata().offset());
}
});
}
// 批量发送
public void sendBatch(String topic, List<String> messages) {
for (String message : messages) {
kafkaTemplate.send(topic, message);
}
kafkaTemplate.flush();
}
}
// Kafka消费者
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consume(String message) {
try {
processMessage(message);
} catch (Exception e) {
log.error("处理失败", e);
throw e; // 抛出异常让Kafka重试
}
}
// 批量消费
@KafkaListener(
topics = "my-topic",
groupId = "my-group",
batch = "true"
)
public void consumeBatch(List<String> messages) {
for (String message : messages) {
processMessage(message);
}
}
}
4. 限流
// Guava RateLimiter
public class RateLimiterExample {
private final RateLimiter rateLimiter = RateLimiter.create(100.0); // 100 QPS
public void execute() {
if (rateLimiter.tryAcquire()) {
doWork();
} else {
handleRateLimit();
}
}
}
// 滑动窗口限流
public class SlidingWindowRateLimiter {
private final ConcurrentNavigableMap<Long, AtomicInteger> window = new ConcurrentSkipListMap<>();
private final int maxRequests;
private final long windowSizeMillis;
public SlidingWindowRateLimiter(int maxRequests, long windowSizeMillis) {
this.maxRequests = maxRequests;
this.windowSizeMillis = windowSizeMillis;
}
public boolean tryAcquire() {
    long now = System.currentTimeMillis();
    long start = now - windowSizeMillis;
    // 清理窗口外的过期数据
    window.headMap(start, false).clear();
    // 统计窗口内的请求总数(注意:不能用桶的个数window.size()代替)
    int current = window.values().stream().mapToInt(AtomicInteger::get).sum();
    if (current >= maxRequests) {
        return false;
    }
    // 记录当前请求,同一毫秒的请求累加到同一个桶(检查与累加之间存在竞态,此处仅为示意)
    window.computeIfAbsent(now, k -> new AtomicInteger()).incrementAndGet();
    return true;
}
}
// Redis + Lua实现分布式限流
public class RedisRateLimiter {
private final StringRedisTemplate redisTemplate;
private static final String LUA_SCRIPT =
"local key = KEYS[1]\n" +
"local limit = tonumber(ARGV[1])\n" +
"local expire = tonumber(ARGV[2])\n" +
"local current = redis.call('incr', key)\n" +
"if current == 1 then\n" +
" redis.call('expire', key, expire)\n" +
"end\n" +
"if current > limit then\n" +
" return 0\n" +
"else\n" +
" return 1\n" +
"end";
public boolean tryAcquire(String key, int limit, long expireSeconds) {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
Long result = redisTemplate.execute(script,
Collections.singletonList(key),
String.valueOf(limit),
String.valueOf(expireSeconds)
);
return result != null && result == 1;
}
}
实战案例
案例1:高并发计数器
public class HighPerformanceCounter {
// 方案1:LongAdder(高并发写场景)
private final LongAdder counter = new LongAdder();
public void increment() {
counter.increment();
}
public long get() {
return counter.sum();
}
    // 方案2:分段计数(避免热点竞争,独立成一个类)
    public static class SegmentedCounter {
        private static final int SEGMENTS = 16;
        private final LongAdder[] counters = new LongAdder[SEGMENTS];
        public SegmentedCounter() {
            for (int i = 0; i < SEGMENTS; i++) {
                counters[i] = new LongAdder();
            }
        }
        public void increment() {
            int index = Thread.currentThread().hashCode() & (SEGMENTS - 1);
            counters[index].increment();
        }
        public long get() {
            long sum = 0;
            for (LongAdder counter : counters) {
                sum += counter.sum();
            }
            return sum;
        }
    }
// 方案3:Redis计数器(分布式场景)
public class RedisCounter {
private final StringRedisTemplate redisTemplate;
public long increment(String key) {
return redisTemplate.opsForValue().increment(key);
}
public long get(String key) {
String value = redisTemplate.opsForValue().get(key);
return value != null ? Long.parseLong(value) : 0;
}
}
}
案例2:高并发缓存
public class HighPerformanceCache<K, V> {
private final ConcurrentHashMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
private final Function<K, V> loader;
private final Duration expireAfter;
public HighPerformanceCache(Function<K, V> loader, Duration expireAfter) {
this.loader = loader;
this.expireAfter = expireAfter;
}
    public V get(K key) {
        // computeIfAbsent保证同一个key只触发一次加载,其余线程复用同一个future
        CompletableFuture<V> future = cache.computeIfAbsent(key,
                k -> CompletableFuture.supplyAsync(() -> loader.apply(k)));
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            cache.remove(key, future); // 加载失败时移除,避免缓存住异常结果
            throw new RuntimeException(e.getCause());
        }
    }
// 使用示例
public static void main(String[] args) {
HighPerformanceCache<String, User> userCache = new HighPerformanceCache<>(
userId -> userRepository.findById(userId), // 加载函数
Duration.ofMinutes(10) // 过期时间
);
User user = userCache.get("user-123");
}
}
案例3:高并发限流器
public class AdvancedRateLimiter {
private final int qps;
private final Semaphore semaphore;
private final AtomicInteger currentRequests = new AtomicInteger(0);
private final ScheduledExecutorService scheduler;
public AdvancedRateLimiter(int qps) {
this.qps = qps;
this.semaphore = new Semaphore(qps);
        this.scheduler = Executors.newScheduledThreadPool(1);
        // 每秒补齐被消耗的许可,近似实现按QPS限流
        this.scheduler.scheduleAtFixedRate(this::resetPermits, 1, 1, TimeUnit.SECONDS);
    }
    private void resetPermits() {
        int used = qps - semaphore.availablePermits();
        if (used > 0) {
            semaphore.release(used);
        }
    }
public boolean tryAcquire() {
if (semaphore.tryAcquire()) {
currentRequests.incrementAndGet();
return true;
}
return false;
}
public void acquire() throws InterruptedException {
semaphore.acquire();
currentRequests.incrementAndGet();
}
public void release() {
semaphore.release();
currentRequests.decrementAndGet();
}
public int getCurrentRequests() {
return currentRequests.get();
}
// Token Bucket算法
public static class TokenBucket {
private final long capacity;
private final long refillRate; // tokens per second
private long tokens;
private long lastRefillTime;
public TokenBucket(long capacity, long refillRate) {
this.capacity = capacity;
this.refillRate = refillRate;
this.tokens = capacity;
this.lastRefillTime = System.currentTimeMillis();
}
public synchronized boolean tryConsume(int tokens) {
refill();
if (this.tokens >= tokens) {
this.tokens -= tokens;
return true;
}
return false;
}
private void refill() {
long now = System.currentTimeMillis();
long elapsed = (now - lastRefillTime) / 1000;
if (elapsed > 0) {
long newTokens = Math.min(capacity, this.tokens + elapsed * refillRate);
this.tokens = newTokens;
lastRefillTime = now;
}
}
}
}
案例4:高并发秒杀
@Service
public class SeckillService {
private final RedissonClient redisson;
private final StringRedisTemplate redisTemplate;
private final OrderService orderService;
/**
* 秒杀流程:
* 1. 预热:将库存加载到Redis
* 2. 校验:检查用户是否已购买、活动是否有效
* 3. 扣减:Redis原子扣减库存
* 4. 下单:异步创建订单
* 5. 回滚:如果下单失败,恢复库存
*/
@Transactional
public SeckillResult seckill(Long userId, Long productId) {
String stockKey = "seckill:stock:" + productId;
String userKey = "seckill:user:" + userId + ":" + productId;
String lockKey = "seckill:lock:" + productId;
// 1. 检查用户是否已购买
if (Boolean.TRUE.equals(redisTemplate.hasKey(userKey))) {
return SeckillResult.failure("已购买");
}
// 2. 分布式锁
RLock lock = redisson.getLock(lockKey);
boolean locked = false;
try {
locked = lock.tryLock(100, TimeUnit.MILLISECONDS);
if (!locked) {
return SeckillResult.failure("系统繁忙");
}
// 3. 扣减库存
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 恢复库存
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.failure("库存不足");
}
// 4. 标记用户已购买
redisTemplate.opsForValue().set(userKey, "1", Duration.ofDays(1));
// 5. 异步下单
CompletableFuture.runAsync(() -> {
try {
orderService.createOrder(userId, productId);
} catch (Exception e) {
log.error("下单失败,回滚库存", e);
redisTemplate.opsForValue().increment(stockKey);
redisTemplate.delete(userKey);
}
});
return SeckillResult.success();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return SeckillResult.failure("系统繁忙");
} finally {
if (locked) {
lock.unlock();
}
}
}
/**
* 预热库存
*/
public void warmUp(Long productId, int stock) {
String stockKey = "seckill:stock:" + productId;
redisTemplate.opsForValue().set(stockKey, String.valueOf(stock));
}
/**
* 本地库存缓存 + Redis最终一致性
*/
public static class LocalCacheSeckillService {
private final Map<Long, AtomicInteger> localStock = new ConcurrentHashMap<>();
private final StringRedisTemplate redisTemplate;
private final AsyncStockUpdater asyncUpdater;
public SeckillResult seckillWithLocalCache(Long productId) {
AtomicInteger stock = localStock.get(productId);
// 先扣本地库存
if (stock == null || stock.decrementAndGet() < 0) {
return SeckillResult.failure("库存不足");
}
try {
// 异步更新Redis
asyncUpdater.decrementAsync(productId);
// 创建订单
// ...
return SeckillResult.success();
} catch (Exception e) {
// 失败恢复本地库存
stock.incrementAndGet();
return SeckillResult.failure("系统繁忙");
}
}
}
}
面试题库
基础题
Q1: 线程和进程的区别?
- 进程是资源分配的基本单位,线程是CPU调度的基本单位
- 进程间独立,线程共享进程资源
- 进程间通信复杂,线程间通信简单
- 线程上下文切换开销小
Q2: synchronized和ReentrantLock的区别?
| 特性 | synchronized | ReentrantLock |
|---|---|---|
| 实现 | JVM层面 | API层面 |
| 可中断 | × | ✓ |
| 公平锁 | × | ✓ |
| 多条件 | × | ✓ |
| 手动释放 | 自动 | 必须手动 |
Q3: volatile的作用?
- 保证可见性:写操作立即刷新到主内存
- 保证有序性:禁止指令重排序
- 不保证原子性:复合操作不安全
Q4: 线程池参数含义?
- corePoolSize:核心线程数(常驻)
- maximumPoolSize:最大线程数
- keepAliveTime:空闲线程存活时间
- workQueue:工作队列
- threadFactory:线程工厂
- handler:拒绝策略
Q5: HashMap和ConcurrentHashMap的区别?
- HashMap非线程安全,ConcurrentHashMap线程安全
- Java 7:ConcurrentHashMap分段锁(16段)
- Java 8+:ConcurrentHashMap使用CAS+synchronized,锁粒度到Node级别
进阶题
Q6: AQS原理?
AQS(AbstractQueuedSynchronizer)核心:
1. state变量(volatile int):表示同步状态
2. CLH队列:等待线程队列
3. 独占/共享模式
4. acquire/release:获取/释放状态
模板方法模式:
- tryAcquire:尝试获取(子类实现)
- addWaiter:加入等待队列
- acquireQueued:自旋等待
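按上面的要点,可以基于AQS写一个最小的不可重入互斥锁示意(只演示state与tryAcquire/tryRelease模板方法,非生产实现):
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleMutex {
    // state=0表示未占用,state=1表示已占用
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);   // CAS抢占同步状态
        }
        @Override
        protected boolean tryRelease(int arg) {
            setState(0);                       // 释放同步状态
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }  // 获取失败则进入CLH队列等待
    public void unlock() { sync.release(1); }  // 释放并唤醒队列中的后继线程
}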
Q7: 线程池工作流程?
- 提交任务
- 核心线程数未满 → 创建核心线程
- 核心线程数已满,队列未满 → 放入队列
- 队列已满,最大线程数未满 → 创建非核心线程
- 最大线程数已满 → 执行拒绝策略
Q8: CAS和ABA问题?
- CAS(Compare-And-Swap):原子比较并交换
- ABA问题:A→B→A,CAS认为没变化
- 解决:加版本号(AtomicStampedReference)
Q9: Java内存模型happens-before规则?
- 程序顺序规则
- 监视器锁规则
- volatile规则
- 线程启动/终止规则
- 传递性
Q10: ThreadLocal原理?
- 每个线程维护ThreadLocalMap
- ThreadLocalMap的key是ThreadLocal(弱引用)
- value是存储值(强引用)
- 内存泄漏:key为null时value无法回收
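因此实践上的要求是用完即remove:线程池里的线程会被复用,不清理会造成脏数据甚至内存泄漏。最小示意如下:
public class ThreadLocalUsage {
    private static final ThreadLocal<StringBuilder> BUFFER =
            ThreadLocal.withInitial(StringBuilder::new);

    public static String render(String name) {
        StringBuilder sb = BUFFER.get();
        try {
            return sb.append("hello, ").append(name).toString();
        } finally {
            BUFFER.remove(); // 用完即清理,防止线程复用带来的脏数据和内存泄漏
        }
    }

    public static void main(String[] args) {
        System.out.println(render("world"));
    }
}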
高频题
Q11: 如何实现一个线程安全的单例?
// 方式1:双重检查锁
public class Singleton {
private static volatile Singleton instance;
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
// 方式2:静态内部类(推荐)
public class Singleton {
private Singleton() {}
private static class Holder {
static final Singleton INSTANCE = new Singleton();
}
public static Singleton getInstance() {
return Holder.INSTANCE;
}
}
// 方式3:枚举(最佳)
public enum Singleton {
INSTANCE;
public void doWork() {}
}
Q12: 高并发场景如何优化?
- 缓存:多级缓存(本地+Redis)
- 异步:消息队列、CompletableFuture
- 无锁:CAS、Atomic类
- 连接池:数据库、HTTP、Redis
- 限流:令牌桶、滑动窗口
- 降级:熔断、兜底数据
Q13: 分布式事务解决方案?
| 方案 | 优点 | 缺点 |
|---|---|---|
| 2PC | 强一致性 | 性能差,单点故障 |
| TCC | 性能好 | 实现复杂 |
| Saga | 灵活 | 补偿逻辑复杂 |
| Seata | 使用简单 | 性能相对低 |
Q14: 如何解决热点Key问题?
- 缓存分片:均匀分散到不同Redis实例
- 本地缓存:减轻Redis压力
- 互斥锁:防止缓存击穿
- 布隆过滤器:快速判断是否存在
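其中"互斥锁防止缓存击穿"的常见写法是:缓存未命中时只让一个线程回源重建,其余线程等锁后直接读到新值。下面是一个单机版示意(loadFromDb为假设的回源方法,锁对象清理做了简化):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HotKeyCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    public String get(String key) {
        String value = cache.get(key);
        if (value != null) {
            return value;
        }
        Object lock = locks.computeIfAbsent(key, k -> new Object());
        synchronized (lock) {                // 同一个key只允许一个线程回源
            value = cache.get(key);          // 双重检查,后进来的线程直接命中
            if (value == null) {
                value = loadFromDb(key);     // 假设的回源方法
                cache.put(key, value);       // 实际使用时应设置过期时间
            }
        }
        locks.remove(key);                   // 简化的锁清理,严格实现需处理竞态
        return value;
    }

    private String loadFromDb(String key) {
        return "value-of-" + key;
    }
}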
Q15: 消息队列如何保证消息不丢失?
生产者:
- 使用confirm机制
- 消息持久化
Broker:
- 消息持久化
- 多副本同步
消费者:
- 手动ACK
- 死信队列
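以Kafka为例,生产者侧"不丢消息"的关键配置大致如下(地址与数值仅为示例;Broker与消费者侧的要点见注释):
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ReliableProducerConfig {
    public static Properties props() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // 示例地址
        p.put(ProducerConfig.ACKS_CONFIG, "all");                // 等ISR副本都确认才算发送成功
        p.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 发送失败自动重试
        p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);   // 幂等,避免重试造成重复
        // Broker/Topic侧:replication.factor >= 3,min.insync.replicas >= 2
        // 消费者侧:enable.auto.commit=false,业务处理成功后再手动提交位移
        return p;
    }
}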
学习路线
第一阶段:基础(2周)
┌─────────────────────────────────────────┐
│ 第1周:并发基础 │
├─────────────────────────────────────────┤
│ □ 线程创建与生命周期 │
│ □ wait/notify/notifyAll │
│ □ synchronized关键字 │
│ □ volatile关键字 │
│ □ ThreadLocal │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 第2周:并发工具 │
├─────────────────────────────────────────┤
│ □ AtomicInteger等Atomic类 │
│ □ CountDownLatch │
│ □ CyclicBarrier │
│ □ Semaphore │
│ □ Exchanger │
└─────────────────────────────────────────┘
第二阶段:进阶(3周)
┌─────────────────────────────────────────┐
│ 第3-4周:锁与并发集合 │
├─────────────────────────────────────────┤
│ □ ReentrantLock详细使用 │
│ □ ReadWriteLock │
│ □ StampedLock │
│ □ ConcurrentHashMap │
│ □ ConcurrentLinkedQueue │
│ □ CopyOnWriteArrayList │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 第5周:线程池与异步 │
├─────────────────────────────────────────┤
│ □ ThreadPoolExecutor参数详解 │
│ □ ForkJoinPool │
│ □ CompletableFuture │
│ □ JMM内存模型 │
└─────────────────────────────────────────┘
第三阶段:高级(4周)
┌─────────────────────────────────────────┐
│ 第6-7周:分布式并发 │
├─────────────────────────────────────────┤
│ □ Redis分布式锁 │
│ □ Redis缓存 │
│ □ 消息队列(Kafka/RabbitMQ) │
│ □ 分布式限流 │
│ □ 分布式事务 │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 第8-9周:性能优化 │
├─────────────────────────────────────────┤
│ □ JVM调优 │
│ □ 性能分析工具 │
│ □ 无锁编程 │
│ □ 响应式编程 │
│ □ 高并发架构设计 │
└─────────────────────────────────────────┘
第四阶段:实战(持续)
┌─────────────────────────────────────────┐
│ 实践项目 │
├─────────────────────────────────────────┤
│ □ 实现一个高性能计数器 │
│ □ 实现一个分布式缓存 │
│ □ 实现一个限流器 │
│ □ 实现一个秒杀系统 │
│ □ 阅读开源源码(Netty、Disruptor等) │
└─────────────────────────────────────────┘
推荐资源
书籍
- 《Java并发编程实战》- 必读
- 《Java并发编程的艺术》
- 《深入理解Java虚拟机》
- 《凤凰架构:构建可靠的大型分布式系统》
文档
源码
- Doug Lea的并发库
- Netty
- Disruptor
- Redisson
工具
- JConsole / VisualVM
- Arthas
- JProfiler
- Async-profiler
文档版本: 1.0
最后更新: 2026-01-19