Java的线程是Java并发编程的核心,允许程序同时执行多个任务。 [虚拟线程相关内容]

1. 线程概述

线程(Thread)是操作系统能够进行运算调度的最小单位。在Java中,线程是JVM(Java虚拟机)管理的轻量级执行单元,多个线程共享同一进程的内存空间(如堆、方法区),但每个线程有自己的栈空间(如局部变量和方法调用栈)。

Java通过java.lang.Thread类和java.lang.Runnable接口提供线程支持。Java的线程模型基于操作系统的原生线程,依赖于底层操作系统的线程实现。

2. 线程的创建

Java中创建线程主要有以下几种方式:

(1) 继承Thread

通过继承Thread类并重写run()方法来定义线程的执行逻辑。

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread running: " + Thread.currentThread().getName());
    }
}

public class Main {
    public static void main(String[] args) {
        MyThread thread = new MyThread();
        thread.start(); // 启动线程
    }
}
  • 特点: 简单,但不推荐,因为Java是单继承的,继承Thread会限制类的扩展性。

(2) 实现Runnable接口

通过实现Runnable接口并将其传递给Thread对象。

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Thread running: " + Thread.currentThread().getName());
    }
}

public class Main {
    public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.start();
    }
}
  • 优点: 更灵活,推荐使用,因为它允许类继承其他类,且便于资源共享。

(3) 使用CallableFuture

Callable接口允许线程返回结果或抛出异常,通常与ExecutorService结合使用。

import java.util.concurrent.*;

class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Task completed by " + Thread.currentThread().getName();
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<String> future = executor.submit(new MyCallable());
        System.out.println(future.get()); // 阻塞获取结果
        executor.shutdown();
    }
}
  • 特点: 适合需要返回值或异常处理的场景,依赖线程池管理。

(4) 使用线程池(Executor Framework)

Java的java.util.concurrent包提供了线程池机制,避免频繁创建和销毁线程的开销。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> System.out.println("Task in thread pool"));
        executor.shutdown();
    }
}
  • 常用线程池:
    • Executors.newFixedThreadPool(int nThreads)

    • Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

      • 作用:固定大小的线程池,核心线程数=最大线程数=nThreads,任务进入无界队列(LinkedBlockingQueue) 排队。
      • 参数:
        • nThreads:工作线程的固定数量。过小吞吐不足,过大将增加上下文切换与内存占用。
        • threadFactory:可选,自定义线程名称、是否为守护线程、异常处理器等,便于排障与治理。
      • 返回类型:ExecutorService(实为 ThreadPoolExecutor)。
      • 注意:使用无界队列,峰值流量可能导致任务无限堆积;生产建议改用自定义 ThreadPoolExecutor + 有界队列以形成背压。
    • Executors.newCachedThreadPool()

    • Executors.newCachedThreadPool(ThreadFactory threadFactory)

      • 作用:弹性伸缩的线程池。核心线程数=0,最大线程数=Integer.MAX_VALUE,空闲线程默认 60s 回收,使用 SynchronousQueue 直接移交任务。
      • 参数:
        • threadFactory:可选,自定义线程的创建策略与命名等。
      • 返回类型:ExecutorService(实为 ThreadPoolExecutor)。
      • 注意:在突发流量下可能创建大量线程,存在资源枯竭风险;需配合上游限流/信号量,或优先使用自定义可控的 ThreadPoolExecutor。
    • Executors.newSingleThreadExecutor()

    • Executors.newSingleThreadExecutor(ThreadFactory threadFactory)

      • 作用:单线程串行执行任务,保证任务按提交顺序(FIFO)执行;若线程异常退出会创建新线程接替。
      • 参数:
        • threadFactory:可选,自定义线程特性。
      • 返回类型:ExecutorService(实为 ThreadPoolExecutor,核心=最大=1,队列为无界 LinkedBlockingQueue)。
      • 注意:无界队列可能堆积任务导致延迟放大;不适合高吞吐或长阻塞任务。
    • Executors.newScheduledThreadPool(int corePoolSize)

    • Executors.newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

      • 作用:支持定时/周期性任务调度;内部为 ScheduledThreadPoolExecutor,使用基于时间的延迟队列。
      • 参数:
        • corePoolSize:核心线程数。用于并行触发定时/周期任务;核心线程默认不回收。
        • threadFactory:可选,自定义调度线程特性;建议命名以区分业务线程。
      • 返回类型:ScheduledExecutorService(实为 ScheduledThreadPoolExecutor)。
      • 注意:定时任务应短小无阻塞;长任务应拆分或使用独立执行器,避免挡住调度线程。

4.1 为什么尽量自定义 ThreadPoolExecutor

  • Executors 快捷工厂的隐患:
    • newFixedThreadPool 使用无界队列(LinkedBlockingQueue),高峰期任务堆积可能 OOM。
    • newCachedThreadPool 最大线程数 Integer.MAX_VALUE,极端情况下可能创建过多线程导致资源枯竭。
    • newSingleThreadExecutor 单线程串行执行,后台堆积同样可能放大延迟。
  • 建议:使用 ThreadPoolExecutor 显式配置核心参数与队列边界,结合拒绝策略形成背压。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PooledExecutors {
    // 命名线程工厂,便于排障
    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger idx = new AtomicInteger(1);
        NamedThreadFactory(String prefix) { this.prefix = prefix; }
        @Override public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + idx.getAndIncrement());
            t.setDaemon(false);
            t.setUncaughtExceptionHandler((thr, ex) ->
                System.err.println(thr.getName() + " uncaught: " + ex.getMessage()));
            return t;
        }
    }

    public static ExecutorService buildBoundedPool() {
        int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
        int max = cores * 2 + 1;
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1024); // 有界队列形成背压
        ThreadFactory tf = new NamedThreadFactory("biz-pool");
        RejectedExecutionHandler reject = new ThreadPoolExecutor.CallerRunsPolicy(); // 调用方回退,天然限流
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                cores, max,
                30L, TimeUnit.SECONDS,
                queue, tf, reject);
        pool.allowCoreThreadTimeOut(true); // 允许核心线程超时回收
        return pool;
    }
}

要点:

  • 使用有界队列(ArrayBlockingQueue/LinkedBlockingQueue(带容量))以避免无限堆积。
  • 选择合适拒绝策略:
    • AbortPolicy(默认,抛异常);CallerRunsPolicy(调用方执行,背压);
    • DiscardPolicy(丢弃);DiscardOldestPolicy(丢最老)。
  • 根据任务类型设置线程上限与存活时间:
    • IO 密集:线程数可高于 CPU 核心,但要观测上下文切换与资源句柄用量;
    • CPU 密集:线程数≈核心数,避免过度切换。
4.1.1 ThreadPoolExecutor 构造参数全解

构造重载签名:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

参数说明与取舍:

  • corePoolSize
    • 基线线程数。小于该值时有新任务会优先创建新线程(即便队列未满)。
    • allowCoreThreadTimeOut(true) 后,核心线程也会在空闲 keepAliveTime 后退出。
  • maximumPoolSize
    • 线程上限。队列无法接收时(如有界队列满或 SynchronousQueue 直接移交失败),且当前线程数小于该上限时会创建新线程执行。
    • 若使用无界队列(默认 LinkedBlockingQueue 无容量),maximumPoolSize 实际无效,线程数基本固定在 corePoolSize。
  • keepAliveTime + unit
    • 非核心线程空闲存活时间;开启 allowCoreThreadTimeOut(true) 后也作用于核心线程。
    • 过短会频繁创建/销毁,过长空闲线程占用内存与栈资源。
  • workQueue
    • 任务排队容器,决定伸缩与拒绝行为的关键:
      • SynchronousQueue:零容量“直接移交”;更易扩容到 maximumPoolSize,适合短任务+高并发但需强限流。
      • ArrayBlockingQueue(cap):有界数组;可结合 CallerRunsPolicy 形成自然背压,容量宜与突发窗口匹配。
      • LinkedBlockingQueue([cap]):链表;不传 cap 时近似无界(不建议生产使用);传 cap 则为有界。
      • PriorityBlockingQueue:按优先级出队;注意任务需实现 Comparable。
  • threadFactory
    • 自定义线程名、优先级、守护属性与 UncaughtExceptionHandler;建议按“业务-用途-编号”命名,便于观测/排障。
  • handler(RejectedExecutionHandler)
    • 当“线程数已达 maximum 且队列无法再接收”或执行器已 shutdown 时触发:
      • AbortPolicy:抛 RejectedExecutionException,显式失败(默认)。
      • CallerRunsPolicy:在提交线程中执行任务,实现背压。
      • DiscardPolicy:直接丢弃任务。
      • DiscardOldestPolicy:丢弃队列头最老任务,再尝试入队当前任务。
    • 可自定义告警/打点/降级逻辑。

任务接纳流程(简化):

  1. 若运行线程数 < corePoolSize → 新建线程执行;
  2. 否则尝试入队 workQueue → 成功则排队;
  3. 入队失败且运行线程数 < maximumPoolSize → 新建线程执行;
  4. 否则触发拒绝策略 handler。

相关可选配置与方法:

  • allowCoreThreadTimeOut(true):允许核心线程空闲超时退出。
  • prestartAllCoreThreads()/prestartCoreThread():预热核心线程,降低首批任务延迟。
  • setRejectedExecutionHandler(…) / setThreadFactory(…) / setKeepAliveTime(…):运行期调优。
  • getPoolSize()/getActiveCount()/getQueue().size()/getLargestPoolSize()/getCompletedTaskCount():监控指标。

最佳实践摘要:

  • 一律使用“有界队列 + 合理 maximumPoolSize + 拒绝策略”形成背压。
  • 根据任务性质(CPU/IO/阻塞时长)压测得出合理 core/max/queue 容量与 keepAlive。
  • 统一命名与异常处理,接入指标与日志,设定报警阈值与自恢复策略。

4.2 优雅关闭与资源回收

import java.util.concurrent.*;

public class ShutdownSample {
    public static void gracefulShutdown(ExecutorService pool) {
        pool.shutdown(); // 拒绝新任务,执行已入队任务
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // 中断正在执行的任务并清空队列
                if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
  • 在容器/服务停止钩子中调用优雅关闭,避免任务“半途而废”。

4.3 监控与调优指标

  • 关键指标:getPoolSize()getActiveCount()getQueue().size()getLargestPoolSize()getCompletedTaskCount()
  • 观察维度:任务延迟(排队时间)、执行时间分布、拒绝次数、上下文切换、CPU 占用、GC 压力。
  • 建议接入 Micrometer/JMX 导出指标,结合告警阈值动态调参。

4.4 ScheduledExecutor 的定时语义

  • scheduleAtFixedRate(initialDelay, period, unit):基于固定频率,尽量按“预定时刻”补偿漂移;若任务执行超过 period,将串行按最小间隔继续。
  • scheduleWithFixedDelay(initialDelay, delay, unit):基于固定延迟,从上次任务“完成时刻”开始计算下一次。
import java.util.concurrent.*;

ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);
ses.scheduleAtFixedRate(() -> {/* metrics */}, 1, 1, TimeUnit.SECONDS);
ses.scheduleWithFixedDelay(() -> {/* health-check */}, 0, 5, TimeUnit.SECONDS);
  • 定时任务应短小且无阻塞;长任务独立线程池或拆分。

4.5 虚拟线程(Java 21)

  • Executors.newVirtualThreadPerTaskExecutor() 适合大量“阻塞型”任务(如阻塞 IO),以较低成本挂起/恢复。
  • 注意事项:
    • CPU 密集任务并不会因虚拟线程增多而更快;仍受 CPU 限制。
    • 避免长期持有同步器/本地代码调用造成 carrier pinning。
    • 仍需背压策略与并发上限(如信号量)防止外部系统被打爆。
import java.util.concurrent.*;

try (ExecutorService vexec = Executors.newVirtualThreadPerTaskExecutor()) {
    vexec.submit(() -> {
        // 阻塞 IO 示例:虚拟线程可廉价挂起
        Thread.sleep(100);
        return "ok";
    });
}

4.6 常见选型与配置建议

  • CPU 密集:自定义 ThreadPoolExecutor,线程数≈核心或略多;无界提交改为背压。
  • IO 密集:线程数>核心,设置有界队列 + CallerRunsPolicy,必要时分级限流。
  • 批处理/限时任务:设置合理 keepAlive + allowCoreThreadTimeOut,空闲时收缩线程。
  • 定时任务:使用 ScheduledExecutorService,区分 fixedRate 与 fixedDelay。
  • 高吞吐短任务:优先减少锁竞争与对象分配,尽量避免在线程池任务中进行阻塞 IO。
  • 不要在映射器/回调中执行重操作或互相等待,防止线程池“自我阻塞”。

3. 线程的生命周期

Java线程有以下六种状态(定义在Thread.State枚举中):

  1. NEW(新建): 线程对象创建但未调用start()
  2. RUNNABLE(可运行): 线程已调用start(),可能正在运行或等待CPU调度。
  3. BLOCKED(阻塞): 线程等待获取锁(如synchronized块)。
  4. WAITING(等待): 线程通过wait()join()LockSupport.park()进入无限期等待。
  5. TIMED_WAITING(定时等待): 线程通过sleep(long)wait(long)join(long)进入有限期等待。
  6. TERMINATED(终止): 线程执行完成或异常终止。

生命周期图示:

NEW → RUNNABLE → (BLOCKED / WAITING / TIMED_WAITING) → TERMINATED

thread life cycle

4. 线程的常用方法

以下是Thread类和相关类的常用方法:

  • start(): 启动线程,调用run()方法。
  • run(): 线程的执行逻辑,需重写。
  • sleep(long millis): 让线程休眠指定时间,不释放锁。
  • yield(): 让出CPU,线程仍为RUNNABLE状态。
  • join(): 等待线程执行完成。
  • interrupt(): 中断线程(设置中断标志,可能抛出InterruptedException)。
  • isAlive(): 检查线程是否存活。
  • setPriority(int): 设置线程优先级(1-10,默认为5)。
  • setDaemon(boolean): 设置为守护线程(随主线程结束而结束)。

5. 线程同步

多线程访问共享资源可能导致数据不一致或线程安全问题,Java提供了多种同步机制。

(1) synchronized关键字

用于方法或代码块,synchronized 基于对象监视器(Monitor)实现互斥与可见性保障。进入同步块(monitorenter)需要获取对象监视器;退出同步块(monitorexit)自动释放。异常退出也会释放锁。

Monitor 机制概览

  • 对象与监视器
    • 每个对象天生携带一个内部锁(对象监视器/Monitor),synchronized 即对该监视器的获取与释放。
    • 监视器与对象头中的 Mark Word 关联,JVM 根据竞争态将锁在不同形态间转换(无锁 → 轻量级自旋 → 重量级阻塞);JDK 15 起移除了偏向锁。
  • 指令与同步方式
    • 同步代码块在字节码层面编译为 monitorenter/monitorexit 指令对。
    • 同步方法不会显式生成 monitorenter/monitorexit,而是通过方法标志位(ACC_SYNCHRONIZED)由 JVM 在调用/返回时隐式进入/退出监视器。
  • 等待与两个队列
    • EntryList: 等待获取监视器的线程队列(竞争互斥)。
    • WaitSet: 持有锁的线程调用 wait 后会释放监视器并挂起到 WaitSet;notify/notifyAll 仅将其迁移回 EntryList,真正恢复运行还需重新竞争监视器。
  • 重入与计数
    • 监视器是可重入的;同一线程可多次进入同一监视器,JVM 维护持有计数,退出时成对递减。
  • 内存语义与可见性
    • 退出监视器具备发布(release)效果;进入监视器具备获取(acquire)效果;形成 happens-before:先释放后获取的线程能看到临界区内的最新写入。
  • 中断与阻塞语义
    • 竞争进入监视器的阻塞不可被中断;处于 wait 的线程可被中断并抛出 InterruptedException。
  • 实现与优化
    • 采用自旋 + 阻塞的混合策略以降低用户态/内核态切换成本;JIT 可能进行锁粗化与锁消除优化。
  • 使用建议
    • 锁对象应为 private final,避免锁定 this、Class 对象或字符串常量。
    • wait/notify/notifyAll 必须在持有同一监视器的 synchronized 块内调用,并使用 while 循环检查条件以规避虚假唤醒。
    • 临界区尽量短小,避免其中执行 I/O、RPC 或长耗时计算。

1. 锁的粒度与形态

  • 实例锁(对象监视器):
    // 等价于 synchronized(this) { ... }
    public synchronized void method() { /* ... */ }     
  • 类锁(类对象监视器):
    // 等价于 synchronized(ClassName.class) { ... }
    public static synchronized void staticMethod() { /* ... */ }  
  • 说明:
    • 不同实例拥有各自的监视器,互不干扰;类锁在整个 JVM 中对该类唯一。
    • 现代 HotSpot 使用轻量级自旋 + 持有者阻塞策略;JDK 15 起移除了“偏向锁”。

2. 可重入性与持有计数

  • 同一线程可重复获取同一把锁(重入),JVM 维护持有计数,退出时成对减少。
public synchronized void outer() {
    inner(); // 同一线程重入,不会死锁
}
public synchronized void inner() { /* ... */ }

3. Java 内存模型(JMM)中的 happens-before

  • 对同一监视器:
    • 线程 A 的“退出(monitorexit)”happens-before 线程 B 之后对同一监视器的“进入(monitorenter)”。
  • wait/notify:
    • 对同一监视器的 notify/notifyAll 发生在被唤醒线程从 wait 返回之前。
  • 直接结论:
    • 在同步块内写入的变化,对随后进入同一监视器的其他线程可见。
    • 释放锁是“发布”(release)屏障;获取锁是“获取”(acquire)屏障。

4. 中断与阻塞语义

  • 进入 synchronized 的等待不可被中断(无法通过 Thread.interrupt()打断进入监视器的竞争)。
  • wait() 可被中断,会抛出 InterruptedException;应及时恢复中断或按业务处理。
final Object lock = new Object();
public void awaitFlag() {
    synchronized (lock) {
        while (!condition) {
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断标志
                return; // 或者按需处理
            }
        }
        // 条件满足后的逻辑
    }
}

5. 正确使用 wait/notify 的模式

  • 永远用“条件-循环”模式防止“虚假唤醒”(spurious wakeup)与“丢信号”:
final Object lock = new Object();
boolean ready = false;

public void awaitReady() throws InterruptedException {
    synchronized (lock) {
        while (!ready) {             // 使用 while 而非 if
            lock.wait();             // 释放监视器并等待
        }
        // 继续执行
    }
}

public void signalReady() {
    synchronized (lock) {
        ready = true;
        lock.notifyAll();            // 多条件或多等待者场景推荐 notifyAll
    }
}
  • 关键要点:
    • wait/notify/notifyAll 必须在持有相同监视器的 synchronized 块内调用。
    • notify 只唤醒一个等待者,可能导致“唤错对象”;复杂场景优先 notifyAll,或使用多个监视器/Condition 分离条件。
    • 避免“先通知后等待”的丢信号问题: 用共享条件变量 + while 检查。

6. 锁对象的选择与封装

  • 避免锁定外部可见对象(包括 thisClass、常量字符串),防止外部代码干扰:
private final Object lock = new Object(); // 私有、不可变
public void safeInc() {
    synchronized (lock) {
        // 临界区
    }
}

// 反例: 不要锁定字符串常量或公共对象
// synchronized ("LOCK") { ... }  // 字符串常量会被驻留,可能与他处冲突
// synchronized (SomeClass.class) { ... } // 类锁影响全局
  • 锁对象应为 private final,避免被替换或暴露。

7. 同步集合的迭代规约

  • Collections.synchronizedXxx 在遍历时仍需外部同步,否则遍历期间结构变化会不安全:
import java.util.*;

List<Integer> list = Collections.synchronizedList(new ArrayList<>());
// 写入可以直接调用 list.add(...)
synchronized (list) {                 // 在同一监视器上外部加锁再遍历
    for (Integer v : list) {
        // 访问 v
    }
}

8. 发布与可见性模式

  • 通过以下方式进行“安全发布”(Safe Publication):
    • 静态初始化或不可变对象
    • volatile 字段
    • 通过 synchronized 的发布与访问
  • synchronized 替代 volatile 的示例(互斥 + 可见性):
private int flag = 0;
private final Object lock = new Object();

public void setFlag(int v) {
    synchronized (lock) {
        flag = v;                    // 写入对随后获取同一锁的线程可见
    }
}

public int getFlag() {
    synchronized (lock) {
        return flag;                 // 读取看到最新
    }
}
  • 何时选 volatile: 变量独立、无复合不变式、只需可见性与有序性、不需要原子复合操作。
  • 何时选 synchronized: 存在复合操作或不变式,需要互斥与可见性同时保证。

9. 性能与编译器优化

  • 最小化临界区: 将 synchronized 作用范围缩到最小,避免包含 I/O、RPC、睡眠等。
  • JVM 优化:
    • 锁粗化(coarsening)与锁消除(elimination)可能改变实际加锁边界与开销。
    • 轻量级自旋后再阻塞,减轻用户态/内核态切换。
  • 在高并发吞吐需求下,优先选择更高层并发结构(ConcurrentHashMapBlockingQueue 等)降低手工加锁复杂度。

10. 死锁与规约

  • 多把锁按统一顺序获取,跨组件共享时制定顺序协议。
  • 保持临界区短小,减少嵌套锁与锁竞争。
  • 使用“超时 + 退避”策略需转向 Lock.tryLock(...)synchronized 不支持超时或可中断获取。

11. 方法锁与块锁的取舍

  • 更推荐块级 synchronized,仅包裹必要的共享状态操作,避免方法级锁导致的过度串行化。

(2) Lock 接口详解(ReentrantLock/ReadWrite/Stamped)

java.util.concurrent.locks.Lock 提供了比 synchronized 更丰富的能力: 可中断加锁、超时获取、公平性、条件队列、多种锁形态等。最常用实现是 ReentrantLock

  • 基础模式(必须用 try/finally 释放):
import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    public void increment() {
        lock.lock();                 // 阻塞直到获取到锁
        try {
            count++;
        } finally {
            lock.unlock();           // 必须在 finally 中释放
        }
    }
}
  • 非阻塞尝试获取(立即返回):
import java.util.concurrent.locks.ReentrantLock;

public class TryLockExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void executeIfFree() {
        if (lock.tryLock()) {        // 不可用即刻返回 false
            try {
                // 执行业务(尽量短小)
            } finally {
                lock.unlock();
            }
        } else {
            // 走降级/排队/丢弃策略
        }
    }
}
  • 支持可中断与超时:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptibleAndTimedLock {
    private final ReentrantLock lock = new ReentrantLock();

    // 可中断获取: 响应线程中断,避免无限等待
    public void doWorkInterruptibly() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // 执行逻辑
        } finally {
            lock.unlock();
        }
    }

    // 超时获取: 在限定时间内未获取到则放弃
    public boolean doWorkWithTimeout() throws InterruptedException {
        if (lock.tryLock(200, TimeUnit.MILLISECONDS)) {
            try {
                // 执行逻辑
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;
    }
}
  • 公平锁: 按等待时间顺序获取,避免饥饿(吞吐略降)
import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {
    private final ReentrantLock fairLock = new ReentrantLock(true); // true = 公平
}
  • 常用状态查询(仅用于监控/调试,勿据此做强逻辑判断):

    • isLocked(): 当前是否被任何线程持有
    • isHeldByCurrentThread(): 当前线程是否持有
    • getHoldCount(): 重入次数
    • hasQueuedThreads() / getQueueLength(): 是否有等待者/大致数量
  • 条件队列 Condition(与 Object.wait/notify 等价但更灵活): 一个锁可创建多个条件队列,分离不同等待理由,减少无关唤醒。详见下文“Condition”小节。

  • 死锁规避示例: 双锁获取用“超时 + 退避重试”

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ThreadLocalRandom;

public class DeadlockAvoid {
    private final Lock lockA = new ReentrantLock();
    private final Lock lockB = new ReentrantLock();

    public void doWithTwoLocks() throws InterruptedException {
        while (true) {
            boolean a = lockA.tryLock(50, TimeUnit.MILLISECONDS);
            if (a) {
                try {
                    boolean b = lockB.tryLock(50, TimeUnit.MILLISECONDS);
                    if (b) {
                        try {
                            // 临界区
                            return;  // 完成退出
                        } finally {
                            lockB.unlock();
                        }
                    }
                } finally {
                    lockA.unlock();
                }
            }
            // 退避一小段随机时间,降低活锁概率
            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5, 25));
        }
    }
}
  • 与 synchronized 的差异简述:
    • 互斥/可见性保证等价(进入/退出临界区的内存语义一致)
    • Lock 支持公平性、可中断、超时、多个条件队列
    • 需要手动释放,错误释放更易出错;因此必须使用 try/finally

ReentrantReadWriteLock(读写锁)

对读多写少场景提升吞吐。多读可并行,写独占。

import java.util.concurrent.locks.*;

public class RwCache<K, V> {
    private final java.util.Map<K, V> map = new java.util.HashMap<>();
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private final Lock r = rw.readLock();
    private final Lock w = rw.writeLock();

    public V get(K key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }

    public void put(K key, V value) {
        w.lock();
        try {
            map.put(key, value);
        } finally {
            w.unlock();
        }
    }

    // 降级: 持有写锁期间获取读锁,然后释放写锁,保持读锁
    public V putAndRead(K key, V value) {
        w.lock();
        try {
            map.put(key, value);
            r.lock();              // 先获取读锁
        } finally {
            w.unlock();            // 再释放写锁(完成降级)
        }
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
}

注意:

  • 不支持升级(读->写),否则容易死锁;需释放读锁再尝试写锁,或改造逻辑。
  • 公平模式可避免长写被读吞噬,但吞吐降低。

StampedLock(乐观读/写锁)

相较读写锁,提供乐观读以进一步提升读性能;但不重入,且条件受限(不支持可重入、lockInterruptibly,谨慎使用)。

import java.util.concurrent.locks.StampedLock;

public class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    // 乐观读: 先无锁读取,再验证,无效则退化为悲观读
    public double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();
        double cx = x, cy = y;
        if (!sl.validate(stamp)) {             // 有写入则验证失败
            stamp = sl.readLock();
            try {
                cx = x; cy = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.hypot(cx, cy);
    }

    // 写锁: 独占
    public void move(double dx, double dy) {
        long stamp = sl.writeLock();
        try {
            x += dx; y += dy;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    // 尝试读->写转换(避免释放-再申请的窗口期)
    public void moveIfAt(double ox, double oy, double nx, double ny) {
        long stamp = sl.readLock();
        try {
            while (x == ox && y == oy) {
                long ws = sl.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    stamp = ws;        // 转换成功,现为写锁
                    x = nx; y = ny;
                    return;
                } else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock(); // 转换失败,显式获取写锁
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

注意:

  • 不可重入;严禁在同一线程中重复加同类锁,否则死锁。
  • 默认不可中断,涉及中断语义的场景优先 ReentrantLock。

使用建议(Best Practices)

  • 始终使用 try/finally 释放锁;在 finally 中仅做必要释放,避免抛异常。
  • 允许中断就用 lockInterruptibly();需要超时就用 tryLock(timeout, unit)
  • 临界区尽量短小,不做阻塞 I/O、远程调用或耗时计算。
  • 明确并记录多把锁的获取顺序;跨组件共享锁时使用统一顺序协议。
  • 读多写少选 ReentrantReadWriteLock;超高读频率且可接受限制时再考虑 StampedLock
  • 优先使用更高层并发容器(例如 ConcurrentHashMapBlockingQueue),减少手写锁逻辑。
  • 只在持有锁且 isHeldByCurrentThread() 为 true 时解锁,防止非法解锁。

(3) volatile关键字

确保变量的可见性,防止指令重排序,但不保证原子性。

public class Singleton {
    private static volatile Singleton instance;

    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

(4) 原子类(AtomicXxx) 详解

原子类提供无锁(非阻塞)的原子性读改写能力,底层依赖硬件的 CAS(Compare-And-Swap) 指令与内存屏障。它们通常用于高性能计数器、状态位切换、引用替换等场景,能在高并发下减少线程上下文切换与锁竞争。

4.1 基本概念与 CAS

  • CAS(Compare-And-Set): 在“期望值等于当前值”时,原子地将变量更新为新值,返回是否成功。
  • 在高并发下,CAS 常配合自旋重试使用,避免线程阻塞/唤醒的开销。
  • 典型代码(自旋 + CAS):
import java.util.concurrent.atomic.AtomicInteger;

public class CasAdder {
    private final AtomicInteger value = new AtomicInteger(0);

    public void add(int delta) {
        int prev, next;
        do {
            prev = value.get();
            next = prev + delta;
        } while (!value.compareAndSet(prev, next)); // 失败则重试(自旋)
    }

    public int get() {
        return value.get();
    }
}
  • 常见方法语义速览(以数值原子类为例):
    • get(): 具备 volatile 读语义(获取/可见性)
    • set(v): 具备 volatile 写语义(发布/可见性)
    • lazySet(v): 更弱的发布语义(最终可见,低开销,JDK 9 以后等价 setRelease)
    • compareAndSet(expect, update): CAS,成功时具备全栅栏效果
    • weakCompareAndSet(…): 可能“伪失败”,需放入循环重试;在部分架构上更快
    • getAndSet(v)/getAndAdd(x)/addAndGet(x)/incrementAndGet()/getAndIncrement():
      读改写复合原子操作
    • getAndUpdate(fn)/updateAndGet(fn)、getAndAccumulate(x, acc)/accumulateAndGet(x, acc):
      使用函数式更新,避免手写自旋循环

注意: CAS 存在 ABA 问题,即值从 A→B→A 的往返无法被单纯 CAS 发现;需要“带戳”或“带标记”的引用类来规避。

4.2 常见原子类族与适用场景

  • 基本数值/布尔
    • AtomicBoolean, AtomicInteger, AtomicLong
    • 适用于计数器、开关、序列号等单变量原子更新
import java.util.concurrent.atomic.AtomicLong;

public class SimpleCounter {
    private final AtomicLong counter = new AtomicLong();

    public long nextId() { // 自增并返回
        return counter.incrementAndGet();
    }

    public void add(long delta) {
        counter.addAndGet(delta);
    }

    public long current() {
        return counter.get();
    }
}
  • 高并发计数/累加
    • LongAdder/DoubleAdder: 通过分段-分箱(Striped64)技术降低热点竞争,在高争用下远优于 AtomicLong
    • LongAccumulator/DoubleAccumulator: 自定义聚合函数的累加器
    • 注意: sum()/sumThenReset() 不是线性化快照,适合统计/指标,不适合作为强一致计数
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

public class QpsMeter {
    private final LongAdder adder = new LongAdder();
    private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();

    public void startReport() {
        ses.scheduleAtFixedRate(() -> {
            long qps = adder.sumThenReset(); // 取值后清零
            System.out.println("QPS: " + qps);
        }, 1, 1, TimeUnit.SECONDS);
    }

    public void record() {
        adder.increment();
    }
}
  • 引用与带版本/标记的引用
    • AtomicReference: 原子替换引用,常与不可变对象(Immutable)搭配,避免“半更新”状态
import java.util.concurrent.atomic.AtomicReference;

public class ConfigHolder {
    public static final class Config {
        public final int timeoutMs;
        public final boolean enabled;
        public Config(int timeoutMs, boolean enabled) {
            this.timeoutMs = timeoutMs;
            this.enabled = enabled;
        }
        public Config withTimeout(int t) { return new Config(t, this.enabled); }
        public Config withEnabled(boolean e) { return new Config(this.timeoutMs, e); }
    }

    private final AtomicReference<Config> holder = new AtomicReference<>(new Config(3000, true));

    public void updateTimeout(int t) {
        holder.updateAndGet(cfg -> cfg.withTimeout(t));
    }

    public Config get() { return holder.get(); }
}
  • 解决 ABA:
    • AtomicStampedReference: 附加整数“戳”(版本)
    • AtomicMarkableReference: 附加布尔“标记”
import java.util.concurrent.atomic.AtomicStampedReference;

public class AbaSafeRef {
    private final AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 0);

    public boolean tryUpdateTo101() {
        int[] stamp = new int[1];
        Integer cur = ref.get(stamp);
        int newStamp = stamp[0] + 1;
        return ref.compareAndSet(cur, 101, stamp[0], newStamp);
    }
}
  • 数组与字段更新器
    • AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray:
      对数组下标的原子操作
import java.util.concurrent.atomic.AtomicIntegerArray;

public class ShardedCounter {
    private final AtomicIntegerArray buckets;

    public ShardedCounter(int shards) {
        this.buckets = new AtomicIntegerArray(shards);
    }

    public void add(int shard, int delta) {
        buckets.addAndGet(shard, delta);
    }

    public int sum() {
        int s = 0;
        for (int i = 0; i < buckets.length(); i++) s += buckets.get(i);
        return s;
    }
}
  • 字段更新器(反射驱动,较低层、不如直接原子字段高效):
    • AtomicIntegerFieldUpdater/AtomicLongFieldUpdater/AtomicReferenceFieldUpdater
    • 约束: 目标字段必须为非 final 的 volatile,具备恰当可见性(访问修饰符);过度使用会有反射与安全检查开销
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class StateBox {
    volatile int state; // 必须 volatile、非 final
    private static final AtomicIntegerFieldUpdater<StateBox> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(StateBox.class, "state");

    public boolean cas(int expect, int update) {
        return UPDATER.compareAndSet(this, expect, update);
    }
}

4.3 内存语义与可见性

  • 原子类的 get/set 与 volatile 字段读写语义一致:
    • get(): 获取/可见性(类似 acquire)
    • set(): 发布/可见性(类似 release)
    • lazySet(): 更弱的发布(最终一致,对随后线程“迟些”可见,吞吐更好)
  • 读改写复合操作(getAndXxx/compareAndSet/updateAndGet 等)在成功路径上提供全栅栏,确保复合更新对其他线程的可见性与有序性。
  • 尽量不要将“原子类 + 普通字段”混用形成跨变量不变式;否则需额外同步手段保证整体可见性与原子性。

4.4 常见范式与实战示例

  • 原子最大值/最小值(使用函数式累加避免手写自旋):
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMax {
    private final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);

    public void record(int candidate) {
        max.accumulateAndGet(candidate, Math::max);
    }

    public int getMax() {
        return max.get();
    }
}
  • 无锁栈的 ABA 风险(简述):

    • 若使用 AtomicReference做栈顶 CAS,A→B→A 的往返会被误判为“未变”。
    • 采用 AtomicStampedReference在栈顶附加版本戳可规避。
    • 实现复杂度高,生产中优先选择成熟队列/栈实现(如 JCTools/BlockingQueue)。
  • 高争用计数器选择:

    • 单核/低争用: 选 AtomicLong
    • 多核/高争用(热点更新): 选 LongAdder/LongAccumulator
    • 需要精确线性化读快照: 选 AtomicLong(但读会阻塞写的一致性窗口)

4.5 何时不该使用原子类

  • 跨多个变量/对象的不变式与事务性要求(例如“余额 + 库存”同时扣减),应使用锁、STM 或数据库事务。
  • 涉及阻塞等待、条件协调的场景,用 Lock + Condition 或更高层的同步器(如 Semaphore、CountDownLatch、Phaser)。
  • 大量对象字段用字段更新器的批量微优化不一定划算;优先考虑设计层面消除热点与共享。

4.6 性能与伪共享(False Sharing)

  • 多线程更新相邻内存的不同变量会竞争同一缓存行,导致性能骤降。
  • 规避手段:
    • 使用 LongAdder 这类内部带填充/分箱的实现
    • 自行分片(如分桶数组/多实例),避免多个线程聚焦同一内存热点
    • JDK 提供 @Contended 注解(需 JVM 参数启用)可进行填充,但不建议在通用库外滥用

4.7 小结(原子类最佳实践)

  • 单变量原子更新优先用数值原子类,复合逻辑用 updateAndGet/accumulateAndGet。
  • 高争用计数优先 LongAdder;需要强一致读再退回 AtomicLong。
  • 对象引用更新配合不可变数据结构使用引用原子类,避免“半状态”可见。
  • 需要抗 ABA 时使用 Stamped/Markable 引用类。
  • 避免将原子类当作“轻量锁”跨多变量使用;遇到不变式就引入锁或更高层抽象。

(5) 线程安全集合

在并发场景下优先选择内置的并发集合而非手写锁逻辑,可以显著降低复杂度与提升吞吐。Java 提供了三大类线程安全集合方案:

  • 同步包装器(阻塞式):Collections.synchronizedXxx(...)
  • 无锁或细粒度锁的并发集合:ConcurrentHashMapConcurrentLinkedQueue/DequeConcurrentSkipListMap/Set
  • 阻塞/可转移队列族:BlockingQueueTransferQueue 及其实现

下面对常用集合的语义、适用场景、常见陷阱与最佳实践做系统总结。

5.1 同步包装器 Collections.synchronizedXxx

  • 用法:Collections.synchronizedList(new ArrayList<>())Collections.synchronizedMap(new HashMap<>()) 等。
  • 特点:
    • 对每次访问使用同一把内置锁,简单直接,但在高并发下吞吐较低。
    • 迭代必须“外部再加锁”(见上文“同步集合的迭代规约”),否则遍历期间结构性变化可能不安全。
  • 适合:并发度不高、迁移成本低、代码量少的旧代码快速加固。
  • 不适合:高并发、读多写多或需要无阻塞迭代的场景。
import java.util.*;

List<Integer> list = Collections.synchronizedList(new ArrayList<>());
list.add(1); // 写入线程安全
synchronized (list) { // 遍历前额外加锁
    for (Integer v : list) {
        // ...
    }
}

5.2 Copy-On-Write 系列:CopyOnWriteArrayList/CopyOnWriteArraySet

  • 写时复制:每次写操作都会复制底层数组,写代价高、读无需加锁且遍历为“快照”。
  • 读多写少、元素数量较小的场景极佳,如“监听器列表/订阅者列表”、“白名单配置”等。
  • 迭代器不抛 ConcurrentModificationException,且不反映迭代期间的后续写入。
  • 限制与陷阱:
    • 写入、批量操作成本高;不适合大集合或高写频的负载。
    • 迭代器不支持 remove(),会抛 UnsupportedOperationException。
import java.util.concurrent.CopyOnWriteArrayList;

CopyOnWriteArrayList<String> listeners = new CopyOnWriteArrayList<>();
listeners.add("A");
for (String l : listeners) { // 快照遍历,无需外部同步
    // 回调通知
}

5.3 非阻塞队列:ConcurrentLinkedQueue/ConcurrentLinkedDeque

  • 无界、无锁(基于 CAS)的多生产者多消费者队列/双端队列。
  • 操作是非阻塞的(offer/poll 返回立即结果),适合高吞吐、低延迟的“邮箱/缓冲”。
  • 迭代为“弱一致”(weakly consistent),不抛 CME,能容忍并发修改。
  • size() 为 O(n) 且仅近似,避免在热路径频繁调用。
import java.util.concurrent.ConcurrentLinkedQueue;

ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
q.offer("a");
String v = q.poll(); // 可能为 null(队列空)

5.4 阻塞/转移队列族:BlockingQueue 与 TransferQueue

BlockingQueue 提供“阻塞 put/take 与超时 offer/poll”,非常适合生产者-消费者。常见实现:

  • ArrayBlockingQueue:有界数组环形队列;内存本地性好,支持公平模式;
  • LinkedBlockingQueue:链表结构,容量可选(默认近似无界);在高并发下吞吐稳定;
  • SynchronousQueue:容量为 0 的“直接移交”队列;生产者与消费者必须配对(常用于 Cached 线程池);
  • PriorityBlockingQueue:基于堆的优先队列(无界);按优先级出队;
  • DelayQueue:基于时间窗口的队列,元素实现 Delayed;只在到期后可出队;
  • LinkedTransferQueue(TransferQueue):支持 transfer 语义,有等待消费者时可直接“交付”。

关键语义:

  • put/take:无超时的阻塞版本;offer/poll:立即/带超时版本;
  • 方法可响应中断:捕获 InterruptedException 并及时恢复中断标志或退出。
import java.util.concurrent.*;

// 直接交付,若有等待消费者可零拷贝转移
LinkedTransferQueue<String> tq = new LinkedTransferQueue<>();

new Thread(() -> {
    try {
        tq.transfer("event"); // 等待直到有消费者接收
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

new Thread(() -> {
    try {
        System.out.println("got: " + tq.take());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

5.5 并发 Map/Set:ConcurrentHashMap 与 SkipList 家族

ConcurrentHashMap(无序)
  • 禁止 null key 与 null value。
  • 迭代器为弱一致,不抛 CME;size() 仅近似,JDK 8 提供 mappingCount() 作为 long 近似计数。
  • 原子复合操作(针对“单个 key”):
    • putIfAbsentcomputecomputeIfAbsent/Presentmergereplace(key, oldVal, newVal)
    • 用于替代“get-then-put”的竞态模式,确保同一 key 上的复合逻辑原子执行。
  • 批量与并行操作:forEach(long threshold, ...)reduce*search*;阈值决定是否并行,映射函数应避免阻塞或长耗时。
  • 常见陷阱:
    • “get 后判断再 put”不是原子操作;请使用 computeIfAbsent/merge
    • 在映射函数中进行阻塞 I/O 或调用同一 Map 的结构性修改,可能造成性能与可维护性问题。
  • 示例:并发构建“多值 Map”(Map<K, List>)
import java.util.*;
import java.util.concurrent.*;

public class MultiMap<K, V> {
    private final ConcurrentHashMap<K, List<V>> map = new ConcurrentHashMap<>();

    public void add(K k, V v) {
        map.computeIfAbsent(k, kk -> Collections.synchronizedList(new ArrayList<>()))
           .add(v); // 列表内部仍需同步,或改用并发友好容器
    }

    public List<V> get(K k) {
        return map.getOrDefault(k, List.of());
    }
}
  • 计数器模式:热点场景优先 LongAdder 减少争用
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class KeyedCounter<K> {
    private final ConcurrentHashMap<K, LongAdder> counters = new ConcurrentHashMap<>();

    public void incr(K k) {
        counters.computeIfAbsent(k, kk -> new LongAdder()).increment();
    }

    public long get(K k) {
        LongAdder adder = counters.get(k);
        return adder == null ? 0L : adder.sum();
    }
}
ConcurrentSkipListMap / ConcurrentSkipListSet(有序/可导航)
  • 基于跳表的并发有序 Map/Set,提供 NavigableMap/NavigableSet 能力:subMap/headMap/tailMapceiling/floor/higher/lower 等。
  • 读取与范围视图在读多写少且需要排序/范围查询时优势明显;代价是相对 ConcurrentHashMap 写入更慢、内存开销更高。
import java.util.concurrent.*;

ConcurrentSkipListMap<Integer, String> sm = new ConcurrentSkipListMap<>();
sm.put(10, "a");
sm.put(3, "b");
System.out.println(sm.firstEntry()); // 最小 key 的条目
System.out.println(sm.subMap(3, true, 10, false)); // [3,10)

5.6 选择建议与对照

  • 列表读多写少、订阅/监听器:CopyOnWriteArrayList
  • 高吞吐非阻塞“邮箱/缓冲”:ConcurrentLinkedQueue/Deque
  • 生产者-消费者:ArrayBlockingQueue(有界、平滑延迟)、LinkedBlockingQueue(吞吐与弹性)、SynchronousQueue(直接移交)、LinkedTransferQueue(主动交付)
  • 并发字典:ConcurrentHashMap(无序、高吞吐)、ConcurrentSkipListMap(有序/范围查询)
  • 简单旧容器快速加固:Collections.synchronizedXxx(注意遍历外部加锁)

5.7 常见 FAQ/陷阱

  • 可以使用 null 吗?
    • ConcurrentHashMap、ConcurrentSkipListMap/Set、CopyOnWriteArrayList 都禁止 null。
  • 迭代是否抛 ConcurrentModificationException?
    • 并发集合大多为“弱一致迭代”,通常不抛 CME;CopyOnWrite 是“快照迭代”,也不抛 CME。
  • 可以边遍历边删除吗?
    • 并发 Map 的弱一致迭代允许并发修改,但语义是“尽力而为的近似视图”;若要确保逐项删除与遍历一致性,使用 Iterator.remove() 支持的集合或通过 removeIf 批处理,并测试语义是否符合。
  • 需要精确 size 吗?
    • 并发容器的 size() 常为近似或代价高;避免在热路径依赖 size 做控制逻辑,改用阈值/采样/指标系统。
  • compute/merge 中能做阻塞操作吗?
    • 不建议。映射函数尽量短小且无阻塞;否则可能拖垮整个热点 key 的进度。
// 正确的原子更新:避免 get-then-put 竞态
map.merge(key, 1, Integer::sum);
// 或者
map.compute(key, (k, oldV) -> oldV == null ? 1 : oldV + 1);

小结:

  • “先选容器后写代码”。读多写少/是否需要有序/是否需要阻塞/是否需要直接交付,是选择容器的关键维度。
  • 有“单 key 原子复合逻辑”需求时,优先使用 ConcurrentHashMap 的 compute/merge/putIfAbsent 等方法。
  • 需要范围查询与排序,使用 ConcurrentSkipListMap/Set。
  • 需要阻塞协调,选择合适的 BlockingQueue/TransferQueue 实现。

6. 线程通信

线程通信的目标是“协调进度、传递数据、传递信号”。正确的通信原语既要保证可见性/有序性,也要提供合适的阻塞/唤醒与取消语义。以下按抽象层次从低到高系统梳理并给出模式与示例。

6.1 基础语义与 happens-before

  • 可见性与排序
    • 进入/退出监视器(synchronized)分别具备“获取/发布”语义;Lock 的 lock/unlock 语义等价。
    • 阻塞队列、信号量、闩/栅栏等高层同步器在成功的入队/出队或许可获取/释放上也提供与监视器等价的内存语义。
  • 中断处理
    • 大多数阻塞方法提供“可中断”语义,抛 InterruptedException。收到中断后应尽快退出或调用 Thread.currentThread().interrupt() 恢复标志。
  • 常见 happens-before 结论
    • 同一监视器:释放锁 hb 随后获取锁。
    • BlockingQueue:put 成功 hb 相应 take 看到的数据。
    • CountDownLatch:计数减至 0 hb 所有 await 返回。
    • CyclicBarrier/Phaser:阶段推进对同阶段参与者形成屏障关系。
    • CompletableFuture:完成 complete/成功计算 hb 其依赖阶段开始。

6.2 Object.wait/notify/notifyAll

  • 依托对象监视器,wait 会释放监视器、进入等待队列;被唤醒后需重新竞争监视器。
  • 模式:必须使用“条件-循环”以防虚假唤醒与丢信号;复杂场景偏向 notifyAll 或拆分多个条件。
final Object lock = new Object();
boolean ready = false;

public void awaitReady() throws InterruptedException {
    synchronized (lock) {
        while (!ready) {              // while 防虚假唤醒
            lock.wait();              // 释放监视器并等待
        }
        // ... 条件满足后的逻辑
    }
}

public void signalReady() {
    synchronized (lock) {
        ready = true;
        lock.notifyAll();             // 多等待者/多条件场景优先 notifyAll
    }
}
  • 带超时等待:避免永久等待,配合时间预算循环重试。
public boolean awaitWithTimeout(long timeoutMs) throws InterruptedException {
    long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    synchronized (lock) {
        while (!ready) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0L) return false;
            lock.wait(Math.max(1L, remaining / 1_000_000L));
        }
        return true;
    }
}
  • 不适合:复杂条件编排、多条件分离、需要中断/超时精细控制的场景(转用 Condition)。

6.3 LockSupport.park/unpark

  • 单个“许可(permit)”语义:unpark 可先发生,线程“持有许可”时下一次 park 立即返回;不累加多许可。
  • 无需持有锁即可使用,常用于构建更高层同步器或实现自定义阻塞策略。
import java.util.concurrent.locks.LockSupport;

class Parker {
    private Thread worker;
    public void start() {
        worker = new Thread(() -> {
            // 做一点工作...
            LockSupport.park();           // 等待许可
            // 被 unpark 后继续
        }, "worker");
        worker.start();
    }
    public void signal() {
        LockSupport.unpark(worker);       // 可先于 park 调用
    }
}
  • 注意:仅传递“继续执行”的信号,不附带条件检查与可见性保障,通常需与共享状态(volatile/锁)配合。

6.4 Lock + Condition(条件队列)

  • 一个 Lock 可派生多个 Condition,将不同等待理由分离,减少无关唤醒;支持 await/signal/signalAll 以及 awaitNanos/awaitUntil 的带时钟等待。
  • Bounded Buffer 示例:双条件避免“误唤醒跨类”
import java.util.concurrent.locks.*;
import java.util.ArrayDeque;
import java.util.Deque;

public class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull  = lock.newCondition();
    private final Deque<T> deque = new ArrayDeque<>();
    private final int cap;

    public BoundedBuffer(int cap) { this.cap = cap; }

    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (deque.size() == cap) {
                notFull.await();            // 可中断等待
            }
            deque.addLast(x);
            notEmpty.signal();              // 只唤醒“取”的等待者
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                notEmpty.await();
            }
            T v = deque.removeFirst();
            notFull.signal();               // 只唤醒“放”的等待者
            return v;
        } finally {
            lock.unlock();
        }
    }
}
  • 何时 signal vs signalAll:单条件、等待者等价且不易“唤错”时可用 signal;多条件/复杂依赖优先 signalAll 或拆分 Condition。

6.5 阻塞队列族(BlockingQueue/TransferQueue)

  • 通过 put/take(或 offer/poll 带超时)实现安全交接,天然具备可见性保证与可中断阻塞;LinkedTransferQueue 支持“直接交付”(transfer)。
  • 哨兵(毒丸)模式优雅停机
import java.util.concurrent.*;

class PoisonPillDemo {
    private static final int PILL = Integer.MIN_VALUE;
    private final BlockingQueue<Integer> q = new LinkedBlockingQueue<>();

    public void start() {
        Thread consumer = new Thread(() -> {
            try {
                for (;;) {
                    int v = q.take();
                    if (v == PILL) break;        // 收到哨兵退出
                    // 处理 v
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        new Thread(() -> {
            try {
                // 生产若干数据...
                q.put(PILL);                     // 发送哨兵
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

6.6 CountDownLatch(一次性闩)

  • 主线程等待若干事件完成(计数到 0)再继续;一次性使用,不可重置。
import java.util.concurrent.*;

CountDownLatch ready = new CountDownLatch(3);
// 3 个并发初始化任务
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            // 初始化...
        } finally {
            ready.countDown();
        }
    }).start();
}
ready.await();   // 所有初始化完成后继续

6.7 CyclicBarrier 与 Phaser(可复用屏障/多阶段协作)

  • CyclicBarrier:固定参与者数量,每轮所有参与者都到达屏障时“同时”推进,可附加 barrierAction。
import java.util.concurrent.*;

CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("phase done"));
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            // 阶段 1
            barrier.await();
            // 阶段 2
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
        }
    }).start();
}
  • Phaser:参与者可动态注册/撤销,适合变更规模的阶段性任务。
import java.util.concurrent.Phaser;

Phaser phaser = new Phaser(1); // 主线程注册
for (int i = 0; i < 3; i++) {
    phaser.register();
    new Thread(() -> {
        for (int phase = 0; phase < 2; phase++) {
            // 工作...
            phaser.arriveAndAwaitAdvance();
        }
        phaser.arriveAndDeregister();
    }).start();
}
phaser.arriveAndDeregister(); // 主线程参与完毕

6.8 Semaphore(信号量/限流/资源池)

  • 通过许可数限制并发度;可用于数据库连接等有限资源的访问或简单令牌桶限流。
import java.util.concurrent.*;

Semaphore sem = new Semaphore(10);
void doCall() throws InterruptedException {
    if (sem.tryAcquire(100, TimeUnit.MILLISECONDS)) {
        try {
            // 访问受限资源
        } finally {
            sem.release();
        }
    } else {
        // 超时降级
    }
}

6.9 Exchanger(配对交换)

  • 两个线程在交换点互换对象,适用于“双缓存”生产-消费等。
import java.util.concurrent.*;

Exchanger<int[]> ex = new Exchanger<>();
// producer / consumer 在同一轮 exchange 互换缓冲区

6.10 CompletableFuture(异步通信/编排)

  • 通过阶段式回调链路传递结果与信号;支持 anyOf/allOf、超时、取消与异常传播。
import java.util.concurrent.*;
import static java.util.concurrent.CompletableFuture.*;

CompletableFuture<String> f =
    supplyAsync(() -> "A")
    .thenCombine(supplyAsync(() -> "B"), (a, b) -> a + b)
    .orTimeout(500, TimeUnit.MILLISECONDS)      // 超时自动抛 TimeoutException
    .exceptionally(ex -> "fallback");
System.out.println(f.join());
  • 线程池选择:CPU 密集尽量使用专用池;避免在 thenApply 等同步阶段执行阻塞操作,必要时使用 thenApplyAsync/thenComposeAsync。

6.11 超时、取消与中断约定

  • 所有可阻塞调用尽量提供超时参数;不依赖默认的“永久阻塞”。
  • 捕获 InterruptedException 后要么抛出上层,要么恢复中断标志并终止当前逻辑。
  • 统一“停机协议”:利用哨兵、取消标志(volatile)、Future.cancel(true) 或线程池优雅关闭组合实现。

6.12 常见模式与选型建议

  • 生产者-消费者:BlockingQueue(有界);需要直接交付用 LinkedTransferQueue;要求背压。
  • 双阶段/多阶段流水线:CyclicBarrier/Phaser;或多段 BlockingQueue 串接。
  • 资源限流/并发控制:Semaphore;细化到“每资源一把信号量”。
  • 条件协调:Lock + 多个 Condition;简单场景可用 wait/notifyAll。
  • 异步请求/聚合:CompletableFuture anyOf/allOf;配合超时与降级。
  • 自定义阻塞:LockSupport.park/unpark + 明确的共享状态与内存语义。

6.13 常见陷阱

  • 在未持有监视器时调用 wait/notify;或在 if 而非 while 中 wait。
  • 使用 notify 唤醒错误等待者导致“丢信号”;复杂场景应拆分条件或用 notifyAll/多个 Condition。
  • 在 compute/merge 等关键映射函数内执行阻塞操作,造成串行化/死锁风险。
  • 忽略中断与超时,导致线程泄漏与不可恢复挂起。
  • 将近似 size/计数用于严谨控制逻辑;应以许可/队列容量/闩为准。

7. 线程安全问题与解决方案

常见问题:

  • 数据竞争: 多个线程同时修改共享变量。
  • 死锁: 两个或更多线程相互等待对方释放锁。
  • 活锁: 线程不断尝试但无法进展。
  • 线程饥饿: 某些线程无法获取CPU时间。

解决方案:

  • 使用同步机制(如synchronizedLock)。
  • 避免嵌套锁,减少死锁风险。
  • 使用线程池管理线程。
  • 优先使用高层次并发工具(如ConcurrentHashMapExecutorService)。

8. 高级主题

(1) Fork/Join框架

用于分治算法,适合递归任务分解。

import java.util.concurrent.*;

public class SumTask extends RecursiveTask<Long> {
    private final int[] array;
    private final int start, end;

    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= 100) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int mid = start + (end - start) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            left.fork();
            return right.compute() + left.join();
        }
    }
}

(2) ThreadLocal

为每个线程提供独立的变量副本。

public class ThreadLocalExample {
    private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        Runnable task = () -> {
            threadLocal.set(threadLocal.get() + 1);
            System.out.println(Thread.currentThread().getName() + ": " + threadLocal.get());
        };

        new Thread(task).start();
        new Thread(task).start();
    }
}

(3) CompletableFuture

支持异步编程,简化回调处理。

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws Exception {
        CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + " World")
                .thenAccept(System.out::println)
                .get();
    }
}

9. 最佳实践

  • 优先使用线程池: 避免手动创建大量线程。
  • 最小化锁范围: 减少同步代码块的粒度。
  • 避免共享状态: 尽量使用不可变对象或局部变量。
  • 正确处理中断: 捕获InterruptedException并恢复中断状态。
  • 测试并发代码: 使用工具如JMeter、JCStress测试线程安全性。

10. 常见问题与调试

  • 死锁排查: 使用jstack或IDE调试工具查看线程状态。
  • 性能问题: 使用VisualVMJProfiler分析线程瓶颈。
  • 内存泄漏: 检查ThreadLocal是否正确清理。

总结

Java的线程机制通过ThreadRunnableExecutor等提供了强大的并发支持。合理使用同步机制(如synchronizedLockvolatile)、线程池和并发工具,可以有效管理多线程程序的复杂性。高级工具如Fork/JoinCompletableFuture进一步提升了并发编程的灵活性。