当前位置: 首页 > news >正文

Java并发探索--上篇

Java并发探索–上篇

1.基本概念

  • 线程与进程:线程是程序执行的最小单位,而进程是系统进行资源分配和调度的基本单位。例如,一个 Java 程序可以包含多个线程,它们共享进程的资源。
  • 并发与并行:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。
  • 同步与异步:同步是指程序按照顺序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。

“Java并发探索–下篇” — 在下面找

【博客园】

https://www.cnblogs.com/jackjavacpp

【CSDN】

https://blog.csdn.net/okok__TXF

2.探索线程的创建

①线程的状态

Thread源码里面看出

public enum State {// 尚未启动的线程的线程状态。NEW,// 就绪RUNNABLE,// 等待监视器锁的线程的线程状态BLOCKED,/*等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:Object.wait() 没有超时Thread.join() 没有超时LockSupport.park()*/WAITING,/*指定等待时间的等待线程的线程状态线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:Thread.sleepObject.wait with timeoutThread.join with timeoutLockSupport.parkNanosLockSupport.parkUntil*/TIMED_WAITING,//终止线程的线程状态。线程已完成执行。TERMINATED;
}

下面看一张图,很清楚的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】

在这里插入图片描述

在Java中,一个Thread有大致六个状态。

线程创建之后(new Thread)它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 RUNNABLE(就绪) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。

明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。

②线程创建

1)两种基本方式
  • 继承Thread类,重写run方法
public class MyThread1 extends Thread {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": hello world");}
}
public class JUCMain {public static void main(String[] args) {new MyThread1().start();}
}
  • 实现Runnable接口,传入Thread
public class Runnable1 implements Runnable{@Overridepublic void run() {System.out.println("hello world, Runnable");}
}
public class JUCMain {public static void main(String[] args) {new Thread(new Runnable1()).start();}
}

网上还传有其他创建线程的方式,比如: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。

首先从start()方法看起(这个方式属于Thread类的)。调用start()后,JVM会创建一个新线程并执行该线程的run()方法。注意:直接调用run()不会启动新线程,而是在当前线程中执行。

// 启动线程并触发 JVM 创建原生线程
// synchronized后面解释【见 探索“锁”】
public synchronized void start() {// 零状态值对应于状态 “NEW”// 线程想要start,必须是为0的状态if (threadStatus != 0)throw new IllegalThreadStateException();/*group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,同时线程组的未启动线程计数会减1。*/group.add(this);boolean started = false;try {start0(); //关键!调用本地方法(native)started = true;} finally {try {if (!started) { //启动失败时回滚//如果 started 为 false,说明线程启动失败,//调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。group.threadStartFailed(this);}} catch (Throwable ignore) {/* do nothing. If start0 threw a Throwable thenit will be passed up the call stack */}}
}
//========== native
private native void start0();

那么执行的是run()方法,run方法里面是啥呢

private Runnable target; // target是Runnable类型@Override
public void run() {if (target != null) {target.run();}
}

如果继承Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。

如果是实现的Runnable接口,new Thread(new Runnable1())的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。

2) 其他创建方式
.lambda
  • lambda表达式创建:这个仅仅是写法不同而已。因为Runnable是个函数式接口
@FunctionalInterface
public interface Runnable {public abstract void run();
}
.callable
  • Callable创建的方式
public class MyCall implements Callable<String> {@Overridepublic String call() throws Exception {Thread.sleep(2000);return "Hello Callable";}
}public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> task = new FutureTask<>(new MyCall());new Thread(task).start();System.out.println(task.get());
}

new Thread(Runnable runnable)要求传的类型是Runnable,但是现在传的是FutureTask。所以先来看一看FutureTask和Runnable之间有什么联系.

在这里插入图片描述

从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继承了Future和Runnable两个接口。

Future<V>

Future 接口是 Java 并发编程中的一个重要接口,位于 java.util.concurrent 包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 Future 对象获取。

// 这里使用了泛型 <V>,表示该 Future 对象所代表的异步计算结果的类型。
public interface Future<V> {//尝试取消异步任务的执行。/*如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;如果任务成功取消,则返回 true。*/boolean cancel(boolean mayInterruptIfRunning);//如果任务在完成之前被取消,则返回 true;否则返回 false。boolean isCancelled();//如果任务已经完成,则返回 true;否则返回 false。boolean isDone();//获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。V get() throws InterruptedException, ExecutionException;//获取异步任务的计算结果,并且可以指定一个超时时间。//如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {// 很简单嘛,这个是来自Runnable的void run();
}

这个接口就相当于组合了Runnable和Future,能够获取到返回值了。

FutureTask<V> 既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。

public class FutureTask<V> implements RunnableFuture<V> {// 基本属性private volatile int state;private static final int NEW          = 0;private static final int COMPLETING   = 1;private static final int NORMAL       = 2;private static final int EXCEPTIONAL  = 3;private static final int CANCELLED    = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED  = 6;/** The underlying callable; nulled out after running */private Callable<V> callable;/** 结果 */private Object outcome; /** The thread running the callable; CAS ed during run() */private volatile Thread runner;/** Treiber stack of waiting threads */private volatile WaitNode waiters;// 看它的构造函数1public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable; // 赋值callable========this.state = NEW; // ensure visibility of callable}// 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable}/*Executors::callable(xx, xx)方法==========public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);}static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run(); // 调用Runnable的run()return result;}}*/// run()方法 --------------- // new Thread(new FutureTask<>(new MyCall()))public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//====调用callable.call()result = c.call(); ran = true;} catch (Throwable ex) {.........}// 如果运行OK了,设置结果!if (ran) set(result);}} finally {.............}}// 设置结果outcomeprotected void set(V v) {// https://www.cnblogs.com/jackjavacpp/p/18787832// 使用CAS --- 【见上一篇文章 java map & CAS & AQS】if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v; // 这里UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}// 比较核心的get方法================startpublic V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING) // 如果状态不是完成s = awaitDone(false, 0L); // 等待完成return report(s); // 返回结果}private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 1.计算超时截止时间final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) { // 2.自旋循环等待任务完成// 2.1如果该线程中断了if (Thread.interrupted()) {removeWaiter(q);// 从等待队列中移除当前节点throw new InterruptedException();}// 2.2检查状态int s = state;// 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)if (s > COMPLETING) {if (q != null)q.thread = null;return s;// 返回最终状态}// 2.3若任务状态等于 COMPLETING,表明任务正在完成,// 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued) //将节点加入等待队列queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) { // 2.4如果是有时限的get()nanos = deadline - System.nanoTime();if (nanos <= 0L) { removeWaiter(q);return state; // 返回状态}LockSupport.parkNanos(this, nanos);}else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。LockSupport.park(this);}}private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x; // 返回outcome......}//==================================end
}

awaitDone 方法的核心功能是让当前线程等待异步任务完成,它会持续检查任务的状态,根据不同的状态采取相应的处理措施,同时支持设置超时时间。在等待过程中,若线程被中断,会抛出 InterruptedException 异常。

通过上面的分析,Callable这种方式实际上本质还是Runnable嚯。使用FutureTask将Future和Runnable结合起来,功能更加丰富。

.线程池ThreadPoolExecutor
  • 线程池创建线程

如下使用方式。

public class PoolMain {public static void main(String[] args) {// 创建一个线程池ExecutorService pool = Executors.newFixedThreadPool(1);long start = System.currentTimeMillis();// execute=============pool.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("execute pool创建启动线程!");});// submit==============Future<Integer> future = pool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("submit pool创建启动线程!");return 100;});try {System.out.println(future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));pool.shutdown();}
}

从上面的例子可以看出,大致有ExecutorServiceExecutors, newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个ThreadPoolExecutor类。

接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】此外,Executors只是一个工具类。

在这里插入图片描述

Executor是顶级接口

public interface Executor {// 只定义了一个方法void execute(Runnable command);
}

ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法

public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);//....<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
}

AbstractExecutorServiceExecutorService执行方法的默认实现,发现下面的submit()底层实际执行的是execute(ftask)方法【Executor接口的execute()方法,在这个抽象类里面没有具体实现,到具体子类ThreadPoolExecutor在可以看到】

public abstract class AbstractExecutorService implements ExecutorService {// 这里重点只看一下submit方法的默认实现// 优点1: 可以有Future返回值public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}// 优点2: 支持Callable参数public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
}

ThreadPoolExecutor:线程池,可以通过调用Executors静态工厂方法来创建线程池并返回一个ExecutorService对象

public class ThreadPoolExecutor extends AbstractExecutorService {/*** 七大参数!!!!======*/public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量int maximumPoolSize,//线程池的最大线程数long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间TimeUnit unit,//时间单位BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,定制策略来处理任务) {if (corePoolSize < 0 || maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize || keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
}

回到这一小节最开始的时候,例子中的线程池有两种提交并运行线程的方式executesubmit两个方法。现在来看一下ThreadPoolExecutor中的execute()方法是什么样子的。submit()我们已经知道了是在AbstractExecutorService中有默认实现的。

// ThreadPoolExecutor::execute(Runnable command)
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1.若当前工作线程数小于核心线程数(corePoolSize),尝试创建新的核心线程// 这里是用的位运算的,我没有深究if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // return;// 创建新线程失败,重新获取ctlc = ctl.get();}// 2.任务入队// 线程池处于运行状态(isRunning(c))// 且任务成功加入阻塞队列(workQueue.offer(command))if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 2.2 双重检查/*2.2.1再次检查线程池状态(可能在此期间线程池被关闭)。2.2.2若线程池已关闭,尝试从队列中移除任务,移除成功则拒绝任务(reject(command))。2.2.3若线程池仍运行但无活跃线程(workerCountOf(recheck) == 0),添加一个非核心线程(addWorker(null, false)),该线程会从队列中拉取任务执行。*/if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3.若任务无法入队(队列已满),尝试创建非核心线程(addWorker(command, false))else if (!addWorker(command, false))//若创建失败(线程数已达maximumPoolSize或线程池已关闭),//执行拒绝策略(reject(command))reject(command);}
/*
execute(command)
│
├─ 检查command非空
│
├─ 当前线程数 < corePoolSize?
│   ├─ 是 → 创建核心线程 → 成功则返回
│   └─ 否 → 进入下一步
│
├─ 线程池是否RUNNING且任务入队成功?
│   ├─ 是 → 双重检查状态
│   │   ├─ 线程池已关闭 → 移除任务并拒绝
│   │   └─ 线程池仍运行且无活跃线程 → 创建非核心线程
│   │
│   └─ 否 → 尝试创建非核心线程
│       ├─ 成功 → 处理任务
│       └─ 失败 → 拒绝任务
*/为什么需要二次检查线程池状态?- 任务入队后,其他线程可能关闭了线程池(如调用shutdown())。
- 处理:- 若线程池已关闭,需移除已入队任务并拒绝。- 若线程池仍运行但无活跃线程(如核心线程数为0且任务在队列中),需创建新线程处理队列任务。

场景1:核心线程数未满

  • 线程池处于 RUNNING,当前线程数 2(corePoolSize=5)。
  • addWorker(task, true) 成功创建核心线程并执行任务。

场景2:队列已满,创建临时线程

  • 核心线程满载,队列已满,线程数未达 maximumPoolSize
  • addWorker(task, false) 创建临时线程处理任务。

场景3:SHUTDOWN 状态处理剩余任务

  • 线程池调用 shutdown(),状态变为 SHUTDOWN
  • 已有任务在队列中,addWorker(null, true) 创建线程处理队列任务。

ThreadPoolExecutor设计思想:

  • 核心线程优先:优先使用核心线程处理任务。
  • 队列缓冲:核心线程满载后,任务入队等待。
  • 非核心线程应急:队列满后,创建临时线程处理任务(不超过maximumPoolSize

addWorker 创建工作线程

addWorker 方法通过精细的状态检查和并发控制,确保线程池在动态扩缩容时保持线程安全。【方便理解,可以把这个方法看作是创建线程】其核心在于:

  • 双重循环:外层处理状态变化,内层处理线程数修改。
  • 锁与原子操作结合:保证 workers 集合和 workerCount 的一致性。
  • 异常回滚机制:确保资源不会泄漏(如线程数虚增或 Worker 未清理)。

firstTask为我们最开始写的Runnable。【记一个代号叫做 “我的任务” 】

// ======== firstTask
pool.execute(() -> {
try {Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
System.out.println("execute pool创建启动线程!");
});
// addWorker(runnable, core)
//标记是否以核心线程数(corePoolSize)为上限创建线程。
//若为 false,则使用最大线程数(maximumPoolSize)
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {........int rs = runStateOf(c);// 获取线程池状态// 检查是否允许添加Workerif (rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask == null &&!workQueue.isEmpty()))return false;for (;;) {.........//CAS 增加线程数if (compareAndIncrementWorkerCount(c))break retry;// 成功增加,跳出循环c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;}}...........try {// 把“我的任务”传进去了w = new Worker(firstTask); // 创建的一个Workerfinal Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 加锁保护 workers 集合mainLock.lock();try {// 再次检查状态(防止在加锁前状态变化)int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive())//======抛出异常....workers.add(w); // 将 Worker 加入集合int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 【Worker类里面的thread】// 启动线程=========重点【见下面的分析】t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

Worker是ThreadPoolExecutor的内部类,可以看出是一个Runnable,那么肯定重写了run()方法

private final class Workerextends AbstractQueuedSynchronizer implements Runnable
{final Thread thread;Runnable firstTask; //"我的任务"到这里来了// 构造函数Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 利用线程工厂创建了一个线程/*如果final Thread t = w.thread;t.start();启动的话,执行的是这个内部类的run()*/this.thread = getThreadFactory().newThread(this);}// run就这一行public void run() {runWorker(this);}//到这里了final void runWorker(Worker w) {..//"我的任务"Runnable task = w.firstTask;...try {//getTask()从等待队列里面取出Runnablewhile (task != null || (task = getTask()) != null) {....task.run();//==========}}........ 无任务时回收线程processWorkerExit(w, completedAbruptly);}
}

ThreadPoolExecutor的静态内部类 :: jdk自带的四种拒绝策略

public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// 1.直接抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {.........public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}
}
// 2.直接丢弃
public static class DiscardPolicy implements RejectedExecutionHandler {.....public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}
// 3.丢弃等待队列的第一个
public static class DiscardOldestPolicy implements RejectedExecutionHandler {........public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {//如果线程池未关闭,就弹出队列头部的元素,然后尝试执行if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
}
// 4.调用者运行,直接执行run()方法里面的逻辑。
// 只要线程池没有关闭,就由提交任务的当前线程处理
public static class CallerRunsPolicy implements RejectedExecutionHandler {......public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
}

总结一下,在了解到了ThreadPoolExecutor的一些类间关系、以及一些基本流程、属性之后。接下来我们来梳理一遍,线程池的运行方式。

  1. 创建线程池(七大参数、四大拒绝策略)

  2. 任务提交

//2.1
executor.execute(() -> {// 任务逻辑
});

2.2任务处理决策链:::

2.2.1尝试创建核心线程:当前工作线程数 < corePoolSize,直接创建新核心线程执行任务

if (workerCount < corePoolSize) {addWorker(command, true); // true表示检查corePoolSizereturn;
}

2.2.2任务入队: 若核心线程已满,任务尝试加入工作队列。

if (workQueue.offer(command)) {// 双重检查线程池状态if (线程池已关闭) 移除任务并拒绝;else if (当前无活跃线程) 创建非核心线程处理队列任务;
}

2.2.3尝试创建非核心线程: 若队列已满且线程数 < maximumPoolSize,创建非核心线程。

else if (!addWorker(command, false)) { // false表示检查maximumPoolSizereject(command); // 触发拒绝策略
}

2.2.4拒绝任务

  1. 工作线程执行任务

3.1Worker初始化:每个Worker绑定一个线程和初始任务(firstTask)。

Worker w = new Worker(firstTask);
final Thread t = w.thread;
t.start(); // 启动线程执行runWorker()

3.2任务执行循环(runWorker方法)

while (task != null || (task = getTask()) != null) {try {task.run(); // 执行任务} finally {task = null; // 清理任务引用}
}

3.3从队列获取任务(getTask方法)

  • 阻塞模式:若为核心线程或允许核心线程超时,调用workQueue.take()永久阻塞。
  • 超时模式:若非核心线程,调用workQueue.poll(keepAliveTime)超时等待。
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
  1. 线程回收与资源释放
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 异常终止时补偿workerCountdecrementWorkerCount();mainLock.lock();try {workers.remove(w); // 从集合中移除Workerif (线程池仍在运行 && 队列非空) addWorker(null, false); // 补充线程处理队列任务} finally {mainLock.unlock();}
}

线程池本质也是Runnable!

一张好图:【来自:【线程池工作原理】https://blog.csdn.net/fighting_yu/article/details/89473175】

在这里插入图片描述

3.探索”锁“

上面探索了线程以及线程池的创建,发现其源码中存在这种代码;

//1.Thread的start方法
public synchronized void start()//2.线程池部分源码addWorker()方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//3.
LockSupport.park(this);

这些是什么呢?这就是锁。。

确保线程安全最常见的做法是利用锁机制(Locksychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。

① synchronized

synchronized 是 Java 中最基本的同步机制,它可以修饰方法或代码块,确保同一时刻只有一个线程可以执行被修饰的代码。

public class SynchronizedTest {public static void main(String[] args) {SynchronizedTest lock1 = new SynchronizedTest();new Thread(lock1::test).start();new Thread(lock1::test2).start();new Thread(lock1::testStatic).start();new Thread(lock1::testFs).start();}public void testStatic() {// 锁的是Class对象synchronized (SynchronizedTest.class){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("testStatic()");}}// 锁的是一个实例对象public void test(){synchronized (this){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("test()");}}public synchronized void test2(){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("test2()");}public void testFs(){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("testFs()");}
}

上面只有test() 和 test2() 是互斥的。也就是1秒过后,testStatic()、testFs()、和 【test()、test1() 其中之一】一起输出打印。

修饰代码块、修饰方法:锁的是该对象;

修饰静态成员:锁的是该类的Class对象 这种方式可以确保对静态变量的访问是线程安全的

还可以锁任意对象。

其实主要弄清楚各自锁的是什么对象就行了,看是否需要的是一个锁,就可以判断是否互斥了;

//锁的是Class对象
public static synchronized void testStatic1() {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("testStaticMethod()");
}
public void testStatic() {synchronized (SynchronizedTest.class){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("testStatic()");}
}
public synchronized void test2(){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("test2()");
}

如上述代码示例,testStatic1和testStatic需要持有的对象是同一个,故这二者会产生互斥,test2需要持有的是该类的一个实例对象,故不会与这二者产生互斥。

需要注意的是: synchronized 并不属于方法定义的一部分,故synchronized 关键字不能被继承。如果在父类中的某个方 法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这 个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方 法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此, 子类的方法也就相当于同步了。
来看看如下示例:

public class Father {public synchronized void method1(){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("Father method1");}
}
//
public class Son extends Father{public void syncSon() {super.method1(); // 调用的父类的同步方法}public void syncSon1() {super.method1();}public void method1() { // 重写了,但是synchronized并不会继承过来try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("Son method1");}public void sonHaha() { method1(); }public void sonHehe() { method1(); }public static void main(String[] args) {Son son = new Son();// new Thread(son::syncSon).start();// new Thread(son::syncSon1).start(); // 会互斥new Thread(son::sonHaha).start();new Thread(son::sonHehe).start(); // 不会}
}

synchronized【隐式锁】的底层原理涉及到 Java 对象头(Object Header)和 Monitor(监视器)两个关键概念。

每个 Java 对象在内存中分为三部分:

  1. 对象头(Header)
    • Mark Word(标记字段):存储哈希码、GC 分代年龄、锁状态等。
    • Klass Pointer(类型指针):指向类的元数据。
  2. 实例数据(Instance Data)
  3. 对齐填充(Padding)

Java 对象头:在 Java 虚拟机中,每个对象都有一个对象头,用于存储对象的元数据信息,包括对象的哈希码、GC 相关信息、锁状态等。对象头通常包含一个标记字段(Mark Word),用于标识对象的锁状态

Monitor(监视器):Monitor 是一种同步机制,负责管理对象的锁。每个对象都与一个 Monitor 相关联。当一个线程尝试进入一个被synchronized修饰的代码块或方法时,它会尝试获取对象的 Monitor。如果 Monitor 处于无锁状态,则当前线程会尝试将其锁定;如果 Monitor 已经被其他线程锁定,则当前线程会进入阻塞状态,直到持有锁的线程释放锁。

// C++ 实现(HotSpot 源码)
class ObjectMonitor {void*   _header;       // Mark Wordvoid*   _owner;        // 持有锁的线程volatile intptr_t  _count;     // 重入次数ObjectWaiter* _WaitSet;        // 等待队列(调用 wait() 的线程)ObjectWaiter* _EntryList;      // 阻塞队列(竞争锁失败的线程)// ...
};

查看本小节开头示例的test()方法的字节码:

在这里插入图片描述

synchronized 同步语句块的实现使用的是 monitorentermonitorexit 指令,当执行 monitorenter 指令时,线程试图获取锁也就是获取 对象监视器 monitor 的持有权。第一个monitorexit正常退出同步块, 第二个是异常退出同步块。

synchronized优化:

锁升级(JDK 6+)

3.0 无锁

  • 无锁:当第一个线程第一次访问一个对象的同步块时,JVM会在对象头中设置该线程的ID,并将对象头的状态位设置为“偏向锁”。这个过程称为“偏向”,表示对象当前偏向于第一个访问它的线程。

3.1 偏向锁(Biased Locking)

  • 原理:第一个获取锁的线程将线程 ID 写入 Mark Word,后续无需 CAS。这样如果该线程再来的时候,由于是已经设置了锁偏向该线程,故只需比对一下对象头里面的Mark Word就行了。
  • 触发条件:JVM 启用了偏向锁(默认开启),且对象未处于锁定状态。
  • 撤销:当其他线程尝试竞争时,撤销偏向锁并升级为轻量级锁。

3.2 轻量级锁(Lightweight Locking)

  • 加锁流程
    1. 在当前线程栈帧中创建 Lock Record
    2. 通过 CAS 将 Mark Word 复制到 Lock Record,并尝试将 Mark Word 指向 Lock Record。

轻量级锁在以下场景会升级为重量级锁:

  1. 自旋失败:竞争线程自旋一定次数后仍未获取锁。
  2. 竞争加剧:等待锁的线程数超过 JVM 自适应的阈值

3.3 重量级锁(Heavyweight Locking)

  • 实现:通过操作系统提供的互斥量(Mutex)和条件变量实现,线程竞争失败后进入阻塞状态。
  • 性能问题:涉及用户态到内核态的切换,开销较大。

synchronized优化:

锁会升级,从低到高升级,反着降级不可以:无锁状态 -> 偏向锁状态 -> 轻量级锁状态 -> 重量级锁状态

锁类型实现机制适用场景性能开销
偏向锁通过 Mark Word 记录线程 ID单线程重复访问同步块最低
轻量级锁CAS + 自旋(适应性自旋)低竞争、短时间同步中等
重量级锁操作系统互斥量(Mutex) + Monitor高竞争、长时间同步最高

【节选自 锁升级】 :https://blog.csdn.net/weixin_45433817/article/details/132216383

在这里插入图片描述

: synchronized是公平锁吗?

首先要知道公平锁和非公平锁的概念:

  • 公平锁:公平锁指的是多个线程按照申请锁的顺序来获取锁,先到先得。当一个线程请求锁时,如果该锁当前处于可用状态,并且在该线程之前已经有其他线程在等待该锁,那么这个线程会被放入等待队列的尾部,等待前面的线程依次获取并释放锁后,它才能获取锁。
  • 非公平锁:非公平锁则不保证线程获取锁的顺序与申请锁的顺序一致。当一个线程请求锁时,即使有其他线程在等待该锁,它也会先尝试直接获取锁,如果获取成功就可以直接执行,而不用排队等待。

那么,synchronized 基于对象头的 Mark Word 和监视器(Monitor)实现。当一个线程进入同步块时,它会尝试获取对象的监视器。如果监视器处于空闲状态,该线程会直接获取监视器,而不会考虑是否有其他线程已经在等待这个监视器。例如,当一个线程释放了 synchronized 修饰的同步块的锁后,新到来的线程有很大机会直接获取到锁,而不是等待那些在等待队列中的线程,这就体现了其非公平性。

class SynchronizedNonFairExample {private static final Object lock = new Object();private static int counter = 0;public static void main(String[] args) {for (int i = 0; i < 5; i++) {new Thread(() -> {while (true) {synchronized (lock) {counter++;System.out.println(Thread.currentThread().getName() + " 获取到锁,计数: " + counter);try {// 模拟执行任务Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}}, "Thread-" + i).start();}}
}

在上述代码中,多个线程竞争 lock 对象的锁。运行代码时,你会发现线程获取锁的顺序并不是按照线程启动的顺序,这就说明了 synchronized 是非公平锁。

② Lock

上一节的synchronized是jdk内置的关键字,属于隐式锁、也叫内置锁。现在这一节来探索一下显式锁,其提供更细粒度的控制(如可中断、超时、公平性),核心实现为 ReentrantLock

public interface Lock {//获取锁。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并在获取锁之前处于休眠状态。void lock();void lockInterruptibly() throws InterruptedException;//如果锁可用,则获取锁,并立即返回值为 true。如果锁不可用,则此方法将立即返回值 false。boolean tryLock();/*如果在给定的等待时间内有空闲,并且当前线程尚未中断,则获取该锁。如果锁可用,则此方法立即返回值 true。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到发生以下三种情况之一:锁由当前线程获取;或者其他线程中断当前线程,支持中断锁获取;或指定的等待时间已用*/boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//释放锁void unlock();//返回绑定到此 Lock 实例的新 Condition 实例。在等待条件之前,锁必须由当前线程持有Condition newCondition();
}

在这里插入图片描述

从上图中,我们可以得知juc包下的几个主要的实现类,绿色圈圈连接的是里面的内部类。

1) ReentrantLock

接下来从我们最熟知的ReentrantLock开始看起吧,他的简单使用:

public class LockTest {Lock lock = new ReentrantLock();int count = 0;public static void main(String[] args) throws InterruptedException {LockTest test = new LockTest();for (int i = 1; i <= 2; i++) {new Thread(test::add).start();}Thread.sleep(2000);/*如果不加锁的话,结果就不一定是两万了*/System.out.println(test.count);}public void add() {// 标准写法 try加锁 finally释放锁try {lock.lock();for (int i = 0; i < 10000; i++) {count++;}} finally {lock.unlock();}}
}

我们分析一下,首先是调用了其无参构造函数创建了一个对象,里面是new了一个看名字是非公平同步标记的对象,那他肯定会有公平的同步标记。

// 下面都是在ReentrantLock.java里面
private final Sync sync;
// 无参构造
public ReentrantLock() {sync = new NonfairSync();
}
// 有参构造
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}
// 然后调用lock.lock()实际是调用的sync.lock();
public void lock() {sync.lock();
}
// 然后调用lock.unlock()实际是调用的sync.release(1);;
public void unlock() {sync.release(1);
}
// 是ReentrantLock的静态内部类
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}
// 是ReentrantLock的静态内部类
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
}
// Sync继承了AbstractQueuedSynchronizer:【AQS】
abstract static class Sync extends AbstractQueuedSynchronizer {.....
}

从上面额关系我们可以看出一切源头就是AbstractQueuedSynchronizer,也就是我们熟悉的AQS。在这篇文章的“补充知识点”环节中,对AQS做了一个简单的介绍及分析。【AQS】:https://blog.csdn.net/okok__TXF/article/details/146455487

【博客园】:https://blog.csdn.net/okok__TXF/article/details/146455487

它是是 Java 并发包 java.util.concurrent.locks 下的一个核心类,是构建锁和其他同步工具(如 ReentrantLockSemaphoreCountDownLatch 等)的基础框架。

里面定义了两种资源共享模式:

  • 独占模式(Exclusive):同一时刻只有一个线程能获取资源,如 ReentrantLock

在独占模式的时候,tryAcquire(int):尝试获取资源,成功返回 true,失败返回 falsetryRelease(int):尝试释放资源,成功返回 true,失败返回 false

  • 共享模式(Share):多个线程可同时获取资源,如 Semaphore(信号量)、CountDownLatch(倒计时 latch)。

在共享模式的时候,tryAcquireShared(int):尝试获取共享资源,负数表示失败;0 表示成功但无剩余资源;正数表示成功且有剩余资源。tryReleaseShared(int):尝试释放共享资源,释放后若允许唤醒后续等待节点,返回 true,否则 false

- 非公平锁

回到ReentrantLock中,我们以lock()方法举例子:【lock是无参构造的】非公平锁

//ReentrantLock.java
public void lock() {sync.lock(); // ===========1
}
//内部的抽象类Sync
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { // ===========7final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}//具体实现类NonfairSync
final void lock() { // ===========2if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1); // ===========3
}
protected final boolean tryAcquire(int acquires) { // ===========6return nonfairTryAcquire(acquires); // 这个是抽象类Sync的
}//AbstractQueuedSynchronizer.java --- acquire(1)
public final void acquire(int arg) { // ===========4// 这里的tryAcquire是NonfairSync.java里面的if (!tryAcquire(arg) && // ===========5acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

已经按顺序将1234567标注在了上面,模板方法的设计模式有时候真的会让人晕头转向。。

一句简简单单的lock.lock()方法做了什么呢?

首先 ,会直接尝试CAS获取锁【compareAndSetState(0, 1) 会通过 CAS 操作尝试将 AQS 中的 state 状态从 0 改为 1】,如果成功的话成功则设置当前线程为锁持有者,否则进入AQS的获取流程;【在这里,当线程调用 lock() 时,会先通过 CAS 操作尝试将 AQSstate0 改为 1此时不会检查等待队列中是否有其他线程在排队,只要 CAS 成功,就直接获取锁,体现了 “插队” 的特性。】

其次, 进入aqs的acquire流程

1.tryAcquire(arg)   2.addWaiter(Node.EXCLUSIVE)   3.acquireQueued(xxx)

第一个方法tryAcquire再次尝试获取锁(非公平锁的 tryAcquirenonfairTryAcquire(acquires)),在Sync :: nonfairTryAcquire(int acquires)方法里面,得到aqs里面的state,如果是0,再次尝试 CAS 抢占(体现非公平性,不检查队列);如果不是0,说明被抢占了,判断持有锁的线程是不是当前线程,如果是(体现可重入性),更新state,如果不是返回false。

然后, 若tryAcquire失败【返回false】,调用addWaiter(Node.EXCLUSIVE) 会将当前线程包装成一个独占模式的 Node 节点加入 AQS 队列。接着 acquireQueued 会使线程在队列中自旋等待,不断尝试获取锁或被唤醒后尝试获取,直到成功。

接下来分析一下lock.unlock()方法,这个就在代码里面注释解释了

//ReentrantLock.java
public void unlock() {//调用其内部同步器 sync 的 release 方法:sync.release(1); // ===========1
}
// 内部抽象类Sync
protected final boolean tryRelease(int releases) { // ===========3int c = getState() - releases; // 减少同步状态值(释放一次锁,`state` 减 1)// 检查当前线程是否为锁的持有者,不是则抛异常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {// 当 `state` 减为 0 时,完全释放锁free = true;setExclusiveOwnerThread(null);// 清除独占锁的线程引用}setState(c);// 更新 `state` 值return free;// 返回是否完全释放锁
}//AbstractQueuedSynchronizer.java --- acquire(1)
public final boolean release(int arg) {//tryRelease 尝试释放锁,由子类实现具体逻辑if (tryRelease(arg)) { // ===========2Node h = head;// 获取等待队列头节点if (h != null && h.waitStatus != 0)unparkSuccessor(h);// 唤醒后继节点 ===========4return true;}return false;
}
//唤醒后继节点
private void unparkSuccessor(Node node) {  ===========5int ws = node.waitStatus;if (ws < 0) // 将头节点的 `waitStatus` 设为 0(取消之前的状态)compareAndSetWaitStatus(node, ws, 0);Node s = node.next; // 找到头节点的后继节点if (s == null || s.waitStatus > 0) { // 若后继节点为空或已取消,从队尾向前找第一个非取消节点s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null) // 唤醒找到的节点对应的线程LockSupport.unpark(s.thread);
}

在内部抽象类Sync中的tryRelease中:

  1. 减少 stateReentrantLock 支持重入,state 记录锁的重入次数。每次调用 unlock()state 减 1。
  2. 检查线程所有权:确保只有锁的持有者才能释放锁,否则抛出 IllegalMonitorStateException
  3. 完全释放锁:当 state 减为 0 时,将 setExclusiveOwnerThread(null),表示锁已完全释放,返回 true

ReentrantLock支持重入:【其实从名字就可以看出来了 – Reentrant(再进去的、就是可重入嘛)】

ReentrantLock lock = new ReentrantLock();
lock.lock();  // state=1,线程持有锁
lock.lock();  // state=2(重入)
lock.unlock(); // state=1(未完全释放)
lock.unlock(); // state=0,释放锁并唤醒等待线程
- 公平锁

那么ReentrantLock的公平锁是什么样子的呢?其实大致步骤都差不多,主要是在FairSync.java

protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {//公平锁会先检查等待队列是否有前驱节点,若有则不能抢锁if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; 
}
public final boolean hasQueuedPredecessors() {// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

公平锁在 state == 0 时,会先通过 hasQueuedPredecessors 检查等待队列。若有其他线程在排队,则当前线程不能抢占,必须入队等待,保证 “先来先得”,体现了公平性。而非公平锁跳过这一步,直接抢锁,这就是非公平性的核心体现。

2) 读写锁
//ReadWriteLock 维护一对关联的锁,一个用于只读作,一个用于写入。只要没有写入器,多个读取器线程就可以同时持有读取锁。
//写锁是独占的。读写锁允许在访问共享数据时实现比互斥锁允许的更高级别的并发。它利用了这样一个事实,
//即虽然一次只有一个线程(写入线程)可以修改共享数据,但在许多情况下,任意数量的线程都可以同时读取数据(因此是读取线程)。
//从理论上讲,与使用互斥锁相比,使用读写锁允许的并发性增加将导致性能改进。
public interface ReadWriteLock {//返回用于读取的锁。Lock readLock();//返回用于写入的锁。Lock writeLock();
}

读写锁是否会比使用互斥锁提高性能 取决于读取数据的频率与修改数据的频率、读取和写入作的持续时间以及数据的争用 - 即尝试同时读取或写入数据的线程数。例如,最初填充数据,此后不经常修改的集合;经常搜索(例如某种目录)是使用读写锁的理想候选者。但是,如果更新变得频繁,则数据将花费大部分时间进行独占锁定,并发性几乎没有增加。只有分析和测量才能确定使用读写锁是否适合您的应用程序。

读写锁允许多个线程同时读(没有写入时,多个线程允许同时读(提高性能)),但只要有一个线程在写,其他线程就必须等待(只允许一个线程写入(其他线程既不能写入也不能读取))。也就是读读不冲突、读写就要冲突了。

- ReadWriteLock

下面给出一个简单示例:ReadWriteLock

public class ReadWriteLockTest2 {private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();private static final Lock readLock = readWriteLock.readLock();private static final Lock writeLock = readWriteLock.writeLock();private static int[] a = new int[10];public static void main(String[] args) throws InterruptedException {// 1个线程写new Thread(ReadWriteLockTest2::write).start();for (int i = 0; i < 9; i++)  // 10个线程读new Thread(()-> System.out.println(get())).start();Thread.sleep(2000);}public static Object get() {readLock.lock();try {Thread.sleep(100);return a[1];} catch (InterruptedException e) {throw new RuntimeException(e);} finally {readLock.unlock();}}public static void write() {writeLock.lock();try {a[1]++;System.out.println("写进行~~~");Thread.sleep(1000);System.out.println("写ok~~~");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {writeLock.unlock();}}
}

写操作在执行的时候,读线程是会阻塞的。但是10个读线程之间并没有阻塞

- StampedLock

StampedLock对比 ReentrantReadWriteLock 有所增强,在原先读写锁的基础之上新增了乐观读的模式。该模式并不会加锁,所以不会阻塞线程,会有更高的吞吐量和更高的性能。(乐观锁和悲观锁)

StampedLock具有三种控制读/写访问的模式:

1、写入(Writing):方法writeLock可能阻塞等待独占访问,并返回一个戳,该戳可在方法unlockWrite中使用以释放锁。还提供了tryWriteLock的非定时和定时版本。当锁保持在写模式时,不能获得读锁,并且所有乐观读验证都将失败。

2、读取(Reading):方法readLock可能会阻塞等待非独占访问,并返回一个戳,该戳可在方法unlockRead中使用以释放锁。还提供了tryReadLock的非定时和定时版本。

3、乐观读取(Optimistic Reading):tryOptimisticRead方法返回一个非0的stamp,只有当前同步状态没有被写模式所占有是才能获取到。他是在获取stamp值后对数据进行读取操作,最后验证该stamp值是否发生变化,如果发生变化则读取无效,代表有数据写入。这种方式能够降低竞争和提高吞吐量。

简单示例:

public class StampedLockTest {private static final StampedLock stampedLock = new StampedLock();private static double x = 1.0;private static double y = 1.0;public static void main(String[] args) {// 1. 一个线程写new Thread(() -> addXY(1, 1)).start();// 2. 10个线程读for (int i = 0; i < 10; i++) {new Thread(() -> System.out.println(getSArea())).start();}}private static double getSArea() {// 乐观读long stamp = stampedLock.tryOptimisticRead();double s1 = x * y;// 验证一下if (!stampedLock.validate(stamp)) { // 验证失败stamp = stampedLock.readLock(); // 升级为读锁try {s1 = x * y;} finally {stampedLock.unlockRead(stamp);}}return s1;}private static void addXY(double a, double b) {long stamp = stampedLock.writeLock();try {System.out.println("写进行~~");x += a;y += b;Thread.sleep(1000);System.out.println("写ok~~");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {stampedLock.unlockWrite(stamp);}}
}

写操作:writeLock() 返回一个 stamp(时间戳),表示获取写锁成功。写锁是独占的,获取时会阻塞所有读锁和其他写锁(除了乐观读)。通过 unlockWrite(stamp) 释放写锁,stamp 必须与获取时的一致,否则抛出异常。

乐观读: tryOptimisticRead():获取一个 乐观读时间戳,不实际加锁,直接读取数据(成本极低),然后**validate(stamp)**:检查该时间戳对应的读操作期间是否有写操作发生。若 stamp 有效(无写操作),则数据一致;否则需要升级为读锁。锁升级:若验证失败,说明数据可能被修改,通过 readLock() 获取读锁(阻塞直到写锁释放),确保后续读取的数据是最新的。

- 可重入性探讨

JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁

本小节看一下上面两种读写锁的可重入性,首先是ReadWriteLock,从他的实现类来看就是可重入的了【ReentrantReadWriteLock】

ReentrantReadWriteLock中也有Sync的抽象内部类,当调用写锁的lock时,实际是会经过里面重写的tryAcquire,从下面的代码可以知道同一线程可多次获取写锁:当线程获取写锁后,再次调用 writeLock() 会直接成功(无需等待),因为内部维护了一个 重入计数器(类似 ReentrantLock)。每次获取写锁时计数器加 1,释放时减 1,计数器为 0 时才真正释放锁。

protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);if (c != 0) {if (w == 0 || current != getExclusiveOwnerThread())return false;if (w + exclusiveCount(acquires) > MAX_COUNT)....// Reentrant acquiresetState(c + acquires);return true;}....return true;
}

ReentrantReadWriteLock 基于 AQS(AbstractQueuedSynchronizer) 实现,通过 state 变量的高 16 位和低 16 位分别记录 读锁的共享次数写锁的重入次数

  • 写锁(独占模式):使用低 16 位记录当前线程的重入次数(和 ReentrantLock 类似)。
  • 读锁(共享模式):使用高 16 位记录所有线程的读锁获取次数,但会通过线程本地变量(ThreadLocal)记录当前线程的读锁重入次数,避免不同线程的计数干扰。

这种设计使得同一线程多次获取写锁或在读锁 / 写锁之间按规则重入时,不会出现死锁,符合可重入锁的定义。

public static void write() {writeLock.lock();try {writeLock.lock();a[1]++;System.out.println("写进行~~~");Thread.sleep(1000);System.out.println("写ok~~~");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {writeLock.unlock(); // 可重入writeLock.unlock();}
}

StampedLock是不可重入的,为什么呢?

  1. 没有锁计数机制:StampedLock 并没有像 ReentrantLock 那样维护一个锁的重入计数。在 ReentrantLock 中,state 变量用于记录锁的重入次数,每次获取锁时 state 加 1,释放锁时 state 减 1。而 StampedLock 中的 state 变量主要用于表示锁的状态和版本信息,并非用于记录重入次数。

  2. 如果一个线程已经持有了 StampedLock 的写锁或读锁,再次尝试获取相同类型的锁时,会出现以下情况:

    • 写锁情况:如果线程已经持有写锁,再次调用 writeLock() 方法,由于写锁是独占的,该线程会被阻塞,因为它会等待自己释放写锁后才能再次获取,这显然会导致死锁。
    • 读锁情况:如果线程已经持有读锁,再次调用 readLock() 方法,虽然读锁是共享的,但 StampedLock 并不会像可重入锁那样允许线程多次获取而不产生问题。而且如果在持有读锁的情况下尝试获取写锁,会导致死锁,因为写锁需要独占资源,而当前线程已经持有了读锁。
private static void addXY(double a, double b) {long stamp = stampedLock.writeLock();try {long lock = stampedLock.writeLock(); // 不可重入System.out.println("写进行~~");x += a;y += b;Thread.sleep(1000);System.out.println("写ok~~");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {stampedLock.unlockWrite(stamp);}
}

③ 锁案例

上面只掌握了一丢丢的理论,没有实践怎么行呢?

1) 交替打印

第一个,我们来实现一下三个线程交替打印A B C试试,第一个线程打印A,第二个B,第三个C

// synchronized实现
public class PrintABCSynchronized {private int now = 1;public static void main(String[] args) {PrintABCSynchronized obj = new PrintABCSynchronized();new Thread(obj::printA).start();new Thread(obj::printB).start();new Thread(obj::printC).start();}public void printA() {for (int i = 0; i < 10; i++) {synchronized (this) {while ( now != 1 ) { // 为什么用while,不用if?留给读者思考try {this.wait();}catch (InterruptedException e) {e.printStackTrace();}}System.out.println("A"); now = 2;this.notifyAll();}}}public void printB() {for (int i = 0; i < 10; i++) {synchronized (this) {while ( now != 2 ) { try {this.wait();}catch (InterruptedException e) {e.printStackTrace();}}System.out.println("B"); now = 3;this.notifyAll();}}}public void printC() {for (int i = 0; i < 10; i++) {synchronized (this) {while ( now != 3 ) {try {this.wait();}catch (InterruptedException e) {e.printStackTrace();}}System.out.println("C"); now = 1;this.notifyAll();}}}
}

wait - notify 【这个只能用在synchronized同步代码块中,是属于Object的方法】上面的缺陷很严重,那就是一下子就唤醒了所有挂起的线程,其实有的线程根本就不用唤醒,有没有一种办法,就是我想唤醒谁就唤醒谁呢?

public class PrintABCLock {private Lock lock = new ReentrantLock();private Condition a = lock.newCondition();private Condition b = lock.newCondition();private Condition c = lock.newCondition();private int flag = 1;public static void main(String[] args) {PrintABCLock obj = new PrintABCLock();new Thread(obj::printA).start();new Thread(obj::printB).start();new Thread(obj::printC).start();}public void printA()  {lock.lock();try {for (int i = 0; i < 10; i++) {while ( flag != 1 ) a.await();System.out.println('A'); flag = 2;b.signal();}} catch ( InterruptedException e) {e.printStackTrace();}finally {lock.unlock();}}public void printB() {lock.lock();try {for (int i = 0; i < 10; i++) {while ( flag != 2 ) b.await();System.out.println('B'); flag = 3;c.signal();}} catch ( InterruptedException e) {e.printStackTrace();}finally {lock.unlock();}}public void printC() {lock.lock();try {for (int i = 0; i < 10; i++) {while ( flag != 3 ) c.await();System.out.println('C'); flag = 1;a.signal();}} catch ( InterruptedException e) {e.printStackTrace();}finally {lock.unlock();}}
}
2) 阻塞队列

下面模仿jdk中ArrayBlockingQueue的源码,给了一个简洁的阻塞队列

class TBlockedQueue<T> {private final Lock lock;private final Condition notEmpty;private final Condition notFull;private final int capacity;private final LinkedList<T> list;public TBlockedQueue(int capacity) {if (capacity <= 0)throw new IllegalArgumentException("Capacity 不能小于1");this.capacity = capacity;list = new LinkedList<>();lock = new ReentrantLock();notEmpty = lock.newCondition();notFull = lock.newCondition();}public void add( T t ) {list.addLast(t);}// 1. 入队 -- 当队列已满时,向队列中添加元素的操作会被阻塞,直到队列有空间可用。public void put( T t ) {if (t == null) throw new NullPointerException();lock.lock();try {while (list.size() == capacity) notFull.await();list.addLast(t);notEmpty.signal();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}// 2. 出队 -- 当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新元素加入public T take() {lock.lock();try {while (list.isEmpty()) notEmpty.await();T t = list.removeFirst();notFull.signal();return t;} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}
}

这里自定义的 TBlockedQueue 是一个典型的 有界阻塞队列,其核心思路是通过 锁(Lock)和条件变量(Condition) 实现线程间的同步与协调,确保在多线程环境下对队列的操作是安全的。通过 lock.lock()lock.unlock() 包裹对共享资源 list 的操作,确保同一时刻只有一个线程修改队列。同时,使用 while 循环检查条件(如 list.size() == capacity),防止 虚假唤醒导致条件不满足时错误地继续执行。

  • notEmpty:当队列为空时,take 操作会等待此条件;当有元素入队时,通过 signal() 唤醒等待的消费者线程。
  • notFull:当队列已满时,put 操作会等待此条件;当有元素出队时,通过 signal() 唤醒等待的生产者线程。
3) AQS自定义锁

前面分析可以知道ReentrantLock是以AQS为基础框架来实现的,那么,此节我们自定义来实现一个锁。

见 “Java并发探索–下篇”

4.探索并发工具

5.虚拟线程

见 “Java并发探索–下篇” — 在下面找

【博客园】

https://www.cnblogs.com/jackjavacpp

【CSDN】

https://blog.csdn.net/okok__TXF

end.参考

  1. https://blog.csdn.net/agonie201218/article/details/128712507
  2. https://blog.csdn.net/xu_yong_lin/article/details/117521773
  3. https://www.cnblogs.com/java-bible/p/13930006.html
  4. https://blog.csdn.net/fighting_yu/article/details/89473175
  5. https://tech.meituan.com/2018/11/15/java-lock.html
  6. https://blog.csdn.net/weixin_44772566/article/details/137398521
  7. https://blog.csdn.net/m0_73978383/article/details/146442443 【synchronized详解】
  8. https://liaoxuefeng.com/books/java/threading 【廖雪峰的官方网站— 神中神】

相关文章:

  • 智能座舱架构中芯片算力评估
  • 2025系统架构师---管道/过滤器架构风格
  • 【强化学习系列】贝尔曼最优方程
  • SQL常用数据清洗语句
  • Python初学 有差异的知识点总结(一)
  • 如何开展有组织的AI素养教育?
  • kubernetes常用命令 k8s指令大全
  • Oracle备份和恢复
  • 政务大模型的春天,AI办公先萌芽
  • 【软件工程】面向对象编程(OOP)概念详解
  • if consteval
  • 9. 使用Gazebo和Rviz显示机器人(包括运动控制,雷达,摄像头仿真以及显示)
  • yum install 失败
  • 政策支持与市场驱动:充电桩可持续发展的双轮引擎
  • cmake qt 项目编译
  • 亚马逊环保标识运营指南:抢占流量新赛道的6大策略解析
  • 对话式 BI:让数据洞察从“专业门槛”变为“对话本能”
  • 【Keil5-开发指南】
  • 使用Docker安装Harbor
  • 机器学习day2
  • 辽宁省全力开展辽阳一饭店火灾事故救援处置工作
  • 解读|特朗普“助攻”下加拿大自由党“惨胜”,卡尼仍需克服“特鲁多阴影”
  • 总书记考察的上海“模速空间”,要打造什么样的“全球最大”?
  • 五一假期上海推出首批16条“市民健康路线”,这些健康提示请收好
  • 金科服务:大股东博裕资本提出无条件强制性现金要约收购,总代价约17.86亿港元
  • 自称“最美”通缉犯出狱当主播?央广网:三观怎能跟着“五官”跑