Java中的并发工具介绍 1.CountDownLatchCountDownLatch是一个同步辅助类它运行一个线程或者多个线程等待其他线程完成操作。它使用一个计数器进行初始化调用 countDown()方法会使计数器减一当计数器的值减为 0时等待的线程会被唤醒。可以把它想象成一个倒计时器当倒计时结束(计数器为 0)时等待的线程就会去执行自己的任务代码例子public class demo18 { public static void main(String[] args) { CountDownLatch latchnew CountDownLatch(3); for (int i 0; i 3; i) { Thread tnew Thread(()-{ try { System.out.println(Thread.currentThread().getName()在执行任务); Thread.sleep(2000); latch.countDown(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); t.start(); } Thread t2new Thread(()-{ try { System.out.println(t2在等待其他线程完成任务); latch.await(); System.out.println(t2去执行任务); } catch (InterruptedException e) { throw new RuntimeException(e); } }); t2.start(); } }2.CycliBarrierCycliBarrier允许一组线程互相等待知道到达一个公共的屏障点这组线程才可以继续执行后续操作并且这个屏障可以被重复循环使用与CountDownLatch不同CycliBarrier侧重线程之间的等待而不是等待某一个线程完全结束某些操作示例代码public class demo13 { // 4个线程参与屏障等待所有线程到位后执行回调 private static final int THREAD_NUM 3; private static final CyclicBarrier barrier new CyclicBarrier(THREAD_NUM, () - { // 屏障回调所有线程到达后优先执行 System.out.println( 全部线程抵达屏障进入下一阶段 \n); }); public static void main(String[] args) { // 创建4个工作线程 for (int i 1; i THREAD_NUM; i) { int workerId i; new Thread(() - { try { // 第一阶段准备工作 System.out.println(工人 workerId 完成前期准备等待其他人); barrier.await(); // 到达屏障阻塞等待所有人 // 第二阶段施工工作 System.out.println(工人 workerId 开始正式施工); Thread.sleep(1000); // 模拟施工耗时 System.out.println(工人 workerId 施工完成等待所有人收工); barrier.await(); // 屏障自动重置第二轮等待生效 // 全部完工 System.out.println(工人 workerId 所有阶段结束收工回家); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }3.Semaphore信号量Semaphore是一个计数信号量用于控制同时访问某一个共享资源的线程数量。通过acquire()方法获取许可使用release()方法释放许可。如果没有许可可用线程将被阻塞直到有许可被释放为止所以Semaphore可以用来限制某些资源数据库连接池、文件操作等的并发量示例代码public class demo15 { public static void main(String[] args) throws InterruptedException { Semaphore semaphorenew Semaphore(2); for (int i 0; i 5; i) { Thread tnew Thread(()-{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()获得了许可); Thread.sleep(2000); System.out.println(Thread.currentThread().getName()释放了许可); semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); t.start(); } Thread.sleep(1000); Thread threadnew Thread(()-{ System.out.println(thread线程尝试获取许可); try { semaphore.acquire(); System.out.println(thread成功获取许可); Thread.sleep(2000); semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); thread.start(); } }4.Future和CallableCallable是一个类似于Runnable的接口但是Callable可以返回具体线程执行完任务后的结果并且可以抛出异常Future可以用来接收一个异步计算的结果可以通过它来获取Callable任务执行后的结果或取消任务4.1获取Callable任务的结果示例代码public class demo16 { public static void main(String[] args) { ExecutorService executorService Executors.newSingleThreadExecutor(); CallableInteger callable()-{ System.out.println(Thread.currentThread().getName()开始执行任务); Thread.sleep(2000); return 42; }; FutureInteger future executorService.submit(callable); System.out.println(主线程执行其他任务); try { Integer result future.get(); System.out.println(Callable执行的结果result); } catch (Exception e){ throw new RuntimeException(); } executorService.shutdown(); } }4.2取消正在运行的任务public class demo19 { public static void main(String[] args) throws InterruptedException, ExecutionException { // 创建单线程池 ExecutorService pool Executors.newSingleThreadExecutor(); // 1. 定义耗时Callable任务 CallableInteger task () - { System.out.println(任务开始执行); // 模拟耗时计算 5秒 for (int i 1; i 5; i) { Thread.sleep(1000); System.out.println(任务执行中第 i 秒); } return 100; }; // 提交任务返回Future FutureInteger future pool.submit(task); // 主线程等待1秒此时任务还在运行中 Thread.sleep(1000); System.out.println(主线程准备取消任务); // 核心取消任务true代表中断正在运行的线程 Thread.sleep(4000); boolean cancelSuccess future.cancel(true); System.out.println(终止线程池执行的任务); System.out.println(取消是否成功 cancelSuccess); System.out.println(任务是否被取消 future.isCancelled()); // 尝试获取结果取消后调用get()会抛出CancellationException try { Integer res future.get(); System.out.println(任务结果 res); } catch (ExecutionException e) { System.out.println(任务执行异常); } // 关闭线程池 pool.shutdown(); } }4.3任务未启动时被取消public class demo20 { public static void main(String[] args) throws InterruptedException { ExecutorService pool Executors.newSingleThreadExecutor(); // 任务1占用线程3秒 CallableVoid task1 () - { Thread.sleep(3000); return null; }; // 任务2排队等待执行 CallableString task2 () - { return 排队任务执行完毕; }; pool.submit(task1); FutureString future2 pool.submit(task2); // 直接取消还没运行的排队任务 boolean cancel future2.cancel(false); System.out.println(排队任务取消结果 cancel); // true pool.shutdown(); } }Thread.sleep() 确实会释放 CPU 时间片但不会释放线程池里的工作线程单线程池只有 1 条工作线程只要它没执行完当前任务队列里的 task2 永远不会被执行。也就是说sleep(3000)后线程进入阻塞态操作系统把 CPU 分给别的线程此时这个线成不占用 CPU但这条工作线程仍然被绑定给 task1线程池不会将这个工作线程释放3 秒休眠结束后这条线程继续执行 task1 剩余逻辑对应到代码这里 task1 已经结束SingleThreadExecutor执行规则单线程池内部最多只有一个线程核心线程的数量为1线程的最多数量也是1此时的执行逻辑是提交第 1 个任务创建唯一工作线程执行再提交任务不会新建任何线程全部丢到阻塞队列排队哪怕任务阻塞、sleep、长时间耗时也不会扩容第二条线程如果唯一线程意外死亡线程池会自动新建一条替补线程维持单线程。sleep(3000)线程进入阻塞态操作系统把 CPU 分给别的线程此时它不占用 CPU 但这条工作线程仍然被绑定给 task1线程池不会把它抽走去跑队列任务 3 秒休眠结束后这条线程继续执行 task1 剩余逻辑这里 task1 已经结束5.关于newSingleThreadExecutor()中唯一的线程“意外死亡”线程意外死亡有两种场景1.execute()提交任务线程池里的线程执行任务时抛异常但是没有try-catch这条工作线程直接终止被线程池销毁2.线程死锁、无限阻塞这种情况需要考我们自己的业务代码才能执行后面的addWorker()方法创建替补线程5.1底层自动重建替补线程源码逻辑如果线程是异常崩溃退出completedAbruptlytrue先把死掉的线程从池中移除判断当前池中活跃线程数 corePoolSize这里 corePoolSize1立刻调用 addWorker() 创建全新工作线程替补上去继续消费队列任务