Java Concurrency
Java的线程是Java并发编程的核心,允许程序同时执行多个任务。 [虚拟线程相关内容]
1. 线程概述
线程(Thread)是操作系统能够进行运算调度的最小单位。在Java中,线程是JVM(Java虚拟机)管理的轻量级执行单元,多个线程共享同一进程的内存空间(如堆、方法区),但每个线程有自己的栈空间(如局部变量和方法调用栈)。
Java通过java.lang.Thread
类和java.lang.Runnable
接口提供线程支持。Java的线程模型基于操作系统的原生线程,依赖于底层操作系统的线程实现。
2. 线程的创建
Java中创建线程主要有以下几种方式:
(1) 继承Thread
类
通过继承Thread
类并重写run()
方法来定义线程的执行逻辑。
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running: " + Thread.currentThread().getName());
}
}
public class Main {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动线程
}
}
- 特点: 简单,但不推荐,因为Java是单继承的,继承
Thread
会限制类的扩展性。
(2) 实现Runnable
接口
通过实现Runnable
接口并将其传递给Thread
对象。
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Thread running: " + Thread.currentThread().getName());
}
}
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
- 优点: 更灵活,推荐使用,因为它允许类继承其他类,且便于资源共享。
(3) 使用Callable
和Future
Callable
接口允许线程返回结果或抛出异常,通常与ExecutorService
结合使用。
import java.util.concurrent.*;
class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "Task completed by " + Thread.currentThread().getName();
}
}
public class Main {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(new MyCallable());
System.out.println(future.get()); // 阻塞获取结果
executor.shutdown();
}
}
- 特点: 适合需要返回值或异常处理的场景,依赖线程池管理。
(4) 使用线程池(Executor Framework)
Java的java.util.concurrent
包提供了线程池机制,避免频繁创建和销毁线程的开销。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> System.out.println("Task in thread pool"));
executor.shutdown();
}
}
- 常用线程池:
Executors.newFixedThreadPool(int nThreads)
Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
- 作用:固定大小的线程池,核心线程数=最大线程数=nThreads,任务进入无界队列(LinkedBlockingQueue) 排队。
- 参数:
- nThreads:工作线程的固定数量。过小吞吐不足,过大将增加上下文切换与内存占用。
- threadFactory:可选,自定义线程名称、是否为守护线程、异常处理器等,便于排障与治理。
- 返回类型:ExecutorService(实为 ThreadPoolExecutor)。
- 注意:使用无界队列,峰值流量可能导致任务无限堆积;生产建议改用自定义 ThreadPoolExecutor + 有界队列以形成背压。
Executors.newCachedThreadPool()
Executors.newCachedThreadPool(ThreadFactory threadFactory)
- 作用:弹性伸缩的线程池。核心线程数=0,最大线程数=Integer.MAX_VALUE,空闲线程默认 60s 回收,使用 SynchronousQueue 直接移交任务。
- 参数:
- threadFactory:可选,自定义线程的创建策略与命名等。
- 返回类型:ExecutorService(实为 ThreadPoolExecutor)。
- 注意:在突发流量下可能创建大量线程,存在资源枯竭风险;需配合上游限流/信号量,或优先使用自定义可控的 ThreadPoolExecutor。
Executors.newSingleThreadExecutor()
Executors.newSingleThreadExecutor(ThreadFactory threadFactory)
- 作用:单线程串行执行任务,保证任务按提交顺序(FIFO)执行;若线程异常退出会创建新线程接替。
- 参数:
- threadFactory:可选,自定义线程特性。
- 返回类型:ExecutorService(实为 ThreadPoolExecutor,核心=最大=1,队列为无界 LinkedBlockingQueue)。
- 注意:无界队列可能堆积任务导致延迟放大;不适合高吞吐或长阻塞任务。
Executors.newScheduledThreadPool(int corePoolSize)
Executors.newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
- 作用:支持定时/周期性任务调度;内部为 ScheduledThreadPoolExecutor,使用基于时间的延迟队列。
- 参数:
- corePoolSize:核心线程数。用于并行触发定时/周期任务;核心线程默认不回收。
- threadFactory:可选,自定义调度线程特性;建议命名以区分业务线程。
- 返回类型:ScheduledExecutorService(实为 ScheduledThreadPoolExecutor)。
- 注意:定时任务应短小无阻塞;长任务应拆分或使用独立执行器,避免挡住调度线程。
4.1 为什么尽量自定义 ThreadPoolExecutor
- Executors 快捷工厂的隐患:
- newFixedThreadPool 使用无界队列(LinkedBlockingQueue),高峰期任务堆积可能 OOM。
- newCachedThreadPool 最大线程数 Integer.MAX_VALUE,极端情况下可能创建过多线程导致资源枯竭。
- newSingleThreadExecutor 单线程串行执行,后台堆积同样可能放大延迟。
- 建议:使用 ThreadPoolExecutor 显式配置核心参数与队列边界,结合拒绝策略形成背压。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class PooledExecutors {
// 命名线程工厂,便于排障
static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger idx = new AtomicInteger(1);
NamedThreadFactory(String prefix) { this.prefix = prefix; }
@Override public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + "-" + idx.getAndIncrement());
t.setDaemon(false);
t.setUncaughtExceptionHandler((thr, ex) ->
System.err.println(thr.getName() + " uncaught: " + ex.getMessage()));
return t;
}
}
public static ExecutorService buildBoundedPool() {
int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
int max = cores * 2 + 1;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1024); // 有界队列形成背压
ThreadFactory tf = new NamedThreadFactory("biz-pool");
RejectedExecutionHandler reject = new ThreadPoolExecutor.CallerRunsPolicy(); // 调用方回退,天然限流
ThreadPoolExecutor pool = new ThreadPoolExecutor(
cores, max,
30L, TimeUnit.SECONDS,
queue, tf, reject);
pool.allowCoreThreadTimeOut(true); // 允许核心线程超时回收
return pool;
}
}
要点:
- 使用有界队列(ArrayBlockingQueue/LinkedBlockingQueue(带容量))以避免无限堆积。
- 选择合适拒绝策略:
- AbortPolicy(默认,抛异常);CallerRunsPolicy(调用方执行,背压);
- DiscardPolicy(丢弃);DiscardOldestPolicy(丢最老)。
- 根据任务类型设置线程上限与存活时间:
- IO 密集:线程数可高于 CPU 核心,但要观测上下文切换与资源句柄用量;
- CPU 密集:线程数≈核心数,避免过度切换。
4.1.1 ThreadPoolExecutor 构造参数全解
构造重载签名:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数说明与取舍:
- corePoolSize
- 基线线程数。小于该值时有新任务会优先创建新线程(即便队列未满)。
- allowCoreThreadTimeOut(true) 后,核心线程也会在空闲 keepAliveTime 后退出。
- maximumPoolSize
- 线程上限。队列无法接收时(如有界队列满或 SynchronousQueue 直接移交失败),且当前线程数小于该上限时会创建新线程执行。
- 若使用无界队列(默认 LinkedBlockingQueue 无容量),maximumPoolSize 实际无效,线程数基本固定在 corePoolSize。
- keepAliveTime + unit
- 非核心线程空闲存活时间;开启 allowCoreThreadTimeOut(true) 后也作用于核心线程。
- 过短会频繁创建/销毁,过长空闲线程占用内存与栈资源。
- workQueue
- 任务排队容器,决定伸缩与拒绝行为的关键:
- SynchronousQueue:零容量“直接移交”;更易扩容到 maximumPoolSize,适合短任务+高并发但需强限流。
- ArrayBlockingQueue(cap):有界数组;可结合 CallerRunsPolicy 形成自然背压,容量宜与突发窗口匹配。
- LinkedBlockingQueue([cap]):链表;不传 cap 时近似无界(不建议生产使用);传 cap 则为有界。
- PriorityBlockingQueue:按优先级出队;注意任务需实现 Comparable。
- 任务排队容器,决定伸缩与拒绝行为的关键:
- threadFactory
- 自定义线程名、优先级、守护属性与 UncaughtExceptionHandler;建议按“业务-用途-编号”命名,便于观测/排障。
- handler(RejectedExecutionHandler)
- 当“线程数已达 maximum 且队列无法再接收”或执行器已 shutdown 时触发:
- AbortPolicy:抛 RejectedExecutionException,显式失败(默认)。
- CallerRunsPolicy:在提交线程中执行任务,实现背压。
- DiscardPolicy:直接丢弃任务。
- DiscardOldestPolicy:丢弃队列头最老任务,再尝试入队当前任务。
- 可自定义告警/打点/降级逻辑。
- 当“线程数已达 maximum 且队列无法再接收”或执行器已 shutdown 时触发:
任务接纳流程(简化):
- 若运行线程数 < corePoolSize → 新建线程执行;
- 否则尝试入队 workQueue → 成功则排队;
- 入队失败且运行线程数 < maximumPoolSize → 新建线程执行;
- 否则触发拒绝策略 handler。
相关可选配置与方法:
- allowCoreThreadTimeOut(true):允许核心线程空闲超时退出。
- prestartAllCoreThreads()/prestartCoreThread():预热核心线程,降低首批任务延迟。
- setRejectedExecutionHandler(…) / setThreadFactory(…) / setKeepAliveTime(…):运行期调优。
- getPoolSize()/getActiveCount()/getQueue().size()/getLargestPoolSize()/getCompletedTaskCount():监控指标。
最佳实践摘要:
- 一律使用“有界队列 + 合理 maximumPoolSize + 拒绝策略”形成背压。
- 根据任务性质(CPU/IO/阻塞时长)压测得出合理 core/max/queue 容量与 keepAlive。
- 统一命名与异常处理,接入指标与日志,设定报警阈值与自恢复策略。
4.2 优雅关闭与资源回收
import java.util.concurrent.*;
public class ShutdownSample {
public static void gracefulShutdown(ExecutorService pool) {
pool.shutdown(); // 拒绝新任务,执行已入队任务
try {
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 中断正在执行的任务并清空队列
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
- 在容器/服务停止钩子中调用优雅关闭,避免任务“半途而废”。
4.3 监控与调优指标
- 关键指标:
getPoolSize()
、getActiveCount()
、getQueue().size()
、getLargestPoolSize()
、getCompletedTaskCount()
。 - 观察维度:任务延迟(排队时间)、执行时间分布、拒绝次数、上下文切换、CPU 占用、GC 压力。
- 建议接入 Micrometer/JMX 导出指标,结合告警阈值动态调参。
4.4 ScheduledExecutor 的定时语义
scheduleAtFixedRate(initialDelay, period, unit)
:基于固定频率,尽量按“预定时刻”补偿漂移;若任务执行超过 period,将串行按最小间隔继续。scheduleWithFixedDelay(initialDelay, delay, unit)
:基于固定延迟,从上次任务“完成时刻”开始计算下一次。
import java.util.concurrent.*;
ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);
ses.scheduleAtFixedRate(() -> {/* metrics */}, 1, 1, TimeUnit.SECONDS);
ses.scheduleWithFixedDelay(() -> {/* health-check */}, 0, 5, TimeUnit.SECONDS);
- 定时任务应短小且无阻塞;长任务独立线程池或拆分。
4.5 虚拟线程(Java 21)
Executors.newVirtualThreadPerTaskExecutor()
适合大量“阻塞型”任务(如阻塞 IO),以较低成本挂起/恢复。- 注意事项:
- CPU 密集任务并不会因虚拟线程增多而更快;仍受 CPU 限制。
- 避免长期持有同步器/本地代码调用造成 carrier pinning。
- 仍需背压策略与并发上限(如信号量)防止外部系统被打爆。
import java.util.concurrent.*;
try (ExecutorService vexec = Executors.newVirtualThreadPerTaskExecutor()) {
vexec.submit(() -> {
// 阻塞 IO 示例:虚拟线程可廉价挂起
Thread.sleep(100);
return "ok";
});
}
4.6 常见选型与配置建议
- CPU 密集:自定义 ThreadPoolExecutor,线程数≈核心或略多;无界提交改为背压。
- IO 密集:线程数>核心,设置有界队列 + CallerRunsPolicy,必要时分级限流。
- 批处理/限时任务:设置合理 keepAlive + allowCoreThreadTimeOut,空闲时收缩线程。
- 定时任务:使用 ScheduledExecutorService,区分 fixedRate 与 fixedDelay。
- 高吞吐短任务:优先减少锁竞争与对象分配,尽量避免在线程池任务中进行阻塞 IO。
- 不要在映射器/回调中执行重操作或互相等待,防止线程池“自我阻塞”。
3. 线程的生命周期
Java线程有以下六种状态(定义在Thread.State
枚举中):
- NEW(新建): 线程对象创建但未调用
start()
。 - RUNNABLE(可运行): 线程已调用
start()
,可能正在运行或等待CPU调度。 - BLOCKED(阻塞): 线程等待获取锁(如
synchronized
块)。 - WAITING(等待): 线程通过
wait()
、join()
或LockSupport.park()
进入无限期等待。 - TIMED_WAITING(定时等待): 线程通过
sleep(long)
、wait(long)
或join(long)
进入有限期等待。 - TERMINATED(终止): 线程执行完成或异常终止。
生命周期图示:
NEW → RUNNABLE → (BLOCKED / WAITING / TIMED_WAITING) → TERMINATED
4. 线程的常用方法
以下是Thread
类和相关类的常用方法:
start()
: 启动线程,调用run()
方法。run()
: 线程的执行逻辑,需重写。sleep(long millis)
: 让线程休眠指定时间,不释放锁。yield()
: 让出CPU,线程仍为RUNNABLE状态。join()
: 等待线程执行完成。interrupt()
: 中断线程(设置中断标志,可能抛出InterruptedException
)。isAlive()
: 检查线程是否存活。setPriority(int)
: 设置线程优先级(1-10,默认为5)。setDaemon(boolean)
: 设置为守护线程(随主线程结束而结束)。
5. 线程同步
多线程访问共享资源可能导致数据不一致或线程安全问题,Java提供了多种同步机制。
(1) synchronized
关键字
用于方法或代码块,synchronized
基于对象监视器(Monitor)实现互斥与可见性保障。进入同步块(monitorenter)需要获取对象监视器;退出同步块(monitorexit)自动释放。异常退出也会释放锁。
Monitor 机制概览
- 对象与监视器
- 每个对象天生携带一个内部锁(对象监视器/Monitor),
synchronized
即对该监视器的获取与释放。 - 监视器与对象头中的 Mark Word 关联,JVM 根据竞争态将锁在不同形态间转换(无锁 → 轻量级自旋 → 重量级阻塞);JDK 15 起移除了偏向锁。
- 每个对象天生携带一个内部锁(对象监视器/Monitor),
- 指令与同步方式
- 同步代码块在字节码层面编译为 monitorenter/monitorexit 指令对。
- 同步方法不会显式生成 monitorenter/monitorexit,而是通过方法标志位(ACC_SYNCHRONIZED)由 JVM 在调用/返回时隐式进入/退出监视器。
- 等待与两个队列
- EntryList: 等待获取监视器的线程队列(竞争互斥)。
- WaitSet: 持有锁的线程调用 wait 后会释放监视器并挂起到 WaitSet;notify/notifyAll 仅将其迁移回 EntryList,真正恢复运行还需重新竞争监视器。
- 重入与计数
- 监视器是可重入的;同一线程可多次进入同一监视器,JVM 维护持有计数,退出时成对递减。
- 内存语义与可见性
- 退出监视器具备发布(release)效果;进入监视器具备获取(acquire)效果;形成 happens-before:先释放后获取的线程能看到临界区内的最新写入。
- 中断与阻塞语义
- 竞争进入监视器的阻塞不可被中断;处于 wait 的线程可被中断并抛出 InterruptedException。
- 实现与优化
- 采用自旋 + 阻塞的混合策略以降低用户态/内核态切换成本;JIT 可能进行锁粗化与锁消除优化。
- 使用建议
- 锁对象应为 private final,避免锁定 this、Class 对象或字符串常量。
- wait/notify/notifyAll 必须在持有同一监视器的 synchronized 块内调用,并使用 while 循环检查条件以规避虚假唤醒。
- 临界区尽量短小,避免其中执行 I/O、RPC 或长耗时计算。
1. 锁的粒度与形态
- 实例锁(对象监视器):
// 等价于 synchronized(this) { ... } public synchronized void method() { /* ... */ }
- 类锁(类对象监视器):
// 等价于 synchronized(ClassName.class) { ... } public static synchronized void staticMethod() { /* ... */ }
- 说明:
- 不同实例拥有各自的监视器,互不干扰;类锁在整个 JVM 中对该类唯一。
- 现代 HotSpot 使用轻量级自旋 + 持有者阻塞策略;JDK 15 起移除了“偏向锁”。
2. 可重入性与持有计数
- 同一线程可重复获取同一把锁(重入),JVM 维护持有计数,退出时成对减少。
public synchronized void outer() {
inner(); // 同一线程重入,不会死锁
}
public synchronized void inner() { /* ... */ }
3. Java 内存模型(JMM)中的 happens-before
- 对同一监视器:
- 线程 A 的“退出(monitorexit)”happens-before 线程 B 之后对同一监视器的“进入(monitorenter)”。
- 对
wait/notify
:- 对同一监视器的
notify/notifyAll
发生在被唤醒线程从wait
返回之前。
- 对同一监视器的
- 直接结论:
- 在同步块内写入的变化,对随后进入同一监视器的其他线程可见。
- 释放锁是“发布”(release)屏障;获取锁是“获取”(acquire)屏障。
4. 中断与阻塞语义
- 进入
synchronized
的等待不可被中断(无法通过Thread.interrupt()
打断进入监视器的竞争)。 wait()
可被中断,会抛出InterruptedException
;应及时恢复中断或按业务处理。
final Object lock = new Object();
public void awaitFlag() {
synchronized (lock) {
while (!condition) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断标志
return; // 或者按需处理
}
}
// 条件满足后的逻辑
}
}
5. 正确使用 wait/notify 的模式
- 永远用“条件-循环”模式防止“虚假唤醒”(spurious wakeup)与“丢信号”:
final Object lock = new Object();
boolean ready = false;
public void awaitReady() throws InterruptedException {
synchronized (lock) {
while (!ready) { // 使用 while 而非 if
lock.wait(); // 释放监视器并等待
}
// 继续执行
}
}
public void signalReady() {
synchronized (lock) {
ready = true;
lock.notifyAll(); // 多条件或多等待者场景推荐 notifyAll
}
}
- 关键要点:
wait/notify/notifyAll
必须在持有相同监视器的synchronized
块内调用。notify
只唤醒一个等待者,可能导致“唤错对象”;复杂场景优先notifyAll
,或使用多个监视器/Condition
分离条件。- 避免“先通知后等待”的丢信号问题: 用共享条件变量 + while 检查。
6. 锁对象的选择与封装
- 避免锁定外部可见对象(包括
this
、Class
、常量字符串),防止外部代码干扰:
private final Object lock = new Object(); // 私有、不可变
public void safeInc() {
synchronized (lock) {
// 临界区
}
}
// 反例: 不要锁定字符串常量或公共对象
// synchronized ("LOCK") { ... } // 字符串常量会被驻留,可能与他处冲突
// synchronized (SomeClass.class) { ... } // 类锁影响全局
- 锁对象应为
private final
,避免被替换或暴露。
7. 同步集合的迭代规约
Collections.synchronizedXxx
在遍历时仍需外部同步,否则遍历期间结构变化会不安全:
import java.util.*;
List<Integer> list = Collections.synchronizedList(new ArrayList<>());
// 写入可以直接调用 list.add(...)
synchronized (list) { // 在同一监视器上外部加锁再遍历
for (Integer v : list) {
// 访问 v
}
}
8. 发布与可见性模式
- 通过以下方式进行“安全发布”(Safe Publication):
- 静态初始化或不可变对象
volatile
字段- 通过
synchronized
的发布与访问
- 用
synchronized
替代volatile
的示例(互斥 + 可见性):
private int flag = 0;
private final Object lock = new Object();
public void setFlag(int v) {
synchronized (lock) {
flag = v; // 写入对随后获取同一锁的线程可见
}
}
public int getFlag() {
synchronized (lock) {
return flag; // 读取看到最新
}
}
- 何时选
volatile
: 变量独立、无复合不变式、只需可见性与有序性、不需要原子复合操作。 - 何时选
synchronized
: 存在复合操作或不变式,需要互斥与可见性同时保证。
9. 性能与编译器优化
- 最小化临界区: 将
synchronized
作用范围缩到最小,避免包含 I/O、RPC、睡眠等。 - JVM 优化:
- 锁粗化(coarsening)与锁消除(elimination)可能改变实际加锁边界与开销。
- 轻量级自旋后再阻塞,减轻用户态/内核态切换。
- 在高并发吞吐需求下,优先选择更高层并发结构(
ConcurrentHashMap
、BlockingQueue
等)降低手工加锁复杂度。
10. 死锁与规约
- 多把锁按统一顺序获取,跨组件共享时制定顺序协议。
- 保持临界区短小,减少嵌套锁与锁竞争。
- 使用“超时 + 退避”策略需转向
Lock.tryLock(...)
;synchronized
不支持超时或可中断获取。
11. 方法锁与块锁的取舍
- 更推荐块级
synchronized
,仅包裹必要的共享状态操作,避免方法级锁导致的过度串行化。
(2) Lock 接口详解(ReentrantLock/ReadWrite/Stamped)
java.util.concurrent.locks.Lock
提供了比 synchronized
更丰富的能力: 可中断加锁、超时获取、公平性、条件队列、多种锁形态等。最常用实现是 ReentrantLock
。
- 基础模式(必须用 try/finally 释放):
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 阻塞直到获取到锁
try {
count++;
} finally {
lock.unlock(); // 必须在 finally 中释放
}
}
}
- 非阻塞尝试获取(立即返回):
import java.util.concurrent.locks.ReentrantLock;
public class TryLockExample {
private final ReentrantLock lock = new ReentrantLock();
public void executeIfFree() {
if (lock.tryLock()) { // 不可用即刻返回 false
try {
// 执行业务(尽量短小)
} finally {
lock.unlock();
}
} else {
// 走降级/排队/丢弃策略
}
}
}
- 支持可中断与超时:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class InterruptibleAndTimedLock {
private final ReentrantLock lock = new ReentrantLock();
// 可中断获取: 响应线程中断,避免无限等待
public void doWorkInterruptibly() throws InterruptedException {
lock.lockInterruptibly();
try {
// 执行逻辑
} finally {
lock.unlock();
}
}
// 超时获取: 在限定时间内未获取到则放弃
public boolean doWorkWithTimeout() throws InterruptedException {
if (lock.tryLock(200, TimeUnit.MILLISECONDS)) {
try {
// 执行逻辑
return true;
} finally {
lock.unlock();
}
}
return false;
}
}
- 公平锁: 按等待时间顺序获取,避免饥饿(吞吐略降)
import java.util.concurrent.locks.ReentrantLock;
public class FairLockExample {
private final ReentrantLock fairLock = new ReentrantLock(true); // true = 公平
}
常用状态查询(仅用于监控/调试,勿据此做强逻辑判断):
isLocked()
: 当前是否被任何线程持有isHeldByCurrentThread()
: 当前线程是否持有getHoldCount()
: 重入次数hasQueuedThreads()
/getQueueLength()
: 是否有等待者/大致数量
条件队列 Condition(与 Object.wait/notify 等价但更灵活): 一个锁可创建多个条件队列,分离不同等待理由,减少无关唤醒。详见下文“Condition”小节。
死锁规避示例: 双锁获取用“超时 + 退避重试”
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ThreadLocalRandom;
public class DeadlockAvoid {
private final Lock lockA = new ReentrantLock();
private final Lock lockB = new ReentrantLock();
public void doWithTwoLocks() throws InterruptedException {
while (true) {
boolean a = lockA.tryLock(50, TimeUnit.MILLISECONDS);
if (a) {
try {
boolean b = lockB.tryLock(50, TimeUnit.MILLISECONDS);
if (b) {
try {
// 临界区
return; // 完成退出
} finally {
lockB.unlock();
}
}
} finally {
lockA.unlock();
}
}
// 退避一小段随机时间,降低活锁概率
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5, 25));
}
}
}
- 与 synchronized 的差异简述:
- 互斥/可见性保证等价(进入/退出临界区的内存语义一致)
- Lock 支持公平性、可中断、超时、多个条件队列
- 需要手动释放,错误释放更易出错;因此必须使用 try/finally
ReentrantReadWriteLock(读写锁)
对读多写少场景提升吞吐。多读可并行,写独占。
import java.util.concurrent.locks.*;
public class RwCache<K, V> {
private final java.util.Map<K, V> map = new java.util.HashMap<>();
private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private final Lock r = rw.readLock();
private final Lock w = rw.writeLock();
public V get(K key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
public void put(K key, V value) {
w.lock();
try {
map.put(key, value);
} finally {
w.unlock();
}
}
// 降级: 持有写锁期间获取读锁,然后释放写锁,保持读锁
public V putAndRead(K key, V value) {
w.lock();
try {
map.put(key, value);
r.lock(); // 先获取读锁
} finally {
w.unlock(); // 再释放写锁(完成降级)
}
try {
return map.get(key);
} finally {
r.unlock();
}
}
}
注意:
- 不支持升级(读->写),否则容易死锁;需释放读锁再尝试写锁,或改造逻辑。
- 公平模式可避免长写被读吞噬,但吞吐降低。
StampedLock(乐观读/写锁)
相较读写锁,提供乐观读以进一步提升读性能;但不重入,且条件受限(不支持可重入、lockInterruptibly
,谨慎使用)。
import java.util.concurrent.locks.StampedLock;
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
// 乐观读: 先无锁读取,再验证,无效则退化为悲观读
public double distanceFromOrigin() {
long stamp = sl.tryOptimisticRead();
double cx = x, cy = y;
if (!sl.validate(stamp)) { // 有写入则验证失败
stamp = sl.readLock();
try {
cx = x; cy = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.hypot(cx, cy);
}
// 写锁: 独占
public void move(double dx, double dy) {
long stamp = sl.writeLock();
try {
x += dx; y += dy;
} finally {
sl.unlockWrite(stamp);
}
}
// 尝试读->写转换(避免释放-再申请的窗口期)
public void moveIfAt(double ox, double oy, double nx, double ny) {
long stamp = sl.readLock();
try {
while (x == ox && y == oy) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws; // 转换成功,现为写锁
x = nx; y = ny;
return;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock(); // 转换失败,显式获取写锁
}
}
} finally {
sl.unlock(stamp);
}
}
}
注意:
- 不可重入;严禁在同一线程中重复加同类锁,否则死锁。
- 默认不可中断,涉及中断语义的场景优先 ReentrantLock。
使用建议(Best Practices)
- 始终使用 try/finally 释放锁;在 finally 中仅做必要释放,避免抛异常。
- 允许中断就用
lockInterruptibly()
;需要超时就用tryLock(timeout, unit)
。 - 临界区尽量短小,不做阻塞 I/O、远程调用或耗时计算。
- 明确并记录多把锁的获取顺序;跨组件共享锁时使用统一顺序协议。
- 读多写少选
ReentrantReadWriteLock
;超高读频率且可接受限制时再考虑StampedLock
。 - 优先使用更高层并发容器(例如
ConcurrentHashMap
、BlockingQueue
),减少手写锁逻辑。 - 只在持有锁且
isHeldByCurrentThread()
为 true 时解锁,防止非法解锁。
(3) volatile关键字
确保变量的可见性,防止指令重排序,但不保证原子性。
public class Singleton {
private static volatile Singleton instance;
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
(4) 原子类(AtomicXxx) 详解
原子类提供无锁(非阻塞)的原子性读改写能力,底层依赖硬件的 CAS(Compare-And-Swap) 指令与内存屏障。它们通常用于高性能计数器、状态位切换、引用替换等场景,能在高并发下减少线程上下文切换与锁竞争。
4.1 基本概念与 CAS
- CAS(Compare-And-Set): 在“期望值等于当前值”时,原子地将变量更新为新值,返回是否成功。
- 在高并发下,CAS 常配合自旋重试使用,避免线程阻塞/唤醒的开销。
- 典型代码(自旋 + CAS):
import java.util.concurrent.atomic.AtomicInteger;
public class CasAdder {
private final AtomicInteger value = new AtomicInteger(0);
public void add(int delta) {
int prev, next;
do {
prev = value.get();
next = prev + delta;
} while (!value.compareAndSet(prev, next)); // 失败则重试(自旋)
}
public int get() {
return value.get();
}
}
- 常见方法语义速览(以数值原子类为例):
- get(): 具备 volatile 读语义(获取/可见性)
- set(v): 具备 volatile 写语义(发布/可见性)
- lazySet(v): 更弱的发布语义(最终可见,低开销,JDK 9 以后等价 setRelease)
- compareAndSet(expect, update): CAS,成功时具备全栅栏效果
- weakCompareAndSet(…): 可能“伪失败”,需放入循环重试;在部分架构上更快
- getAndSet(v)/getAndAdd(x)/addAndGet(x)/incrementAndGet()/getAndIncrement():
读改写复合原子操作 - getAndUpdate(fn)/updateAndGet(fn)、getAndAccumulate(x, acc)/accumulateAndGet(x, acc):
使用函数式更新,避免手写自旋循环
注意: CAS 存在 ABA 问题,即值从 A→B→A 的往返无法被单纯 CAS 发现;需要“带戳”或“带标记”的引用类来规避。
4.2 常见原子类族与适用场景
- 基本数值/布尔
- AtomicBoolean, AtomicInteger, AtomicLong
- 适用于计数器、开关、序列号等单变量原子更新
import java.util.concurrent.atomic.AtomicLong;
public class SimpleCounter {
private final AtomicLong counter = new AtomicLong();
public long nextId() { // 自增并返回
return counter.incrementAndGet();
}
public void add(long delta) {
counter.addAndGet(delta);
}
public long current() {
return counter.get();
}
}
- 高并发计数/累加
- LongAdder/DoubleAdder: 通过分段-分箱(Striped64)技术降低热点竞争,在高争用下远优于 AtomicLong
- LongAccumulator/DoubleAccumulator: 自定义聚合函数的累加器
- 注意: sum()/sumThenReset() 不是线性化快照,适合统计/指标,不适合作为强一致计数
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
public class QpsMeter {
private final LongAdder adder = new LongAdder();
private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
public void startReport() {
ses.scheduleAtFixedRate(() -> {
long qps = adder.sumThenReset(); // 取值后清零
System.out.println("QPS: " + qps);
}, 1, 1, TimeUnit.SECONDS);
}
public void record() {
adder.increment();
}
}
- 引用与带版本/标记的引用
- AtomicReference
: 原子替换引用,常与不可变对象(Immutable)搭配,避免“半更新”状态
- AtomicReference
import java.util.concurrent.atomic.AtomicReference;
public class ConfigHolder {
public static final class Config {
public final int timeoutMs;
public final boolean enabled;
public Config(int timeoutMs, boolean enabled) {
this.timeoutMs = timeoutMs;
this.enabled = enabled;
}
public Config withTimeout(int t) { return new Config(t, this.enabled); }
public Config withEnabled(boolean e) { return new Config(this.timeoutMs, e); }
}
private final AtomicReference<Config> holder = new AtomicReference<>(new Config(3000, true));
public void updateTimeout(int t) {
holder.updateAndGet(cfg -> cfg.withTimeout(t));
}
public Config get() { return holder.get(); }
}
- 解决 ABA:
- AtomicStampedReference
: 附加整数“戳”(版本) - AtomicMarkableReference
: 附加布尔“标记”
- AtomicStampedReference
import java.util.concurrent.atomic.AtomicStampedReference;
public class AbaSafeRef {
private final AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 0);
public boolean tryUpdateTo101() {
int[] stamp = new int[1];
Integer cur = ref.get(stamp);
int newStamp = stamp[0] + 1;
return ref.compareAndSet(cur, 101, stamp[0], newStamp);
}
}
- 数组与字段更新器
- AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray:
对数组下标的原子操作
- AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray:
import java.util.concurrent.atomic.AtomicIntegerArray;
public class ShardedCounter {
private final AtomicIntegerArray buckets;
public ShardedCounter(int shards) {
this.buckets = new AtomicIntegerArray(shards);
}
public void add(int shard, int delta) {
buckets.addAndGet(shard, delta);
}
public int sum() {
int s = 0;
for (int i = 0; i < buckets.length(); i++) s += buckets.get(i);
return s;
}
}
- 字段更新器(反射驱动,较低层、不如直接原子字段高效):
- AtomicIntegerFieldUpdater/AtomicLongFieldUpdater/AtomicReferenceFieldUpdater
- 约束: 目标字段必须为非 final 的 volatile,具备恰当可见性(访问修饰符);过度使用会有反射与安全检查开销
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class StateBox {
volatile int state; // 必须 volatile、非 final
private static final AtomicIntegerFieldUpdater<StateBox> UPDATER =
AtomicIntegerFieldUpdater.newUpdater(StateBox.class, "state");
public boolean cas(int expect, int update) {
return UPDATER.compareAndSet(this, expect, update);
}
}
4.3 内存语义与可见性
- 原子类的 get/set 与 volatile 字段读写语义一致:
- get(): 获取/可见性(类似 acquire)
- set(): 发布/可见性(类似 release)
- lazySet(): 更弱的发布(最终一致,对随后线程“迟些”可见,吞吐更好)
- 读改写复合操作(getAndXxx/compareAndSet/updateAndGet 等)在成功路径上提供全栅栏,确保复合更新对其他线程的可见性与有序性。
- 尽量不要将“原子类 + 普通字段”混用形成跨变量不变式;否则需额外同步手段保证整体可见性与原子性。
4.4 常见范式与实战示例
- 原子最大值/最小值(使用函数式累加避免手写自旋):
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicMax {
private final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
public void record(int candidate) {
max.accumulateAndGet(candidate, Math::max);
}
public int getMax() {
return max.get();
}
}
无锁栈的 ABA 风险(简述):
- 若使用 AtomicReference
做栈顶 CAS,A→B→A 的往返会被误判为“未变”。 - 采用 AtomicStampedReference
在栈顶附加版本戳可规避。 - 实现复杂度高,生产中优先选择成熟队列/栈实现(如 JCTools/BlockingQueue)。
- 若使用 AtomicReference
高争用计数器选择:
- 单核/低争用: 选 AtomicLong
- 多核/高争用(热点更新): 选 LongAdder/LongAccumulator
- 需要精确线性化读快照: 选 AtomicLong(但读会阻塞写的一致性窗口)
4.5 何时不该使用原子类
- 跨多个变量/对象的不变式与事务性要求(例如“余额 + 库存”同时扣减),应使用锁、STM 或数据库事务。
- 涉及阻塞等待、条件协调的场景,用 Lock + Condition 或更高层的同步器(如 Semaphore、CountDownLatch、Phaser)。
- 大量对象字段用字段更新器的批量微优化不一定划算;优先考虑设计层面消除热点与共享。
4.6 性能与伪共享(False Sharing)
- 多线程更新相邻内存的不同变量会竞争同一缓存行,导致性能骤降。
- 规避手段:
- 使用 LongAdder 这类内部带填充/分箱的实现
- 自行分片(如分桶数组/多实例),避免多个线程聚焦同一内存热点
- JDK 提供 @Contended 注解(需 JVM 参数启用)可进行填充,但不建议在通用库外滥用
4.7 小结(原子类最佳实践)
- 单变量原子更新优先用数值原子类,复合逻辑用 updateAndGet/accumulateAndGet。
- 高争用计数优先 LongAdder;需要强一致读再退回 AtomicLong。
- 对象引用更新配合不可变数据结构使用引用原子类,避免“半状态”可见。
- 需要抗 ABA 时使用 Stamped/Markable 引用类。
- 避免将原子类当作“轻量锁”跨多变量使用;遇到不变式就引入锁或更高层抽象。
(5) 线程安全集合
在并发场景下优先选择内置的并发集合而非手写锁逻辑,可以显著降低复杂度与提升吞吐。Java 提供了三大类线程安全集合方案:
- 同步包装器(阻塞式):
Collections.synchronizedXxx(...)
- 无锁或细粒度锁的并发集合:
ConcurrentHashMap
、ConcurrentLinkedQueue/Deque
、ConcurrentSkipListMap/Set
- 阻塞/可转移队列族:
BlockingQueue
、TransferQueue
及其实现
下面对常用集合的语义、适用场景、常见陷阱与最佳实践做系统总结。
5.1 同步包装器 Collections.synchronizedXxx
- 用法:
Collections.synchronizedList(new ArrayList<>())
、Collections.synchronizedMap(new HashMap<>())
等。 - 特点:
- 对每次访问使用同一把内置锁,简单直接,但在高并发下吞吐较低。
- 迭代必须“外部再加锁”(见上文“同步集合的迭代规约”),否则遍历期间结构性变化可能不安全。
- 适合:并发度不高、迁移成本低、代码量少的旧代码快速加固。
- 不适合:高并发、读多写多或需要无阻塞迭代的场景。
import java.util.*;
List<Integer> list = Collections.synchronizedList(new ArrayList<>());
list.add(1); // 写入线程安全
synchronized (list) { // 遍历前额外加锁
for (Integer v : list) {
// ...
}
}
5.2 Copy-On-Write 系列:CopyOnWriteArrayList/CopyOnWriteArraySet
- 写时复制:每次写操作都会复制底层数组,写代价高、读无需加锁且遍历为“快照”。
- 读多写少、元素数量较小的场景极佳,如“监听器列表/订阅者列表”、“白名单配置”等。
- 迭代器不抛 ConcurrentModificationException,且不反映迭代期间的后续写入。
- 限制与陷阱:
- 写入、批量操作成本高;不适合大集合或高写频的负载。
- 迭代器不支持 remove(),会抛 UnsupportedOperationException。
import java.util.concurrent.CopyOnWriteArrayList;
CopyOnWriteArrayList<String> listeners = new CopyOnWriteArrayList<>();
listeners.add("A");
for (String l : listeners) { // 快照遍历,无需外部同步
// 回调通知
}
5.3 非阻塞队列:ConcurrentLinkedQueue/ConcurrentLinkedDeque
- 无界、无锁(基于 CAS)的多生产者多消费者队列/双端队列。
- 操作是非阻塞的(offer/poll 返回立即结果),适合高吞吐、低延迟的“邮箱/缓冲”。
- 迭代为“弱一致”(weakly consistent),不抛 CME,能容忍并发修改。
size()
为 O(n) 且仅近似,避免在热路径频繁调用。
import java.util.concurrent.ConcurrentLinkedQueue;
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
q.offer("a");
String v = q.poll(); // 可能为 null(队列空)
5.4 阻塞/转移队列族:BlockingQueue 与 TransferQueue
BlockingQueue 提供“阻塞 put/take 与超时 offer/poll”,非常适合生产者-消费者。常见实现:
- ArrayBlockingQueue:有界数组环形队列;内存本地性好,支持公平模式;
- LinkedBlockingQueue:链表结构,容量可选(默认近似无界);在高并发下吞吐稳定;
- SynchronousQueue:容量为 0 的“直接移交”队列;生产者与消费者必须配对(常用于 Cached 线程池);
- PriorityBlockingQueue:基于堆的优先队列(无界);按优先级出队;
- DelayQueue:基于时间窗口的队列,元素实现 Delayed;只在到期后可出队;
- LinkedTransferQueue(TransferQueue):支持 transfer 语义,有等待消费者时可直接“交付”。
关键语义:
- put/take:无超时的阻塞版本;offer/poll:立即/带超时版本;
- 方法可响应中断:捕获 InterruptedException 并及时恢复中断标志或退出。
import java.util.concurrent.*;
// 直接交付,若有等待消费者可零拷贝转移
LinkedTransferQueue<String> tq = new LinkedTransferQueue<>();
new Thread(() -> {
try {
tq.transfer("event"); // 等待直到有消费者接收
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
try {
System.out.println("got: " + tq.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
5.5 并发 Map/Set:ConcurrentHashMap 与 SkipList 家族
ConcurrentHashMap(无序)
- 禁止 null key 与 null value。
- 迭代器为弱一致,不抛 CME;
size()
仅近似,JDK 8 提供mappingCount()
作为 long 近似计数。 - 原子复合操作(针对“单个 key”):
putIfAbsent
、compute
、computeIfAbsent/Present
、merge
、replace(key, oldVal, newVal)
等- 用于替代“get-then-put”的竞态模式,确保同一 key 上的复合逻辑原子执行。
- 批量与并行操作:
forEach(long threshold, ...)
、reduce*
、search*
;阈值决定是否并行,映射函数应避免阻塞或长耗时。 - 常见陷阱:
- “get 后判断再 put”不是原子操作;请使用
computeIfAbsent/merge
。 - 在映射函数中进行阻塞 I/O 或调用同一 Map 的结构性修改,可能造成性能与可维护性问题。
- “get 后判断再 put”不是原子操作;请使用
- 示例:并发构建“多值 Map”(Map<K, List
>)
import java.util.*;
import java.util.concurrent.*;
public class MultiMap<K, V> {
private final ConcurrentHashMap<K, List<V>> map = new ConcurrentHashMap<>();
public void add(K k, V v) {
map.computeIfAbsent(k, kk -> Collections.synchronizedList(new ArrayList<>()))
.add(v); // 列表内部仍需同步,或改用并发友好容器
}
public List<V> get(K k) {
return map.getOrDefault(k, List.of());
}
}
- 计数器模式:热点场景优先 LongAdder 减少争用
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class KeyedCounter<K> {
private final ConcurrentHashMap<K, LongAdder> counters = new ConcurrentHashMap<>();
public void incr(K k) {
counters.computeIfAbsent(k, kk -> new LongAdder()).increment();
}
public long get(K k) {
LongAdder adder = counters.get(k);
return adder == null ? 0L : adder.sum();
}
}
ConcurrentSkipListMap / ConcurrentSkipListSet(有序/可导航)
- 基于跳表的并发有序 Map/Set,提供
NavigableMap/NavigableSet
能力:subMap/headMap/tailMap
、ceiling/floor/higher/lower
等。 - 读取与范围视图在读多写少且需要排序/范围查询时优势明显;代价是相对
ConcurrentHashMap
写入更慢、内存开销更高。
import java.util.concurrent.*;
ConcurrentSkipListMap<Integer, String> sm = new ConcurrentSkipListMap<>();
sm.put(10, "a");
sm.put(3, "b");
System.out.println(sm.firstEntry()); // 最小 key 的条目
System.out.println(sm.subMap(3, true, 10, false)); // [3,10)
5.6 选择建议与对照
- 列表读多写少、订阅/监听器:CopyOnWriteArrayList
- 高吞吐非阻塞“邮箱/缓冲”:ConcurrentLinkedQueue/Deque
- 生产者-消费者:ArrayBlockingQueue(有界、平滑延迟)、LinkedBlockingQueue(吞吐与弹性)、SynchronousQueue(直接移交)、LinkedTransferQueue(主动交付)
- 并发字典:ConcurrentHashMap(无序、高吞吐)、ConcurrentSkipListMap(有序/范围查询)
- 简单旧容器快速加固:Collections.synchronizedXxx(注意遍历外部加锁)
5.7 常见 FAQ/陷阱
- 可以使用 null 吗?
- ConcurrentHashMap、ConcurrentSkipListMap/Set、CopyOnWriteArrayList 都禁止 null。
- 迭代是否抛 ConcurrentModificationException?
- 并发集合大多为“弱一致迭代”,通常不抛 CME;CopyOnWrite 是“快照迭代”,也不抛 CME。
- 可以边遍历边删除吗?
- 并发 Map 的弱一致迭代允许并发修改,但语义是“尽力而为的近似视图”;若要确保逐项删除与遍历一致性,使用
Iterator.remove()
支持的集合或通过removeIf
批处理,并测试语义是否符合。
- 并发 Map 的弱一致迭代允许并发修改,但语义是“尽力而为的近似视图”;若要确保逐项删除与遍历一致性,使用
- 需要精确 size 吗?
- 并发容器的
size()
常为近似或代价高;避免在热路径依赖 size 做控制逻辑,改用阈值/采样/指标系统。
- 并发容器的
- compute/merge 中能做阻塞操作吗?
- 不建议。映射函数尽量短小且无阻塞;否则可能拖垮整个热点 key 的进度。
// 正确的原子更新:避免 get-then-put 竞态
map.merge(key, 1, Integer::sum);
// 或者
map.compute(key, (k, oldV) -> oldV == null ? 1 : oldV + 1);
小结:
- “先选容器后写代码”。读多写少/是否需要有序/是否需要阻塞/是否需要直接交付,是选择容器的关键维度。
- 有“单 key 原子复合逻辑”需求时,优先使用 ConcurrentHashMap 的 compute/merge/putIfAbsent 等方法。
- 需要范围查询与排序,使用 ConcurrentSkipListMap/Set。
- 需要阻塞协调,选择合适的 BlockingQueue/TransferQueue 实现。
6. 线程通信
线程通信的目标是“协调进度、传递数据、传递信号”。正确的通信原语既要保证可见性/有序性,也要提供合适的阻塞/唤醒与取消语义。以下按抽象层次从低到高系统梳理并给出模式与示例。
6.1 基础语义与 happens-before
- 可见性与排序
- 进入/退出监视器(synchronized)分别具备“获取/发布”语义;Lock 的 lock/unlock 语义等价。
- 阻塞队列、信号量、闩/栅栏等高层同步器在成功的入队/出队或许可获取/释放上也提供与监视器等价的内存语义。
- 中断处理
- 大多数阻塞方法提供“可中断”语义,抛 InterruptedException。收到中断后应尽快退出或调用 Thread.currentThread().interrupt() 恢复标志。
- 常见 happens-before 结论
- 同一监视器:释放锁 hb 随后获取锁。
- BlockingQueue:put 成功 hb 相应 take 看到的数据。
- CountDownLatch:计数减至 0 hb 所有 await 返回。
- CyclicBarrier/Phaser:阶段推进对同阶段参与者形成屏障关系。
- CompletableFuture:完成 complete/成功计算 hb 其依赖阶段开始。
6.2 Object.wait/notify/notifyAll
- 依托对象监视器,wait 会释放监视器、进入等待队列;被唤醒后需重新竞争监视器。
- 模式:必须使用“条件-循环”以防虚假唤醒与丢信号;复杂场景偏向 notifyAll 或拆分多个条件。
final Object lock = new Object();
boolean ready = false;
public void awaitReady() throws InterruptedException {
synchronized (lock) {
while (!ready) { // while 防虚假唤醒
lock.wait(); // 释放监视器并等待
}
// ... 条件满足后的逻辑
}
}
public void signalReady() {
synchronized (lock) {
ready = true;
lock.notifyAll(); // 多等待者/多条件场景优先 notifyAll
}
}
- 带超时等待:避免永久等待,配合时间预算循环重试。
public boolean awaitWithTimeout(long timeoutMs) throws InterruptedException {
long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
synchronized (lock) {
while (!ready) {
long remaining = deadline - System.nanoTime();
if (remaining <= 0L) return false;
lock.wait(Math.max(1L, remaining / 1_000_000L));
}
return true;
}
}
- 不适合:复杂条件编排、多条件分离、需要中断/超时精细控制的场景(转用 Condition)。
6.3 LockSupport.park/unpark
- 单个“许可(permit)”语义:unpark 可先发生,线程“持有许可”时下一次 park 立即返回;不累加多许可。
- 无需持有锁即可使用,常用于构建更高层同步器或实现自定义阻塞策略。
import java.util.concurrent.locks.LockSupport;
class Parker {
private Thread worker;
public void start() {
worker = new Thread(() -> {
// 做一点工作...
LockSupport.park(); // 等待许可
// 被 unpark 后继续
}, "worker");
worker.start();
}
public void signal() {
LockSupport.unpark(worker); // 可先于 park 调用
}
}
- 注意:仅传递“继续执行”的信号,不附带条件检查与可见性保障,通常需与共享状态(volatile/锁)配合。
6.4 Lock + Condition(条件队列)
- 一个 Lock 可派生多个 Condition,将不同等待理由分离,减少无关唤醒;支持 await/signal/signalAll 以及 awaitNanos/awaitUntil 的带时钟等待。
- Bounded Buffer 示例:双条件避免“误唤醒跨类”
import java.util.concurrent.locks.*;
import java.util.ArrayDeque;
import java.util.Deque;
public class BoundedBuffer<T> {
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private final Deque<T> deque = new ArrayDeque<>();
private final int cap;
public BoundedBuffer(int cap) { this.cap = cap; }
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (deque.size() == cap) {
notFull.await(); // 可中断等待
}
deque.addLast(x);
notEmpty.signal(); // 只唤醒“取”的等待者
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (deque.isEmpty()) {
notEmpty.await();
}
T v = deque.removeFirst();
notFull.signal(); // 只唤醒“放”的等待者
return v;
} finally {
lock.unlock();
}
}
}
- 何时 signal vs signalAll:单条件、等待者等价且不易“唤错”时可用 signal;多条件/复杂依赖优先 signalAll 或拆分 Condition。
6.5 阻塞队列族(BlockingQueue/TransferQueue)
- 通过 put/take(或 offer/poll 带超时)实现安全交接,天然具备可见性保证与可中断阻塞;LinkedTransferQueue 支持“直接交付”(transfer)。
- 哨兵(毒丸)模式优雅停机
import java.util.concurrent.*;
class PoisonPillDemo {
private static final int PILL = Integer.MIN_VALUE;
private final BlockingQueue<Integer> q = new LinkedBlockingQueue<>();
public void start() {
Thread consumer = new Thread(() -> {
try {
for (;;) {
int v = q.take();
if (v == PILL) break; // 收到哨兵退出
// 处理 v
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
consumer.start();
new Thread(() -> {
try {
// 生产若干数据...
q.put(PILL); // 发送哨兵
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
6.6 CountDownLatch(一次性闩)
- 主线程等待若干事件完成(计数到 0)再继续;一次性使用,不可重置。
import java.util.concurrent.*;
CountDownLatch ready = new CountDownLatch(3);
// 3 个并发初始化任务
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
// 初始化...
} finally {
ready.countDown();
}
}).start();
}
ready.await(); // 所有初始化完成后继续
6.7 CyclicBarrier 与 Phaser(可复用屏障/多阶段协作)
- CyclicBarrier:固定参与者数量,每轮所有参与者都到达屏障时“同时”推进,可附加 barrierAction。
import java.util.concurrent.*;
CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("phase done"));
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
// 阶段 1
barrier.await();
// 阶段 2
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
- Phaser:参与者可动态注册/撤销,适合变更规模的阶段性任务。
import java.util.concurrent.Phaser;
Phaser phaser = new Phaser(1); // 主线程注册
for (int i = 0; i < 3; i++) {
phaser.register();
new Thread(() -> {
for (int phase = 0; phase < 2; phase++) {
// 工作...
phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
}).start();
}
phaser.arriveAndDeregister(); // 主线程参与完毕
6.8 Semaphore(信号量/限流/资源池)
- 通过许可数限制并发度;可用于数据库连接等有限资源的访问或简单令牌桶限流。
import java.util.concurrent.*;
Semaphore sem = new Semaphore(10);
void doCall() throws InterruptedException {
if (sem.tryAcquire(100, TimeUnit.MILLISECONDS)) {
try {
// 访问受限资源
} finally {
sem.release();
}
} else {
// 超时降级
}
}
6.9 Exchanger(配对交换)
- 两个线程在交换点互换对象,适用于“双缓存”生产-消费等。
import java.util.concurrent.*;
Exchanger<int[]> ex = new Exchanger<>();
// producer / consumer 在同一轮 exchange 互换缓冲区
6.10 CompletableFuture(异步通信/编排)
- 通过阶段式回调链路传递结果与信号;支持 anyOf/allOf、超时、取消与异常传播。
import java.util.concurrent.*;
import static java.util.concurrent.CompletableFuture.*;
CompletableFuture<String> f =
supplyAsync(() -> "A")
.thenCombine(supplyAsync(() -> "B"), (a, b) -> a + b)
.orTimeout(500, TimeUnit.MILLISECONDS) // 超时自动抛 TimeoutException
.exceptionally(ex -> "fallback");
System.out.println(f.join());
- 线程池选择:CPU 密集尽量使用专用池;避免在 thenApply 等同步阶段执行阻塞操作,必要时使用 thenApplyAsync/thenComposeAsync。
6.11 超时、取消与中断约定
- 所有可阻塞调用尽量提供超时参数;不依赖默认的“永久阻塞”。
- 捕获 InterruptedException 后要么抛出上层,要么恢复中断标志并终止当前逻辑。
- 统一“停机协议”:利用哨兵、取消标志(volatile)、Future.cancel(true) 或线程池优雅关闭组合实现。
6.12 常见模式与选型建议
- 生产者-消费者:BlockingQueue(有界);需要直接交付用 LinkedTransferQueue;要求背压。
- 双阶段/多阶段流水线:CyclicBarrier/Phaser;或多段 BlockingQueue 串接。
- 资源限流/并发控制:Semaphore;细化到“每资源一把信号量”。
- 条件协调:Lock + 多个 Condition;简单场景可用 wait/notifyAll。
- 异步请求/聚合:CompletableFuture anyOf/allOf;配合超时与降级。
- 自定义阻塞:LockSupport.park/unpark + 明确的共享状态与内存语义。
6.13 常见陷阱
- 在未持有监视器时调用 wait/notify;或在 if 而非 while 中 wait。
- 使用 notify 唤醒错误等待者导致“丢信号”;复杂场景应拆分条件或用 notifyAll/多个 Condition。
- 在 compute/merge 等关键映射函数内执行阻塞操作,造成串行化/死锁风险。
- 忽略中断与超时,导致线程泄漏与不可恢复挂起。
- 将近似 size/计数用于严谨控制逻辑;应以许可/队列容量/闩为准。
7. 线程安全问题与解决方案
常见问题:
- 数据竞争: 多个线程同时修改共享变量。
- 死锁: 两个或更多线程相互等待对方释放锁。
- 活锁: 线程不断尝试但无法进展。
- 线程饥饿: 某些线程无法获取CPU时间。
解决方案:
- 使用同步机制(如
synchronized
、Lock
)。 - 避免嵌套锁,减少死锁风险。
- 使用线程池管理线程。
- 优先使用高层次并发工具(如
ConcurrentHashMap
、ExecutorService
)。
8. 高级主题
(1) Fork/Join框架
用于分治算法,适合递归任务分解。
import java.util.concurrent.*;
public class SumTask extends RecursiveTask<Long> {
private final int[] array;
private final int start, end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= 100) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
int mid = start + (end - start) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork();
return right.compute() + left.join();
}
}
}
(2) ThreadLocal
为每个线程提供独立的变量副本。
public class ThreadLocalExample {
private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
Runnable task = () -> {
threadLocal.set(threadLocal.get() + 1);
System.out.println(Thread.currentThread().getName() + ": " + threadLocal.get());
};
new Thread(task).start();
new Thread(task).start();
}
}
(3) CompletableFuture
支持异步编程,简化回调处理。
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws Exception {
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenAccept(System.out::println)
.get();
}
}
9. 最佳实践
- 优先使用线程池: 避免手动创建大量线程。
- 最小化锁范围: 减少同步代码块的粒度。
- 避免共享状态: 尽量使用不可变对象或局部变量。
- 正确处理中断: 捕获
InterruptedException
并恢复中断状态。 - 测试并发代码: 使用工具如JMeter、JCStress测试线程安全性。
10. 常见问题与调试
- 死锁排查: 使用
jstack
或IDE调试工具查看线程状态。 - 性能问题: 使用
VisualVM
或JProfiler
分析线程瓶颈。 - 内存泄漏: 检查
ThreadLocal
是否正确清理。
总结
Java的线程机制通过Thread
、Runnable
、Executor
等提供了强大的并发支持。合理使用同步机制(如synchronized
、Lock
、volatile
)、线程池和并发工具,可以有效管理多线程程序的复杂性。高级工具如Fork/Join
、CompletableFuture
进一步提升了并发编程的灵活性。