Java高并发完整指南

内容纲要

Java高并发完整指南

从入门到专家 - 完全掌握Java高并发开发

学习路线图

[[高并发学习路线图.svg]]


目录

  1. 核心概念
  2. Java并发基础
  3. 线程池深入
  4. 锁机制详解
  5. 并发集合
  6. 无锁编程
  7. JMM内存模型
  8. 异步编程
  9. 性能优化
  10. 分布式高并发
  11. 实战案例
  12. 面试题库
  13. 学习路线

核心概念

什么是高并发

高并发是指系统能够在短时间内处理大量并发请求的能力,涉及以下几个维度:

维度 说明 衡量指标
QPS 每秒查询率 请求/秒
TPS 每秒事务数 事务/秒
RT 响应时间 毫秒
并发数 同时处理的请求数

CAP理论

        Consistency (一致性)
              /     \
             /       \
      Availability   Partition Tolerance
      (可用性)        (分区容错性)
  • CP: 牺牲可用性,保证一致性(Redis、Zookeeper)
  • AP: 牺牲一致性,保证可用性(Cassandra、DNS)
  • CA: 理论上存在,实际网络分区不可避免

高并发三驾马车

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   缓存      │    │   异步      │    │   队列      │
│  Redis/MC   │────│ MQ/EventBus │────│  Kafka/RQ   │
└─────────────┘    └─────────────┘    └─────────────┘

Java并发基础

1. 线程创建与生命周期

// 方式1: 继承Thread(不推荐)
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程运行");
    }
}

// 方式2: 实现Runnable(推荐)
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("线程运行");
    }
}

// 方式3: 使用Lambda(推荐)
Thread thread = new Thread(() -> {
    System.out.println("线程运行");
});

// 方式4: 实现Callable(可获取返回结果)
Callable<String> task = () -> "返回结果";
FutureTask<String> future = new FutureTask<>(task);
new Thread(future).start();
String result = future.get();

线程状态转换图

   NEW
     │
     ▼ start()
   RUNNABLE
     │
     ├───────▶ BLOCKED (等待锁)
     │         │
     │         ▼ 获得锁
     ▼         │
   RUNNING  ◀─┘
     │
     ├───────▶ WAITING (wait/join/park)
     │         │
     │         ▼ notify/unpark
     ▼         │
   TIMED_WAITING ◀──────┘
     │
     ▼
   TERMINATED

2. 线程通信 - wait/notify

public class WaitNotifyExample {
    private final Object lock = new Object();
    private boolean condition = false;

    // 等待线程
    public void waitMethod() throws InterruptedException {
        synchronized (lock) {
            while (!condition) {  // 必须用while,防止虚假唤醒
                lock.wait();
            }
            // 执行业务逻辑
        }
    }

    // 通知线程
    public void notifyMethod() {
        synchronized (lock) {
            condition = true;
            lock.notifyAll();  // 推荐用notifyAll,避免信号丢失
        }
    }
}

3. volatile关键字

// volatile特性:可见性、有序性、不保证原子性
public class VolatileExample {
    private volatile boolean running = true;
    private volatile int counter = 0;

    // ✅ 正确:布尔值的读写是原子的
    public void stop() {
        running = false;
    }

    public void run() {
        while (running) {
            // ...
        }
    }

    // ❌ 错误:volatile不保证复合操作的原子性
    public void increment() {
        counter++;  // 不是线程安全的!
    }

    // ✅ 正确:使用Atomic类
    private AtomicInteger atomicCounter = new AtomicInteger(0);
    public void safeIncrement() {
        atomicCounter.incrementAndGet();
    }
}

volatile vs synchronized

特性 volatile synchronized
可见性
原子性 ×(复合操作)
有序性
阻塞 不阻塞 阻塞
性能 较低

线程池深入

1. ThreadPoolExecutor完整参数

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10,                      // corePoolSize: 核心线程数(常驻)
    20,                      // maximumPoolSize: 最大线程数
    60L,                     // keepAliveTime: 空闲线程存活时间
    TimeUnit.SECONDS,        // 时间单位
    new LinkedBlockingQueue<>(100),  // 工作队列
    new ThreadFactory() {     // 线程工厂
        private final AtomicInteger counter = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("worker-" + counter.incrementAndGet());
            thread.setDaemon(false);
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

2. 工作队列类型对比

// 1. 无界队列(不推荐,可能导致OOM)
new LinkedBlockingQueue<>()  // 默认无界

// 2. 有界队列(推荐)
new LinkedBlockingQueue<>(100)  // 队列容量100
new ArrayBlockingQueue<>(100)

// 3. 同步队列(直接传递,S=corePoolSize)
new SynchronousQueue<>()  // 不存储任务,直接提交执行

// 4. 优先级队列
new PriorityBlockingQueue<>(11, Comparator.comparing(Task::getPriority))

线程池工作流程

          提交任务
             │
             ▼
    ┌────────────────┐
    │ 核心线程数满? │
    └─────┬──────────┘
          │ 否                 │ 是
          ▼                    ▼
    创建核心线程      ┌────────────────┐
                      │ 工作队列满?  │
                      └─────┬──────────┘
                            │ 否              │ 是
                            ▼                 ▼
                      放入队列    ┌────────────────┐
                                 │ 最大线程数满? │
                                 └─────┬──────────┘
                                       │ 否        │ 是
                                       ▼           ▼
                              创建非核心线程  拒绝策略

3. 拒绝策略详解

// 1. AbortPolicy(默认):抛出RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy()

// 2. CallerRunsPolicy:由提交任务的线程执行
new ThreadPoolExecutor.CallerRunsPolicy()

// 3. DiscardPolicy:直接丢弃任务
new ThreadPoolExecutor.DiscardPolicy()

// 4. DiscardOldestPolicy:丢弃队列中最老的任务,再尝试提交
new ThreadPoolExecutor.DiscardOldestPolicy()

// 5. 自定义拒绝策略
new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录日志
        log.warn("任务被拒绝: {}", r);

        // 尝试放入其他队列
        if (!fallbackQueue.offer(r)) {
            // 执行降级逻辑
            fallbackExecutor.execute(() -> {
                try {
                    r.run();
               ();
            }
        }
    }
}

4. 合理的线程池配置

public class ThreadPoolConfig {

    /**
     * CPU密集型线程池
     * 线程数 = CPU核心数 + 1
     */
    public static ExecutorService cpuIntensivePool() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cores,
            cores + 1,
            0L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    /**
     * IO密集型线程池
     * 线程数 = CPU核心数 * 2
     */
    public static ExecutorService ioIntensivePool() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cores * 2,
            cores * 4,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    /**
     * 混合型线程池
     * 根据任务类型动态调整
     */
    public static ExecutorService mixedPool() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cores,
            cores * 2,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

5. 线程池监控

public class ThreadPoolMonitor {

    public static void monitor(ThreadPoolExecutor executor) {
        // 当前线程数
        int poolSize = executor.getPoolSize();

        // 活跃线程数
        int activeCount = executor.getActiveCount();

        // 队列大小
        int queueSize = executor.getQueue().size();

        // 已完成任务数
        long completedTaskCount = executor.getCompletedTaskCount();

        // 拒绝任务数(需要自定义计数器)
        long rejectedTaskCount = getRejectedCount(executor);

        System.out.printf(
            "PoolSize=%d, Active=%d, Queue=%d, Completed=%d, Rejected=%d%n",
            poolSize, activeCount, queueSize, completedTaskCount, rejectedTaskCount
        );
    }

    // 使用Micrometer监控
    public static ThreadPoolExecutor createMonitoredPool(MeterRegistry registry) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
        Gauge.builder("threadpool.pool.size", executor, ThreadPoolExecutor::getPoolSize)
             .register(registry);
        Gauge.builder("threadpool.active.count", executor, ThreadPoolExecutor::getActiveCount)
             .register(registry);
        Gauge.builder("threadpool.queue.size", executor, e -> e.getQueue().size())
             .register(registry);
        return executor;
    }
}

6. ForkJoinPool(工作窃取)

public class ForkJoinExample {

    // 并行计算数组元素和
    public static long sum(long[] array) {
        ForkJoinPool pool = new ForkJoinPool();
        SumTask task = new SumTask(array, 0, array.length);
        return pool.invoke(task);
    }

    static class SumTask extends RecursiveTask<Long> {
        private final long[] array;
        private final int start;
        private final int end;
        private static final int THRESHOLD = 1000;

        public SumTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                // 任务足够小,直接计算
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }

            // 任务太大,拆分
            int mid = (start + end) >>> 1;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);

            left.fork();  // 异步执行左半部分
            long rightResult = right.compute();  // 同步执行右半部分
            long leftResult = left.join();  // 等待左半部分结果

            return leftResult + rightResult;
        }
    }
}

锁机制详解

1. synchronized关键字

// 对象锁
public class ObjectLock {
    public synchronized void method1() {
        // 实例方法锁,等价于 synchronized(this)
    }

    public void method2() {
        synchronized (this) {
            // 代码块锁
        }
    }

    private final Object lock = new Object();
    public void method3() {
        synchronized (lock) {
            // 使用独立对象锁,减少锁竞争
        }
    }
}

// 类锁
public class ClassLock {
    public static synchronized void staticMethod() {
        // 静态方法锁,等价于 synchronized(ClassLock.class)
    }

    public void method() {
        synchronized (ClassLock.class) {
            // 类级别锁
        }
    }
}

synchronized优化(JVM层次)

无锁 ──▶ 偏向锁 ──▶ 轻量级锁 ──▶ 重量级锁
  │          │           │           │
  ▼          ▼           ▼           ▼
  无竞争    一个线程    竞争但CAS    真正竞争
            持有时间短              持有时间长

-XX:+UseBiasedLocking      // 启用偏向锁(默认)
-XX:BiasedLockingStartupDelay=0  // 偏向锁启动延迟
-XX:AutoBoxCacheMax=20000  // 自动装箱缓存

2. ReentrantLock(可重入锁)

public class ReentrantLockExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    // 基本使用
    public void method() {
        lock.lock();
        try {
            // 临界区代码
        } finally {
            lock.unlock();  // 必须在finally中释放
        }
    }

    // 可中断的锁获取
    public void interruptibleMethod() throws InterruptedException {
        while (!lock.tryLock(100, TimeUnit.MILLISECONDS)) {
            // 处理等待逻辑
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
        }
        try {
            // 临界区
        } finally {
            lock.unlock();
        }
    }

    // 公平锁(按请求顺序获取)
    private final ReentrantLock fairLock = new ReentrantLock(true);

    // Condition实现等待/通知
    public void await() throws InterruptedException {
        lock.lock();
        try {
            while (!conditionMet()) {
                condition.await();  // 释放锁并等待
            }
            // 执行业务逻辑
        } finally {
            lock.unlock();
        }
    }

    public void signal() {
        lock.lock();
        try {
            updateCondition();
            condition.signal();  // 唤醒一个等待线程
            // 或 condition.signalAll();  // 唤醒所有
        } finally {
            lock.unlock();
        }
    }

    // 读写锁
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    public String read() {
        readLock.lock();
        try {
            return data;
        } finally {
            readLock.unlock();
        }
    }

    public void write(String newData) {
        writeLock.lock();
        try {
            this.data = newData;
        } finally {
            writeLock.unlock();
        }
    }
}

锁选择指南

场景 推荐锁 原因
简单同步 synchronized JVM自动优化,代码简洁
需要中断 ReentrantLock lockInterruptibly()
公平锁 ReentrantLock(true) synchronized不支持
多条件 ReentrantLock + Condition wait/notify只有一个条件
读多写少 ReentrantReadWriteLock 读写分离提升性能
高并发CAS StampedLock 乐观读,性能最优

3. StampedLock(读写锁的升级版)

public class StampedLockExample {
    private final StampedLock lock = new StampedLock();
    private double x, y;

    // 乐观读(完全无锁)
    public double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!lock.validate(stamp)) {  // 验证戳是否有效
            stamp = lock.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    // 悲观读
    public void move(double deltaX, double deltaY) {
        long stamp = lock.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // 读锁升级为写锁
    public void moveIfAtOrigin(double newX, double newY) {
        long stamp = lock.readLock();
        try {
            while (x == 0 && y == 0) {
                long ws = lock.tryConvertToWriteLock(stamp);
                if (ws != 0L) {  // 升级成功
                    stamp = ws;
                    x = newX;
                    y = newY;
                    return;
                }
                // 升级失败,先释放读锁再获取写锁
                lock.unlockRead(stamp);
                stamp = lock.writeLock();
            }
        } finally {
            lock.unlock(stamp);
        }
    }
}

4. 锁优化技巧

// 1. 锁粒度细化
public class FineGrainedLock {
    private final Object lockA = new Object();
    private final Object lockB = new Object();
    private int countA = 0;
    private int countB = 0;

    public void incrementA() {
        synchronized (lockA) {  // 只锁A相关的操作
            countA++;
        }
    }

    public void incrementB() {
        synchronized (lockB) {  // 只锁B相关的操作
            countB++;
        }
    }
}

// 2. 锁分离
public class ReadWriteSeparation {
    private volatile String value;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // 读操作用读锁
    public String getValue() {
        lock.readLock().lock();
        try {
            return value;
        } finally {
            lock.readLock().unlock();
        }
    }

    // 写操作用写锁
    public void setValue(String newValue) {
        lock.writeLock().lock();
        try {
            this.value = newValue;
        } finally {
            lock.writeLock().unlock();
        }
    }
}

// 3. 锁消除(JVM自动)
// 局部变量不需要锁
public void lockElimination() {
    StringBuffer sb = new StringBuffer();  // 局部变量,JVM自动消除锁
    sb.append("a").append("b");
}

// 4. 锁粗化(JVM自动)
// 连续的锁操作合并为一个大锁
public synchronized void lockCoarsening() {
    method1();  // synchronized
    method2();  // synchronized
    // JVM会合并为一个锁
}

// 5. 减少锁持有时间
public class ReduceLockHoldTime {
    private final Lock lock = new ReentrantLock();

    public void process() {
        lock.lock();
        try {
            // 只在必要时持有锁
            updateSharedData();
        } finally {
            lock.unlock();
        }

        // 不需要锁的操作放在外面
        doIndependentWork();
    }
}

并发集合

1. ConcurrentHashMap详解

public class ConcurrentHashMapExample {

    // Java 8+ 实现原理
    // 1. Node数组 + 链表/红黑树
    // 2. 只锁一个槽位(分段锁的升级)
    // 3. CAS + synchronized

    private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    // 基本操作
    public void basicOps() {
        map.put("key1", 1);
        Integer value = map.get("key1");
        map.remove("key1");

        // 原子操作
        map.putIfAbsent("key", 1);         // 不存在才put
        map.computeIfAbsent("key", k -> 0);  // 不存在才计算
        map.computeIfPresent("key", (k, v) -> v + 1);  // 存在才计算
        map.compute("key", (k, v) -> v == null ? 1 : v + 1);  // 总是计算
        map.merge("key", 1, Integer::sum);  // 合并值
        map.replace("key", 1, 2);  // CAS更新
    }

    // 批量操作
    public void batchOps() {
        Map<String, Integer> updates = Map.of("a", 1, "b", 2, "c", 3);

        // 遍历
        map.forEach((k, v) -> System.out.println(k + "=" + v));

        // 搜索
        String result = map.search(1, (k, v) -> v > 10 ? k : null);

        // 归约
        Integer sum = map.reduceValues(1, Integer::sum);

        // 批量插入
        map.putAll(updates);
    }
}

ConcurrentHashMap演进

版本 实现方式 锁粒度
Java 7 Segment分段锁 16个段
Java 8+ Node数组 + CAS 单个Node

2. 其他并发集合

// 1. ConcurrentSkipListMap - 跳表实现,有序
ConcurrentSkipListMap<String, Integer> skipMap = new ConcurrentSkipListMap<>();
skipMap.put("c", 3);
skipMap.put("a", 1);
skipMap.put("b", 2);
// 自动排序: a=1, b=2, c=3

// 2. CopyOnWriteArrayList - 写时复制
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("a");  // 写操作复制数组
String first = list.get(0);  // 读操作无锁

// 适用场景:读远多于写
// 不适用:频繁写操作

// 3. ConcurrentLinkedQueue - 无界无锁队列
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item");
String item = queue.poll();
queue.peek();

// 4. BlockingQueue - 阻塞队列
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(100);

// 生产者
blockingQueue.put("item");  // 队列满时阻塞

// 消费者
String item = blockingQueue.take();  // 队列空时阻塞
String item = blockingQueue.poll(1, TimeUnit.SECONDS);  // 超时等待

// 5. ArrayBlockingQueue - 有界阻塞队列
BlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(10);

// 6. PriorityBlockingQueue - 优先级队列
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>(
    11, Comparator.comparingInt(Task::getPriority).reversed()
);

// 7. DelayQueue - 延迟队列
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask(5000, "task1"));  // 5秒后执行

// 8. SynchronousQueue - 同步队列(直接传递)
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
// 生产者put会阻塞,直到有消费者take

// 9. ConcurrentSkipListSet - 有序并发集合
ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<>();
set.add("c");
set.add("a");
set.add("b");
// 自动排序: a, b, c

// 10. ConcurrentHashSet(需要自己实现)
Set<String> concurrentSet = Collections.newSetFromMap(
    new ConcurrentHashMap<String, Boolean>()
);

无锁编程

1. Atomic类家族

// 1. 基本类型原子类
AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.incrementAndGet();    // i++
atomicInt.getAndIncrement();    // ++i
atomicInt.getAndAdd(10);        // i += 10
atomicInt.compareAndSet(0, 1);  // CAS操作

// 2. 引用类型原子类
AtomicReference<String> atomicRef = new AtomicReference<>("initial");
atomicRef.compareAndSet("initial", "updated");

// 3. 数组类型原子类
AtomicIntegerArray atomicArray = new AtomicIntegerArray(10);
atomicArray.getAndIncrement(0);

// 4. 对象属性原子类
public class AtomicFieldExample {
    private volatile int value;
    private static final AtomicIntegerFieldUpdater<AtomicFieldExample>
        UPDATER = AtomicIntegerFieldUpdater.newUpdater(
            AtomicFieldExample.class, "value"
        );

    public void increment() {
        UPDATER.getAndIncrement(this);
    }
}

// 5. ABA问题及解决
// ABA问题:A -> B -> A,CAS认为没变化
AtomicStampedReference<String> stampedRef = new AtomicStampedReference<>("A", 0);
int stamp = stampedRef.getStamp();
boolean success = stampedRef.compareAndSet("A", "B", stamp, stamp + 1);

// 6. 累加器(LongAdder替代AtomicLong)
LongAdder adder = new LongAdder();
adder.increment();  // 高并发下性能更好
long sum = adder.sum();  // 获取值时合并

// 7. 累加器组
LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
accumulator.accumulate(10);

2. CAS原理与实现

// CAS(Compare-And-Swap)底层实现
public class CAS {
    // 简化的CAS实现逻辑
    public final boolean compareAndSwap(Object obj, long offset,
                                       int expected, int update) {
        // 原子操作:如果obj的offset处值==expected,则设为update
        // 返回是否成功
        return unsafe.compareAndSwapInt(obj, offset, expected, update);
    }
}

// 自旋锁实现
public class SpinLock {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    public void lock() {
        Thread current = Thread.currentThread();
        // CAS自旋
        while (!owner.compareAndSet(null, current)) {
            // 可以加入yield或park避免CPU空转
            Thread.yield();
            // 或 LockSupport.parkNanos(1000);
        }
    }

    public void unlock() {
        Thread current = Thread.currentThread();
        owner.compareAndSet(current, null);
    }
}

3. 并发工具类

// 1. CountDownLatch - 倒计时门闩
public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);  // 等待3个任务

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    // 执行任务
                    doWork();
                } finally {
                    latch.countDown();  // 完成一个任务
                }
            }).start();
        }

        latch.await();  // 等待所有任务完成
        System.out.println("所有任务完成");
    }
}

// 2. CyclicBarrier - 循环栅栏
public class CyclicBarrierExample {
    public static void main(String[] args) {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("所有线程到达,开始下一轮");
        });

        for (int i = 0; i < parties; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("线程" + id + "到达");
                    barrier.await();  // 等待其他线程

                    // 执行并行任务
                    doParallelWork();

                    barrier.await();  // 第二次汇聚
                    System.out.println("线程" + id + "完成");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

// 3. Semaphore - 信号量
public class SemaphoreExample {
    private final Semaphore semaphore = new Semaphore(5);  // 最多5个并发

    public void accessResource() throws InterruptedException {
        semaphore.acquire();  // 获取许可
        try {
            // 访问受限资源
            doWork();
        } finally {
            semaphore.release();  // 释放许可
        }
    }

    // 超时获取
    public boolean accessWithTimeout() throws InterruptedException {
        if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
            try {
                doWork();
                return true;
            } finally {
                semaphore.release();
            }
        }
        return false;
    }
}

// 4. Exchanger - 交换数据
public Exchanger<String> exchanger = new Exchanger<>();

// 线程1
String data1 = "from-thread-1";
String received1 = exchanger.exchange(data1);

// 线程2
String data2 = "from-thread-2";
String received2 = exchanger.exchange(data2);  // 得到data1

// 5. Phaser - 阶段同步(CyclicBarrier升级版)
public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser();

        // 动态注册参与方
        phaser.register();

        new Thread(() -> {
            int phase = phaser.arriveAndAwaitAdvance();  // 到达并等待
            System.out.println("阶段" + phase + "完成");
        }).start();
    }
}

// 6. CompletableFuture - 异步编程
public class CompletableFutureExample {
    public static void main(String[] args) {
        // 1. 创建异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return "World";
        });

        // 2. 组合任务
        CompletableFuture<String> combined = future1.thenCombine(future2, (a, b) -> {
            return a + " " + b;
        });

        // 3. 等待结果
        combined.thenAccept(result -> System.out.println(result));

        // 4. 异常处理
        CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Error");
        }).exceptionally(ex -> {
            System.out.println("Error: " + ex.getMessage());
            return "default";
        });

        // 5. 超时控制
        try {
            String result = CompletableFuture.supplyAsync(() -> {
                Thread.sleep(5000);
                return "done";
            }).get(3, TimeUnit.SECONDS);  // 3秒超时
        } catch (TimeoutException e) {
            System.out.println("超时");
        }

        // 6. 批量任务
        List<CompletableFuture<String>> futures = List.of(
            CompletableFuture.supplyAsync(() -> "a"),
            CompletableFuture.supplyAsync(() -> "b"),
            CompletableFuture.supplyAsync(() -> "c")
        );

        CompletableFuture<Void> allOf = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );
        allOf.join();  // 等待所有完成

        // 7. 任一完成
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(
            futures.toArray(new CompletableFuture[0])
        );
        Object firstResult = anyOf.join();
    }
}

JMM内存模型

1. JMM核心概念

┌─────────────┐         ┌─────────────┐
│  Thread 1   │         │  Thread 2   │
│             │         │             │
│ ┌─────────┐ │         │ ┌─────────┐ │
│ │ Local   │ │         │ │ Local   │ │
│ │ Memory  │ │         │ │ Memory  │ │
│ └────┬────┘ │         │ └────┬────┘ │
└──────┼──────┘         └──────┼──────┘
       │                        │
  Read/Write              Read/Write
       │                        │
       ▼                        ▼
┌─────────────────────────────────────┐
│          Main Memory (堆)           │
│                                     │
│  ┌──────┐  ┌──────┐  ┌──────┐      │
│  │ 共享 │  │ 共享 │  │ 共享 │      │
│  │ 变量 │  │ 变量 │  │ 变量 │      │
│  └──────┘  └──────┘  └──────┘      │
└─────────────────────────────────────┘

2. happens-before规则

/**
 * happens-before规则:
 * 1. 程序顺序规则:单线程内,代码顺序在前的happens-before在后的
 * 2. 监视器锁规则:unlock happens before 同一个锁的lock
 * 3. volatile规则:volatile写 happens before volatile读
 * 4. 线程启动规则:Thread.start() happens before 子线程第一条指令
 * 5. 线程终止规则:子线程结束 happens before 父线程Thread.join()
 * 6. 线程中断规则:Thread.interrupt() happens before 检测到中断
 * 7. 对象终结规则:构造函数结束 happens before finalize()
 * 8. 传递性:A happens before B, B happens before C => A happens before C
 */

// 示例1:volatile保证可见性
public class VolatileVisibility {
    private volatile boolean flag = false;

    public void writer() {
        flag = true;  // volatile写
    }

    public void reader() {
        while (!flag) {  // volatile读,能保证看到writer的修改
            // ...
        }
    }
}

// 示例2:final域保证可见性
public class FinalFieldExample {
    private final int x;
    private final int y;

    public FinalFieldExample(int x, int y) {
        this.x = x;  // final域必须在构造函数中初始化
        this.y = y;
    }

    // 引用对象:final引用必须保证对象不逃逸
    private final Reference ref;
    private final int value;

    public FinalFieldExample() {
        this.ref = new Reference();
        this.value = 42;  // 不可见性危险!
        // ref和value的初始化可能重排序
    }

    // 正确做法:不要在构造函数中让this引用逃逸
    public FinalFieldExample safe() {
        FinalFieldExample instance = new FinalFieldExample();
        safePublish(instance);
        return instance;
    }
}

3. 内存屏障

// JVM会插入内存屏障来禁止重排序

// 1. StoreStore屏障:禁止store1重排序到store2
volatile x = 1;
// StoreStore屏障
y = 2;

// 2. StoreLoad屏障:禁止store重排序到load
volatile x = 1;
// StoreLoad屏障(最昂贵)
int a = y;

// 3. LoadLoad屏障:禁止load1重排序到load2
int a = x;
// LoadLoad屏障
int b = y;

// 4. LoadStore屏障:禁止load重排序到store
int a = x;
// LoadStore屏障
y = 1;

4. 双重检查锁(DCL)

// 正确的DCL实现
public class Singleton {
    private static volatile Singleton instance;  // 必须用volatile

    private Singleton() {}

    public static Singleton getInstance() {
        if (instance == null) {  // 第一次检查
            synchronized (Singleton.class) {
                if (instance == null) {  // 第二次检查
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }

    /**
     * 为什么需要volatile?
     * 1. new Singleton()分为三步:
     *    a. 分配内存
     *    b. 初始化对象
     *    c. 引用指向内存
     * 2. 可能重排序为: a -> c -> b
     * 3. 线程1执行a->c,线程2看到instance!=null,拿到未初始化对象
     * 4. volatile禁止重排序,保证b在c之前执行
     */
}

// Java 5+可以使用内部类Holder(推荐)
public class Singleton {
    private Singleton() {}

    private static class Holder {
        static final Singleton INSTANCE = new Singleton();
    }

    public static Singleton getInstance() {
        return Holder.INSTANCE;
    }
}

异步编程

1. CompletableFuture深度使用

public class AsyncProgramming {

    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    // 1. 链式调用
    public CompletableFuture<String> chainExample() {
        return CompletableFuture.supplyAsync(() -> fetchUser(), executor)
            .thenApply(user -> transformUser(user))      // 同步转换
            .thenCompose(user -> fetchOrders(user))       // 异步转换
            .thenApply(orders -> formatResult(orders))
            .whenComplete((result, ex) -> {               // 完成回调
                if (ex != null) {
                    log.error("Error", ex);
                } else {
                    log.info("Success: {}", result);
                }
            });
    }

    // 2. 并行执行
    public CompletableFuture<Result> parallelExample() {
        CompletableFuture<A> futureA = CompletableFuture.supplyAsync(() -> doA());
        CompletableFuture<B> futureB = CompletableFuture.supplyAsync(() -> doB());
        CompletableFuture<C> futureC = CompletableFuture.supplyAsync(() -> doC());

        return CompletableFuture.allOf(futureA, futureB, futureC)
            .thenApply(v -> combine(
                futureA.join(),
                futureB.join(),
                futureC.join()
            ));
    }

    // 3. 超时控制
    public CompletableFuture<String> withTimeout(CompletableFuture<String> future,
                                                  long timeout, TimeUnit unit) {
        CompletableFuture<String> result = new CompletableFuture<>();

        future.whenComplete((value, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
            } else {
                result.complete(value);
            }
        });

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(() -> {
            result.completeExceptionally(new TimeoutException());
        }, timeout, unit);

        return result;
    }

    // 4. 重试机制
    public CompletableFuture<String> withRetry(Supplier<String> task,
                                               int maxRetries) {
        CompletableFuture<String> future = new CompletableFuture<>();
        retry(task, 0, maxRetries, future);
        return future;
    }

    private void retry(Supplier<String> task, int attempt, int maxRetries,
                      CompletableFuture<String> future) {
        CompletableFuture.supplyAsync(task)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    future.complete(result);
                } else if (attempt < maxRetries) {
                    retry(task, attempt + 1, maxRetries, future);
                } else {
                    future.completeExceptionally(ex);
                }
            });
    }

    // 5. 批量处理
    public CompletableFuture<Void> batchProcess(List<String> items,
                                                  Function<String, String> processor) {
        List<CompletableFuture<String>> futures = items.stream()
            .map(item -> CompletableFuture.supplyAsync(() -> processor.apply(item), executor))
            .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    // 6. 节流(限流)
    public CompletableFuture<String> throttledExecute(Runnable task,
                                                       RateLimiter limiter) {
        if (limiter.tryAcquire()) {
            return CompletableFuture.runAsync(task, executor)
                .thenApply(v -> "success");
        } else {
            return CompletableFuture.failedFuture(
                new RateLimitExceededException()
            );
        }
    }

    // 7. 断路器
    public CompletableFuture<String> circuitBreakerExecute(
            Supplier<String> supplier, CircuitBreaker cb) {
        if (cb.isOpen()) {
            return CompletableFuture.failedFuture(
                new CircuitBreakerOpenException()
            );
        }

        return CompletableFuture.supplyAsync(supplier, executor)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    cb.recordFailure();
                } else {
                    cb.recordSuccess();
                }
            });
    }
}

2. 响应式编程(Reactor)

// 使用Project Reactor
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveExample {

    // 1. 创建Mono
    Mono<String> mono = Mono.just("Hello");
    Mono<String> empty = Mono.empty();
    Mono<String> error = Mono.error(new RuntimeException());

    // 2. 创建Flux
    Flux<String> flux = Flux.just("a", "b", "c");
    Flux<Integer> range = Flux.range(1, 10);
    Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));

    // 3. 操作符
    Flux<String> processed = flux
        .filter(s -> s.startsWith("a"))  // 过滤
        .map(String::toUpperCase)        // 转换
        .flatMap(this::asyncOperation)     // 异步转换
        .distinct()                        // 去重
        .take(5)                           // 限制数量
        .timeout(Duration.ofSeconds(5));   // 超时

    // 4. 订阅
    processed.subscribe(
        value -> System.out.println(value),  // onNext
        error -> error.printStackTrace(),    // onError
        () -> System.out.println("Done")    // onComplete
    );

    // 5. 背压
    Flux<Integer> source = Flux.range(1, 1000);
    source.onBackpressureBuffer(100)  // 缓冲100个
          .onBackpressureDrop()       // 丢弃多余的
          .onBackpressureLatest()     // 只保留最新的
          .subscribe(value -> process(value));

    // 6. 调度器(线程切换)
    Flux.just(1, 2, 3)
        .subscribeOn(Schedulers.parallel())  // 订阅在并行线程
        .publishOn(Schedulers.single())      // 发布在单线程
        .subscribe(System.out::println);
}

性能优化

1. JVM调优

# JVM参数参考

# 1. 内存设置
-Xms4g                          # 初始堆大小
-Xmx4g                          # 最大堆大小(建议Xms=Xmx)
-Xmn1g                          # 新生代大小
-XX:NewRatio=2                  # 新生代/老年代比例
-XX:SurvivorRatio=8             # Eden/Survivor比例

# 2. GC选择
-XX:+UseG1GC                    # 使用G1GC(推荐)
-XX:MaxGCPauseMillis=200        # 最大GC停顿时间
-XX:G1HeapRegionSize=16m        # G1 Region大小

# 或使用ZGC(Java 15+)
-XX:+UseZGC
# 或使用ShenandoahGC
-XX:+UseShenandoahGC

# 3. 线程相关
-XX:+UseBiasedLocking           # 偏向锁(默认)
-XX:AutoBoxCacheMax=20000        # Integer缓存范围

# 4. JIT编译
-XX:+TieredCompilation           # 分层编译(默认)
-XX:CompileThreshold=1000        # C1编译阈值
-XX:CICompilerCount=4            # 编译线程数

# 5. 其他优化
-XX:+UseStringDeduplication      # 字符串去重
-XX:+UseCompressedOops           # 压缩指针(64位)
-XX:+UseCompressedClassPointers  # 压缩类指针
-Djava.awt.headless=true         # 无头模式

2. 性能分析工具

# 1. Jprofiler
java -agentpath:/path/libjprofilerti.so=port=8849 -jar app.jar

# 2. VisualVM
jvisualvm

# 3. Arthas(阿里开源,推荐)
# 下载
wget https://arthas.aliyun.com/arthas-boot.jar
# 启动
java -jar arthas-boot.jar
# 常用命令
trace com.example.Service method      # 追踪方法
watch com.example.Service method returnObj  # 监控返回值
thread                               # 查看线程
dump                                 # dump堆
sysprop                              # 查看系统属性

# 4. JFR(JDK自带)
# 开始录制
jcmd <pid> JFR.start name=recording duration=60s filename=recording.jfr
# 分析
jfr print --events gc.heap.summary recording.jfr

# 5. Async-profiler(火焰图)
# Linux下
./profiler.sh -d 30 -f flame.svg <pid>

3. 性能优化清单

public class PerformanceOptimization {

    // 1. 避免频繁创建对象
    public class ObjectPool {
        private final Queue<MyObject> pool = new ConcurrentLinkedQueue<>();

        public MyObject borrow() {
            MyObject obj = pool.poll();
            return obj != null ? obj : new MyObject();
        }

        public void returnObject(MyObject obj) {
            obj.reset();  // 重置对象状态
            pool.offer(obj);
        }
    }

    // 2. 使用基本类型替代包装类
    // Bad
    List<Integer> list = new ArrayList<>();
    // Good
    int[] array = new int[100];

    // 3. 预分配集合大小
    // Bad
    List<String> list = new ArrayList<>();  // 默认10
    // Good
    List<String> list = new ArrayList<>(1000);

    // 4. 使用StringBuilder拼接字符串
    // Bad
    String result = "";
    for (String s : strings) {
        result += s;  // 每次创建新对象
    }
    // Good
    StringBuilder sb = new StringBuilder();
    for (String s : strings) {
        sb.append(s);
    }
    String result = sb.toString();

    // 5. 减少方法调用
    // Bad
    public int sum(int[] arr) {
        int sum = 0;
        for (int i = 0; i < arr.length; i++) {
            sum += arr[i];
        }
        return sum;
    }
    // Good(循环展开)
    public int sumOptimized(int[] arr) {
        int sum = 0;
        int i = 0;
        for (; i <= arr.length - 4; i += 4) {
            sum += arr[i] + arr[i + 1] + arr[i + 2] + arr[i + 3];
        }
        for (; i < arr.length; i++) {
            sum += arr[i];
        }
        return sum;
    }

    // 6. 使用缓存
    private static final Map<String, Integer> CACHE = new ConcurrentHashMap<>();

    public Integer compute(String key) {
        return CACHE.computeIfAbsent(key, this::expensiveComputation);
    }

    // 7. 批量操作
    // Bad
    for (String id : ids) {
        db.update(id);  // N次IO
    }
    // Good
    db.batchUpdate(ids);  // 1次IO

    // 8. 使用无锁数据结构
    // Bad
    private int counter;
    public void increment() {
        synchronized (this) {
            counter++;
        }
    }
    // Good
    private AtomicInteger counter = new AtomicInteger();
    public void increment() {
        counter.incrementAndGet();
    }

    // 9. 异步非阻塞
    // Bad
    public void process() {
        String result1 = remoteCall1();  // 阻塞
        String result2 = remoteCall2();  // 阻塞
        process(result1, result2);
    }
    // Good
    public CompletableFuture<Void> processAsync() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(this::remoteCall1);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::remoteCall2);
        return CompletableFuture.allOf(f1, f2)
            .thenRun(() -> process(f1.join(), f2.join()));
    }

    // 10. 连接池
    // Bad
    public void doQuery() {
        Connection conn = DriverManager.getConnection(url);  // 每次新建
        // ...
        conn.close();
    }
    // Good
    private final DataSource dataSource = createDataSource();

    public void doQuery() {
        try (Connection conn = dataSource.getConnection()) {
            // ...
        }
    }
}

分布式高并发

1. 分布式锁

// Redis分布式锁(Redisson)
import org.redisson.Redisson;
import org.redisson.api.RLock;

public class DistributedLockExample {
    private final RedissonClient redisson = Redisson.create(config);

    public void doWithLock() {
        RLock lock = redisson.getLock("myLock");

        try {
            // 尝试获取锁,最多等待10秒,锁30秒后自动释放
            boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);

            if (acquired) {
                try {
                    // 执行业务逻辑
                    doWork();
                } finally {
                    lock.unlock();
                }
            } else {
                // 获取锁失败
                handleFailure();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // 读写锁
    public void readWriteLock() {
        RReadWriteLock rwLock = redisson.getReadWriteLock("rwLock");
        RLock rLock = rwLock.readLock();
        RLock wLock = rwLock.writeLock();

        // 读锁
        rLock.lock();
        try {
            // 读操作
        } finally {
            rLock.unlock();
        }

        // 写锁
        wLock.lock();
        try {
            // 写操作
        } finally {
            wLock.unlock();
        }
    }
}

2. 分布式缓存

// Spring Cache + Redis
@Service
public class CacheService {

    @Cacheable(value = "user", key = "#id")
    public User getUser(Long id) {
        return userRepository.findById(id);
    }

    @CachePut(value = "user", key = "#user.id")
    public User updateUser(User user) {
        return userRepository.save(user);
    }

    @CacheEvict(value = "user", key = "#id")
    public void deleteUser(Long id) {
        userRepository.deleteById(id);
    }

    // 批量清除
    @CacheEvict(value = "user", allEntries = true)
    public void clearCache() {}
}

// Redis配置
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // Key序列化
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());

        // Value序列化
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        template.setValueSerializer(serializer);
        template.setHashValueSerializer(serializer);

        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofMinutes(30))  // 30分钟过期
            .serializeKeysWith(RedisSerializationContext.SerializationPair
                .fromSerializer(new StringRedisSerializer()))
            .serializeValuesWith(RedisSerializationContext.SerializationPair
                .fromSerializer(new GenericJackson2JsonRedisSerializer()));

        return RedisCacheManager.builder(factory)
            .cacheDefaults(config)
            .build();
    }
}

3. 消息队列

// Kafka生产者
@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String key, String message) {
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(
            topic, key, message
        );

        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("发送失败", ex);
            } else {
                log.info("发送成功: offset={}", result.getRecordMetadata().offset());
            }
        });
    }

    // 批量发送
    public void sendBatch(String topic, List<String> messages) {
        for (String message : messages) {
            kafkaTemplate.send(topic, message);
        }
        kafkaTemplate.flush();
    }
}

// Kafka消费者
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        try {
            processMessage(message);
        } catch (Exception e) {
            log.error("处理失败", e);
            throw e;  // 抛出异常让Kafka重试
        }
    }

    // 批量消费
    @KafkaListener(
        topics = "my-topic",
        groupId = "my-group",
        batch = "true"
    )
    public void consumeBatch(List<String> messages) {
        for (String message : messages) {
            processMessage(message);
        }
    }
}

4. 限流

// Guava RateLimiter
public class RateLimiterExample {
    private final RateLimiter rateLimiter = RateLimiter.create(100.0);  // 100 QPS

    public void execute() {
        if (rateLimiter.tryAcquire()) {
            doWork();
        } else {
            handleRateLimit();
        }
    }
}

// 滑动窗口限流
public class SlidingWindowRateLimiter {
    private final ConcurrentNavigableMap<Long, AtomicInteger> window = new ConcurrentSkipListMap<>();
    private final int maxRequests;
    private final long windowSizeMillis;

    public SlidingWindowRateLimiter(int maxRequests, long windowSizeMillis) {
        this.maxRequests = maxRequests;
        this.windowSizeMillis = windowSizeMillis;
    }

    public boolean tryAcquire() {
        long now = System.currentTimeMillis();
        long start = now - windowSizeMillis;

        // 清理过期数据
        window.headMap(start, false).clear();

        // 检查是否超限
        if (window.size() >= maxRequests) {
            return false;
        }

        // 记录当前请求
        window.compute(now, (k, v) -> v == null ? new AtomicInteger(1) : new AtomicInteger(v.incrementAndGet()));
        return true;
    }
}

// Redis + Lua实现分布式限流
public class RedisRateLimiter {
    private final StringRedisTemplate redisTemplate;
    private static final String LUA_SCRIPT =
        "local key = KEYS[1]\n" +
        "local limit = tonumber(ARGV[1])\n" +
        "local expire = tonumber(ARGV[2])\n" +
        "local current = redis.call('incr', key)\n" +
        "if current == 1 then\n" +
        "    redis.call('expire', key, expire)\n" +
        "end\n" +
        "if current > limit then\n" +
        "    return 0\n" +
        "else\n" +
        "    return 1\n" +
        "end";

    public boolean tryAcquire(String key, int limit, long expireSeconds) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
        Long result = redisTemplate.execute(script,
            Collections.singletonList(key),
            String.valueOf(limit),
            String.valueOf(expireSeconds)
        );
        return result != null && result == 1;
    }
}

实战案例

案例1:高并发计数器

public class HighPerformanceCounter {
    // 方案1:LongAdder(高并发写场景)
    private final LongAdder counter = new LongAdder();

    public void increment() {
        counter.increment();
    }

    public long get() {
        return counter.sum();
    }

    // 方案2:分段计数(避免热点竞争)
    private final int segments = 16;
    private final LongAdder[] counters = new LongAdder[segments];

    public SegmentedCounter() {
        for (int i = 0; i < segments; i++) {
            counters[i] = new LongAdder();
        }
    }

    public void increment() {
        int index = Thread.currentThread().hashCode() & (segments - 1);
        counters[index].increment();
    }

    public long get() {
        long sum = 0;
        for (LongAdder counter : counters) {
            sum += counter.sum();
        }
        return sum;
    }

    // 方案3:Redis计数器(分布式场景)
    public class RedisCounter {
        private final StringRedisTemplate redisTemplate;

        public long increment(String key) {
            return redisTemplate.opsForValue().increment(key);
        }

        public long get(String key) {
            String value = redisTemplate.opsForValue().get(key);
            return value != null ? Long.parseLong(value) : 0;
        }
    }
}

案例2:高并发缓存

public class HighPerformanceCache<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    private final Duration expireAfter;

    public HighPerformanceCache(Function<K, V> loader, Duration expireAfter) {
        this.loader = loader;
        this.expireAfter = expireAfter;
    }

    public V get(K key) {
        CompletableFuture<V> future = cache.get(key);

        if (future == null) {
            CompletableFuture<V> newFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    return loader.apply(key);
                } catch (Exception e) {
                    cache.remove(key, newFuture);
                    throw e;
                }
            });

            future = cache.putIfAbsent(key, newFuture);
            if (future == null) {
                future = newFuture;
            }
        }

        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    // 使用示例
    public static void main(String[] args) {
        HighPerformanceCache<String, User> userCache = new HighPerformanceCache<>(
            userId -> userRepository.findById(userId),  // 加载函数
            Duration.ofMinutes(10)  // 过期时间
        );

        User user = userCache.get("user-123");
    }
}

案例3:高并发限流器

public class AdvancedRateLimiter {
    private final int qps;
    private final Semaphore semaphore;
    private final AtomicInteger currentRequests = new AtomicInteger(0);
    private final ScheduledExecutorService scheduler;

    public AdvancedRateLimiter(int qps) {
        this.qps = qps;
        this.semaphore = new Semaphore(qps);
        this.scheduler = Executors.newScheduledThreadPool(1);

        // scheduler.scheduleAtFixedRate(this::reset, 1, 1, TimeUnit.SECONDS);
    }

    public boolean tryAcquire() {
        if (semaphore.tryAcquire()) {
            currentRequests.incrementAndGet();
            return true;
        }
        return false;
    }

    public void acquire() throws InterruptedException {
        semaphore.acquire();
        currentRequests.incrementAndGet();
    }

    public void release() {
        semaphore.release();
        currentRequests.decrementAndGet();
    }

    public int getCurrentRequests() {
        return currentRequests.get();
    }

    // Token Bucket算法
    public static class TokenBucket {
        private final long capacity;
        private final long refillRate;  // tokens per second
        private long tokens;
        private long lastRefillTime;

        public TokenBucket(long capacity, long refillRate) {
            this.capacity = capacity;
            this.refillRate = refillRate;
            this.tokens = capacity;
            this.lastRefillTime = System.currentTimeMillis();
        }

        public synchronized boolean tryConsume(int tokens) {
            refill();
            if (this.tokens >= tokens) {
                this.tokens -= tokens;
                return true;
            }
            return false;
        }

        private void refill() {
            long now = System.currentTimeMillis();
            long elapsed = (now - lastRefillTime) / 1000;
            if (elapsed > 0) {
                long newTokens = Math.min(capacity, this.tokens + elapsed * refillRate);
                this.tokens = newTokens;
                lastRefillTime = now;
            }
        }
    }
}

案例4:高并发秒杀

@Service
public class SeckillService {
    private final RedissonClient redisson;
    private final StringRedisTemplate redisTemplate;
    private final OrderService orderService;

    /**
     * 秒杀流程:
     * 1. 预热:将库存加载到Redis
     * 2. 校验:检查用户是否已购买、活动是否有效
     * 3. 扣减:Redis原子扣减库存
     * 4. 下单:异步创建订单
     * 5. 回滚:如果下单失败,恢复库存
     */
    @Transactional
    public SeckillResult seckill(Long userId, Long productId) {
        String stockKey = "seckill:stock:" + productId;
        String userKey = "seckill:user:" + userId + ":" + productId;
        String lockKey = "seckill:lock:" + productId;

        // 1. 检查用户是否已购买
        if (Boolean.TRUE.equals(redisTemplate.hasKey(userKey))) {
            return SeckillResult.failure("已购买");
        }

        // 2. 分布式锁
        RLock lock = redisson.getLock(lockKey);
        boolean locked = false;
        try {
            locked = lock.tryLock(100, TimeUnit.MILLISECONDS);
            if (!locked) {
                return SeckillResult.failure("系统繁忙");
            }

            // 3. 扣减库存
            Long stock = redisTemplate.opsForValue().decrement(stockKey);
            if (stock == null || stock < 0) {
                // 恢复库存
                redisTemplate.opsForValue().increment(stockKey);
                return SeckillResult.failure("库存不足");
            }

            // 4. 标记用户已购买
            redisTemplate.opsForValue().set(userKey, "1", Duration.ofDays(1));

            // 5. 异步下单
            CompletableFuture.runAsync(() -> {
                try {
                    orderService.createOrder(userId, productId);
                } catch (Exception e) {
                    log.error("下单失败,回滚库存", e);
                    redisTemplate.opsForValue().increment(stockKey);
                    redisTemplate.delete(userKey);
                }
            });

            return SeckillResult.success();

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return SeckillResult.failure("系统繁忙");
        } finally {
            if (locked) {
                lock.unlock();
            }
        }
    }

    /**
     * 预热库存
     */
    public void warmUp(Long productId, int stock) {
        String stockKey = "seckill:stock:" + productId;
        redisTemplate.opsForValue().set(stockKey, String.valueOf(stock));
    }

    /**
     * 本地库存缓存 + Redis最终一致性
     */
    public static class LocalCacheSeckillService {
        private final Map<Long, AtomicInteger> localStock = new ConcurrentHashMap<>();
        private final StringRedisTemplate redisTemplate;
        private final AsyncStockUpdater asyncUpdater;

        public SeckillResult seckillWithLocalCache(Long productId) {
            AtomicInteger stock = localStock.get(productId);

            // 先扣本地库存
            if (stock == null || stock.decrementAndGet() < 0) {
                return SeckillResult.failure("库存不足");
            }

            try {
                // 异步更新Redis
                asyncUpdater.decrementAsync(productId);

                // 创建订单
                // ...

                return SeckillResult.success();

            } catch (Exception e) {
                // 失败恢复本地库存
                stock.incrementAndGet();
                return SeckillResult.failure("系统繁忙");
            }
        }
    }
}

面试题库

基础题

Q1: 线程和进程的区别?

  • 进程是资源分配的基本单位,线程是CPU调度的基本单位
  • 进程间独立,线程共享进程资源
  • 进程间通信复杂,线程间通信简单
  • 线程上下文切换开销小

Q2: synchronized和ReentrantLock的区别?

特性 synchronized ReentrantLock
实现 JVM层面 API层面
可中断 ×
公平锁 ×
多条件 ×
手动释放 自动 必须手动

Q3: volatile的作用?

  • 保证可见性:写操作立即刷新到主内存
  • 保证有序性:禁止指令重排序
  • 不保证原子性:复合操作不安全

Q4: 线程池参数含义?

  • corePoolSize:核心线程数(常驻)
  • maximumPoolSize:最大线程数
  • keepAliveTime:空闲线程存活时间
  • workQueue:工作队列
  • threadFactory:线程工厂
  • handler:拒绝策略

Q5: HashMap和ConcurrentHashMap的区别?

  • HashMap非线程安全,ConcurrentHashMap线程安全
  • Java 7:ConcurrentHashMap分段锁(16段)
  • Java 8+:ConcurrentHashMap使用CAS+synchronized,锁粒度到Node级别

进阶题

Q6: AQS原理?

AQS(AbstractQueuedSynchronizer)核心:
1. state变量(volatile int):表示同步状态
2. CLH队列:等待线程队列
3. 独占/共享模式
4. acquire/release:获取/释放状态

模板方法模式:
- tryAcquire:尝试获取(子类实现)
- addWaiter:加入等待队列
- acquireQueued:自旋等待

Q7: 线程池工作流程?

  1. 提交任务
  2. 核心线程数未满 → 创建核心线程
  3. 核心线程数已满,队列未满 → 放入队列
  4. 队列已满,最大线程数未满 → 创建非核心线程
  5. 最大线程数已满 → 执行拒绝策略

Q8: CAS和ABA问题?

  • CAS(Compare-And-Swap):原子比较并交换
  • ABA问题:A→B→A,CAS认为没变化
  • 解决:加版本号(AtomicStampedReference)

Q9: Java内存模型happens-before规则?

  1. 程序顺序规则
  2. 监视器锁规则
  3. volatile规则
  4. 线程启动/终止规则
  5. 传递性

Q10: ThreadLocal原理?

  • 每个线程维护ThreadLocalMap
  • ThreadLocalMap的key是ThreadLocal(弱引用)
  • value是存储值(强引用)
  • 内存泄漏:key为null时value无法回收

高频题

Q11: 如何实现一个线程安全的单例?

// 方式1:双重检查锁
public class Singleton {
    private static volatile Singleton instance;

    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

// 方式2:静态内部类(推荐)
public class Singleton {
    private Singleton() {}

    private static class Holder {
        static final Singleton INSTANCE = new Singleton();
    }

    public static Singleton getInstance() {
        return Holder.INSTANCE;
    }
}

// 方式3:枚举(最佳)
public enum Singleton {
    INSTANCE;

    public void doWork() {}
}

Q12: 高并发场景如何优化?

  1. 缓存:多级缓存(本地+Redis)
  2. 异步:消息队列、CompletableFuture
  3. 无锁:CAS、Atomic类
  4. 连接池:数据库、HTTP、Redis
  5. 限流:令牌桶、滑动窗口
  6. 降级:熔断、兜底数据
Q13: 分布式事务解决方案? 方案 优点 缺点
2PC 强一致性 性能差,单点故障
TCC 性能好 实现复杂
Saga 灵活 补偿逻辑复杂
Seata 使用简单 性能相对低

Q14: 如何解决热点Key问题?

  1. 缓存分片:均匀分散到不同Redis实例
  2. 本地缓存:减轻Redis压力
  3. 互斥锁:防止缓存击穿
  4. 布隆过滤器:快速判断是否存在

Q15: 消息队列如何保证消息不丢失?
生产者:

  • 使用confirm机制
  • 消息持久化

Broker:

  • 消息持久化
  • 多副本同步

消费者:

  • 手动ACK
  • 死信队列

学习路线

第一阶段:基础(2周)

┌─────────────────────────────────────────┐
│ 第1周:并发基础                          │
├─────────────────────────────────────────┤
│ □ 线程创建与生命周期                    │
│ □ wait/notify/notifyAll                 │
│ □ synchronized关键字                    │
│ □ volatile关键字                        │
│ □ ThreadLocal                           │
└─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│ 第2周:并发工具                          │
├─────────────────────────────────────────┤
│ □ AtomicInteger等Atomic类               │
│ □ CountDownLatch                        │
│ □ CyclicBarrier                         │
│ □ Semaphore                             │
│ □ Exchanger                             │
└─────────────────────────────────────────┘

第二阶段:进阶(3周)

┌─────────────────────────────────────────┐
│ 第3-4周:锁与并发集合                    │
├─────────────────────────────────────────┤
│ □ ReentrantLock详细使用                 │
│ □ ReadWriteLock                         │
│ □ StampedLock                           │
│ □ ConcurrentHashMap                     │
│ □ ConcurrentLinkedQueue                 │
│ □ CopyOnWriteArrayList                  │
└─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│ 第5周:线程池与异步                      │
├─────────────────────────────────────────┤
│ □ ThreadPoolExecutor参数详解            │
│ □ ForkJoinPool                         │
│ □ CompletableFuture                     │
│ □ JMM内存模型                           │
└─────────────────────────────────────────┘

第三阶段:高级(4周)

┌─────────────────────────────────────────┐
│ 第6-7周:分布式并发                      │
├─────────────────────────────────────────┤
│ □ Redis分布式锁                         │
│ □ Redis缓存                             │
│ □ 消息队列(Kafka/RabbitMQ)            │
│ □ 分布式限流                            │
│ □ 分布式事务                            │
└─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│ 第8-9周:性能优化                        │
├─────────────────────────────────────────┤
│ □ JVM调优                               │
│ □ 性能分析工具                          │
│ □ 无锁编程                              │
│ □ 响应式编程                            │
│ □ 高并发架构设计                        │
└─────────────────────────────────────────┘

第四阶段:实战(持续)

┌─────────────────────────────────────────┐
│ 实践项目                                │
├─────────────────────────────────────────┤
│ □ 实现一个高性能计数器                 │
│ □ 实现一个分布式缓存                   │
│ □ 实现一个限流器                        │
│ □ 实现一个秒杀系统                      │
│ □ 阅读开源源码(Netty、Disruptor等)   │
└─────────────────────────────────────────┘

推荐资源

书籍

  1. 《Java并发编程实战》- 必读
  2. 《Java并发编程的艺术》
  3. 《深入理解Java虚拟机》
  4. 《凤凰架构:构建可靠的大型分布式系统》

文档

  1. Java并发官方文档
  2. Reactor官方文档

源码

  1. Doug Lea的并发库
  2. Netty
  3. Disruptor
  4. Redisson

工具

  1. JConsole / VisualVM
  2. Arthas
  3. JProfiler
  4. Async-profiler

文档版本: 1.0
最后更新: 2026-01-19

close
arrow_upward