Java并发编程-线程通讯
Java并发编程-线程通讯
- 线程通讯
- 常见场景
- 实现方法
- 等待通知
- 信号量实现
- 栅栏实现
- 锁机制实现
- CountDownLatch
- CountDownLatch 常用方法说明
- 模拟报表统计场景
- 其他样例
- CyclicBarrier
线程通讯
线程通讯指的是多个线程之间通过共享内存或消息传递等方式来协调和同步它们的执行。在多线程编程中,通常会出现多个线程需要共同完成某个任务的情况,这时就需要线程之间进行通讯,以保证任务能够顺利地执行线程通讯的实现方式主要有以下两种:
- 共享内存:多个线程可以访问同一个共享内存区域,通过读取和写入内存中的数据来进行通讯和同步。
- 消息传递:多个线程之间通过消息队列、管道、信号量等机制来传递信息和同步状态。
常见场景
线程通讯的常见场景有以下几个:
- 多个线程共同完成某个任务:例如一个爬虫程序需要多个线程同时抓取不同的网页,然后将抓取结果合并保存到数据库中。这时需要线程通讯来协调各个线程的执行顺序和共享数据。
- 避免资源冲突:多个线程访问共享资源时可能会引发竞争条件,例如多个线程同时读写一个文件或数据库。这时需要线程通讯来同步线程之间的数据访问,避免资源冲突。
- 保证顺序执行:在某些情况下,需要保证多个线程按照一定的顺序执行,例如一个多线程排序算法。这时需要线程通讯来协调各个线程的执行顺序。
- 线程之间的互斥和同步:有些场景需要确保只有一一个线程能够访问某个共享资源,例如一个计数器。这时需要Λ使用线程通讯机制来实现线程之间的互斥和同步。
实现方法
线程通讯的实现方法有以下几种:
- 等待和通知机制:使用 Object 类的 wait() 和 notify() 方法来实现线程之间的通讯。当一个线程需要等待另个线程执行完某个操作时,它可以调用 wait() 方法使自己进入等待状态,同时释放占有的锁,等待其他线程调用 notify() 或 notifyAl() 方法来唤醒它。被唤醒的线程会重新尝试获取锁并继续执行。
- 信号量机制:使用 Java 中的 Semaphore 类来实现线程之间的同步和互斥。Semaphore 是一个计数器,用来控制同时访问某个资源的线程数。当某个线程需要访问共享资源时,它必须先从 Semaphore 中获取一个许可证,如果已经没有许可证可用,线程就会被阻塞,直到其他线程释放了许可证。
- 栅栏机制:使用 Java 中的 CyclicBarrier 类来实现多个线程之间的同步,它允许多个线程在指定的屏障处等待,并在所有线程都达到屏障时继续执行。
- 锁机制:使用 Java 中的 Lock 接口和 Condition 接口来实现线程之间的同步和互斥。Lock 是一种更高级的互斥机制,它允许多个条件变量(Condition)并支持在同一个锁上等待和唤醒。
等待通知
public class ApplicationMainTest {public static void main(String[] args){Object lock = new Object();ThreadA threadA = new ThreadA(lock);ThreadB threadB = new ThreadB(lock);threadA.start();threadB.start();}static class ThreadA extends Thread{private Object lock;public ThreadA(Object lock) {this.lock = lock;}public void run () {synchronized (lock) {System.out.println("ThreadA start...");try {lock.wait();//线程A等待} catch (Exception e) {e.printStackTrace();}System.out.println("ThreadA end...");}}}static class ThreadB extends Thread{private Object lock;public ThreadB (Object lock) {this.lock = lock;}public void run () {synchronized (lock) {System.out.println("ThreadB start...");lock.notify(); // 唤醒线程ASystem.out.println("ThreadB end...");}}}
}
在这个示例中,定义了一个共享对象 lock,ThreadA 线程先获取 lock 锁,并调用lock.wait() 方法进入等待状态。ThreadB 线程在获取 lock 锁之后,调用 lock.notify() 方法唤醒 ThreadA 线程,然后 ThreadB 线程执行完毕。
运行以上程序的执行结果如下:
ThreadA start…
ThreadB start…
ThreadB end…
ThreadA end…
信号量实现
import java.util.concurrent.Semaphore;public class ApplicationMainTest {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 5; i++) {new Thread(new Worker(i, semaphore)).start();}}static class Worker implements Runnable {private int id;private Semaphore semaphore;public Worker(int id, Semaphore semaphore) {this.id = id;this.semaphore = semaphore;}@Overridepublic void run() {try {semaphore.acquire();System.out.println("Worker " + id + " acquired permit.");Thread.sleep(1000);System.out.println("Worker " + id + " released permit.");semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}}
}
在这个示例中,创建了一个 Semaphore 对象,并且设置了许可数为 3。然后创建了5个Worker 线程,每个Worker 线程需要获取 Semaphore 的许可才能执行任务。每个 Worker 线程在执行任务之前先调用semaphore.acquire() 方法获取许可,如果没有许可则会阻塞,直到 Semaphore 释放许可。执行完任务之后调用 semaphore.release()方法释放许可。
运行以上程序的执行结果如下:
Worker 1 acquired permit.
Worker 0 acquired permit.
Worker 2 acquired permit.
Worker 1 released permit.
Worker 0 released permit.
Worker 2 released permit.
Worker 3 acquired permit.
Worker 4 acquired permit.
Worker 4 released permit.
Worker 3 released permit.
栅栏实现
在 Java 中,可以使用 CyclicBarrier 或 CountDownLatch 来实现线程的同步,它们两个使用类似,接下来我们就是 CyclicBarrier 来演示一下线程的同步,CyclicBarrier 的示例代码如下:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class ApplicationMainTest {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new Runnable() {@Overridepublic void run() {System.out.println("All threads have reached the barrier.");}});for (int i = 1; i <= 3; i++) {new Thread(new Worker(i, cyclicBarrier)).start();}}static class Worker implements Runnable {private int id;private CyclicBarrier cyclicBarrier;public Worker(int id, CyclicBarrier cyclicBarrier) {this.id = id;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {try {System.out.println("Worker " + id + " is working ...");Thread.sleep((long) Math.random()*2000);System.out.println("Worker " + id + " has reached the barrier...");cyclicBarrier.await();System.out.println("Worker " + id + " is continuing the work...");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}}
}
运行以上程序的执行结果如下:
Worker 1 is working …
Worker 2 is working …
Worker 3 is working …
Worker 3 has reached the barrier…
Worker 1 has reached the barrier…
Worker 2 has reached the barrier…
All threads have reached the barrier.
Worker 2 is continuing the work…
Worker 1 is continuing the work…
Worker 3 is continuing the work…
从以上执行结果可以看出,CyclicBarrier 保证了所有 Worker 线程都到达 Barrier 之后才能继续执行后面的任务,这样可以保证线程之间的同步和协作。在本示例中,所有线程都在 Barrier 处等待了一段时间,等所有线程都到达 Barrier 之后才继续执行后面的任务。
锁机制实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class ApplicationMainTest {private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();private volatile boolean flag = false;public static void main(String[] args) {ApplicationMainTest demo = new ApplicationMainTest();new Thread(demo::waitCondition).start();new Thread(demo::signalCondition).start();}private void waitCondition() {lock.lock();try {while (!flag) {System.out.println(Thread.currentThread().getName() + " is waiting for sig ");condition.await();}System.out.println(Thread.currentThread().getName() + " received signal.");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}private void signalCondition() {lock.lock();try {Thread.sleep(3000);//模拟等待一段时间后发送信号flag = true;System.out.println(Thread.currentThread().getName()+ " sends signal.");condition.signalAll();} catch(InterruptedException e){e.printStackTrace();} finally {lock.unlock();}}}
47
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ApplicationMainTest {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private volatile boolean flag = false;public static void main(String[] args) {ApplicationMainTest demo = new ApplicationMainTest();new Thread(demo::waitCondition).start();new Thread(demo::signalCondition).start();
}private void waitCondition() {lock.lock();try {while (!flag) {System.out.println(Thread.currentThread().getName() + " is waiting for sig ");condition.await();}System.out.println(Thread.currentThread().getName() + " received signal.");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}
}private void signalCondition() {lock.lock();try {Thread.sleep(3000);//模拟等待一段时间后发送信号flag = true;System.out.println(Thread.currentThread().getName()+ " sends signal.");condition.signalAll();} catch(InterruptedException e){e.printStackTrace();} finally {lock.unlock();}
}
}
在这个示例中,创建了一个 Condition 对象和一个 Lock 对象,然后创建了两个线程,一个线程等待 Condition信号,另一个线程发送 Condition 信号。
等待线程在获得锁后,判断标志位是否为 true,如果为 false,则等待 Condition 信号;如果为 true,则继续执行后面的任务。
发送线程在获得锁后,等待一段时间后,将标志位设置为true,并且发送 Condition 信号。
运行以上程序的执行结果如下:
Thread-0 is waiting for sig
Thread-1 sends signal.
Thread-0 received signal.
从上面执行结果可以看出,等待线程在等待 Condition 信号的时候被阻塞,直到发送线程发送了 Condition 信号,等待线程才继续执行后面的任务。Condition 对象提供了一种更加灵活的线程通信方式,可以精确地控制线程的等待和唤醒。
小结
线程通讯指的是多个线程之间通过共享内存或消息传递等方式来协调和同步它们的执行,它的实现方法有很多:比如 wait() 和 notify() 的等待和通知机制、Semaphore 信号量机制、CyclicBarrier 栅栏机制,以及 Condition 的锁机制等。
CountDownLatch
CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。
CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,每次调用 countDown() 方法,计数器减1,当计数器递减到0时,会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。
CountDownLatch 常用方法说明
//构造方法,创建一个值为count 的计数器。
CountDownLatch(int count);
//阻塞当前线程,将当前线程加入阻塞队列。
await();
//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,
await(long timeout, TimeUnit unit);
//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
countDown();
模拟报表统计场景
运营系统有统计报表、业务为统计每日的用户新增数量、订单数量、商品的总销量、总销售额…等多项指标统一展示出来,因为数据量比较大,统计指标涉及到的业务范围也比较多,所以这个统计报表的页面一直加载很慢,所以需要对统计报表这块性能需进行优化。
统计报表页面涉及到的统计指标数据比较多,每个指标需要单独的去查询统计数据库数据,单个指标只要几秒钟,但是页面的指标有10多个,所以整体下来页面渲染需要将近一分钟。
因为主线程需要每个线程的统计结果进行聚合,然后返回给前端渲染,所以这里需要提供一种机制让主线程等所有的子线程都执行完之后再对每个线程统计的指标进行聚合。 这里我们使用CountDownLatch 来完成此功能。
//用于聚合所有的统计指标
private static Map map=new HashMap();
//创建计数器,这里需要统计4个指标
private static CountDownLatch countDownLatch = new CountDownLatch(4);public static void main(String[] args) {//记录开始时间long startTime=System.currentTimeMillis();Thread countUserThread = new Thread(new Runnable() {public void run() {try {System.out.println("正在统计新增用户数量...");Thread.sleep(3000);//任务执行需要3秒map.put("userNumber",1);//保存结果值countDownLatch.countDown();//标记已经完成一个任务System.out.println("统计新增用户数量完毕");} catch (InterruptedException e) {e.printStackTrace();}}});Thread countOrderThread = new Thread(new Runnable() {public void run() {try {System.out.println("正在统计订单数量...");Thread.sleep(3000);//任务执行需要3秒map.put("countOrder",2);//保存结果值countDownLatch.countDown();//标记已经完成一个任务System.out.println("统计订单数量完毕");} catch (InterruptedException e) {e.printStackTrace();}}});Thread countGoodsThread = new Thread(new Runnable() {public void run() {try {System.out.println("正在统计商品销量...");Thread.sleep(3000);//任务执行需要3秒map.put("countGoods",3);//保存结果值countDownLatch.countDown();//标记已经完成一个任务System.out.println("统计商品销量完毕");} catch (InterruptedException e) {e.printStackTrace();}}});Thread countMoneyThread = new Thread(new Runnable() {public void run() {try {System.out.println("正在统计总销售额...");Thread.sleep(3000);//任务执行需要3秒map.put("countMoney",4);//保存结果值countDownLatch.countDown();//标记已经完成一个任务System.out.println("统计销售额完毕");} catch (InterruptedException e) {e.printStackTrace();}}});//启动子线程执行任务countUserThread.start();countGoodsThread.start();countOrderThread.start();countMoneyThread.start();try {//主线程等待所有统计指标执行完毕countDownLatch.await();long endTime=System.currentTimeMillis();//记录结束时间System.out.println("------统计指标全部完成--------");System.out.println("统计结果为:"+map.toString());System.out.println("任务总执行时间为"+(endTime-startTime)/1000+"秒");} catch (InterruptedException e) {e.printStackTrace();}
}
执行结果如下:
正在统计新增用户数量…
正在统计订单数量…
正在统计总销售额…
正在统计商品销量…
统计销售额完毕
------统计指标全部完成--------
统计新增用户数量完毕
统计订单数量完毕
统计商品销量完毕
统计结果为:{countOrder=2, userNumber=1, countGoods=3}
任务总执行时间为3秒
其他样例
public static void main(String[] args) {int numThreads = 3;System.out.println("子线程数量:" + numThreads);CountDownLatch latch = new CountDownLatch(numThreads);System.out.println("for循环前 " + Thread.currentThread().getName() +" 线程,,,");for (int i = 0; i < numThreads; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 开始工作");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成");latch.countDown();}).start();}System.out.println("for循环后 " + Thread.currentThread().getName() +" 线程。。。");try {System.out.println(Thread.currentThread().getName() + " 主线程等待所有线程完成...");latch.await();System.out.println(Thread.currentThread().getName() + " 所有线程已完成,主线程继续执行");} catch (InterruptedException e) {e.printStackTrace();}System.out.println("分隔符=======");
}
运行以上程序的执行结果如下:
子线程数量:3
for循环前 main 线程,,,
for循环后 main 线程。。。
main 主线程等待所有线程完成…
Thread-1 开始工作
Thread-2 开始工作
Thread-0 开始工作
Thread-1 工作完成
Thread-2 工作完成
Thread-0 工作完成
main 所有线程已完成,主线程继续执行
分隔符=======
在上述代码中,主线程创建了一个 CountDownLatch,并初始化为 numThreads。每个子线程在完成工作后调用 countDown() 方法,主线程通过 await() 方法等待,直到所有子线程都调用了 countDown(),即计数器为零,然后主线程继续执行。
CyclicBarrier
java.util.concurrent.CyclicBarrier 也是一个同步辅助类,它允许一组线程相互等待,直到所有线程都到达某个屏障点。与 CountDownLatch 不同的是,CyclicBarrier 可以重复使用。
https://blog.csdn.net/qq_21383435/article/details/110276919
public static void main(String[] args) {int numThreads = 3;CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {System.out.println("所有线程已到达屏障点,开始下一步操作");});for (int i = 0; i < numThreads; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 开始工作");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}try {System.out.println(Thread.currentThread().getName() + " 到达屏障点");barrier.await();} catch (Exception e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 继续工作");}).start();}
}
运行以上程序的执行结果如下:
Thread-0 开始工作
Thread-1 开始工作
Thread-2 开始工作
Thread-2 到达屏障点
Thread-0 到达屏障点
Thread-1 到达屏障点
所有线程已到达屏障点,开始下一步操作
Thread-1 继续工作
Thread-2 继续工作
Thread-0 继续工作
在这个示例中,创建了一个 CyclicBarrier,当所有 numThreads 个线程都调用 await() 方法到达屏障点时,会执行传入的 Runnable 任务(在这个例子中是打印一条消息),然后所有线程继续执行后续操作。如果需要,CyclicBarrier 可以被再次使用。