2023-07-14
JAVA
0

目录

小谈线程并发工具
CountDownLatch
简介
简单使用
基本原理
CyclicBarrier
Semaphore
Exchanger
AQS(AbstractQueuedSynchronizer)同步器
FIFO等待队列的数据结构

小谈线程并发工具

  Java 并发包提供了许多线程并发工具,用于在多线程环境下管理和控制线程的执行,下面就详细介绍CountDownLatchCyclicBarrierSemaphoreExchanger的原理和使用。

CountDownLatch

简介

CountDownLatch 是 Java 并发编程中的一个工具类,它用于实现线程间的等待。它的作用是让某个线程等待其他线程完成操作后再继续执行

简单使用

运行代码

温馨提示

复制下面代码,在idea指定包下粘贴会自动生成Example.java

java
import java.util.concurrent.CountDownLatch; public class Example { public static void main(String[] args) throws InterruptedException { int workerCount = 3; CountDownLatch latch = new CountDownLatch(workerCount); // 创建并启动多个工作线程 for (int i = 0; i < workerCount; i++) { WorkerThread worker = new WorkerThread(latch); worker.start(); } System.out.println("等待工作线程完成..."); // 主线程等待所有工作线程完成 latch.await(); System.out.println("所有工作线程已完成,继续执行主线程操作"); } } class WorkerThread extends Thread { private final CountDownLatch latch; public WorkerThread(CountDownLatch latch) { this.latch = latch; } @Override public void run() { // 模拟工作线程执行操作 try { Thread.sleep(1000); // 假设操作耗时 1 秒 System.out.println("工作线程执行完成"); } catch (InterruptedException e) { e.printStackTrace(); } // 操作完成后调用 countDown() latch.countDown(); } }

执行结果

shell
等待工作线程完成... 工作线程执行完成 工作线程执行完成 工作线程执行完成 所有工作线程已完成,继续执行主线程操作

基本原理

  当某个线程完成了一定的操作后,调用countDown()方法可以将 CountDownLatch 的计数器减一。每次调用该方法,计数器都会减少一次。当计数器减到零时,所有被阻塞的线程将被释放,可以继续执行。

  1. 将 CountDownLatch 内部计数器的值减一。
  2. 检查计数器的值,如果减一后计数器的值为零,则唤醒所有因调用await()方法而阻塞的线程。
  • countDown()方法通常由多个工作线程并行调用,每个线程完成一定的操作后调用该方法。通过调用countDown()方法,工作线程告知主线程它已经完成了一部分工作,在countDown()方法内部,会对计数器进行原子操作,保证多个线程同时调用countDown()方法时,不会出现竞态条件。
  • 一旦计数器减到零,主线程将被唤醒,可以继续执行后续操作。这样就实现了主线程等待多个子线程完成某个任务后再继续执行的效果。
  • 在每次调用countDown()方法后,都会检查计数器的值,如果计数器的值减到零,就会唤醒所有因调用 await() 方法而阻塞的线程。唤醒操作也是通过AQS(AbstractQueuedSynchronizer)同步器来实现的,以确保被阻塞的线程能够被正确地唤醒。

注意

在使用 CountDownLatch 时,countDown()方法的调用次数必须与 CountDownLatch 的初始计数器值相匹配。如果countDown()方法的调用次数超过了初始值,计数器的值将保持为零,并且对后续调用不产生任何影响。一旦计数器达到零,后续的countDown()调用将被忽略。

CyclicBarrier

CyclicBarrier 是 Java 并发包中的一个同步工具,用于等待一组线程达到一个同步点(barrier point)。它可以用于在多个线程之间创建一个同步点,当所有线程都到达这个同步点时,它们可以继续执行。

CyclicBarrier 适用于这样的场景:多个线程在不同的子任务中执行,需要在某个点上等待其他所有线程完成自己的子任务,然后再继续执行下一阶段的任务。

CyclicBarrier 的主要特点包括:

  1. 同步点: CyclicBarrier 允许多个线程在同一个同步点上等待。当所有线程都到达同步点时,它们将继续执行。

  2. 可重用性: CyclicBarrier 可以被重复使用。一旦所有线程都到达同步点并释放了屏障,CyclicBarrier 又可以重新用于下一轮的同步。

  3. 可选的回调动作: 你可以在所有线程都达到同步点时执行一个可选的回调动作。

  4. 计数器重置: 在所有线程都到达同步点后,计数器会自动重置,从而可以用于下一轮的同步。

以下是一个简单的示例,展示了如何使用 CyclicBarrier 进行线程同步:

java
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { int numThreads = 3; Runnable barrierAction = () -> System.out.println("All threads have reached the barrier."); CyclicBarrier cyclicBarrier = new CyclicBarrier(numThreads, barrierAction); for (int i = 0; i < numThreads; i++) { Thread thread = new Thread(() -> { try { System.out.println("Thread " + Thread.currentThread().getId() + " is waiting at the barrier."); cyclicBarrier.await(); // 等待所有线程达到同步点 System.out.println("Thread " + Thread.currentThread().getId() + " has passed the barrier."); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); } } }

在这个示例中,我们创建了一个 CyclicBarrier,并指定了需要等待的线程数量(numThreads)。每个线程执行到 cyclicBarrier.await() 时会等待其他线程到达同步点。一旦所有线程都到达同步点,就会执行可选的回调动作(在本例中是打印信息)并继续执行后续操作。

Semaphore

Semaphore 是 Java 并发包中的一个同步工具,用于控制同时访问某个资源的线程数量。它可以看作是一个计数器,用于管理许可(permit)的分发。Semaphore 主要用于限制同时访问某一资源的线程数量,从而控制并发度。

Semaphore 的主要方法包括:

  1. Semaphore(int permits) 构造方法,指定初始许可数量。

  2. void acquire() 获取一个许可,如果没有可用的许可,则会阻塞。

  3. void acquire(int permits) 获取指定数量的许可,如果没有足够的可用许可,则会阻塞。

  4. void release() 释放一个许可。

  5. void release(int permits) 释放指定数量的许可。

  6. int availablePermits() 获取当前可用的许可数量。

Semaphore 的经典应用之一是限制对有限资源的访问。例如,假设有一个数据库连接池,你可以使用 Semaphore 来控制同时可用的连接数。

以下是一个简单的示例,展示了如何使用 Semaphore 控制线程的并发访问数量:

java
import java.util.concurrent.Semaphore; public class SemaphoreExample { public static void main(String[] args) { int numThreads = 5; int numPermits = 2; // 同时允许的线程数 Semaphore semaphore = new Semaphore(numPermits); for (int i = 0; i < numThreads; i++) { Thread thread = new Thread(() -> { try { semaphore.acquire(); // 获取许可 System.out.println("Thread " + Thread.currentThread().getId() + " acquired a permit."); // 模拟线程执行一些操作 Thread.sleep(1000); semaphore.release(); // 释放许可 System.out.println("Thread " + Thread.currentThread().getId() + " released a permit."); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); } } }

在此示例中,我们创建了一个有限数量的许可(numPermits),然后启动多个线程。每个线程在执行之前通过 acquire() 方法获取许可,执行一些操作后再通过 release() 方法释放许可。通过这种方式,我们可以限制同时访问的线程数量,从而控制并发度。

Exchanger

在 Java 并发包中,Exchanger 是一个用于两个线程之间交换数据的同步工具。它提供了一个点,允许两个线程在此点上交换数据。当两个线程都到达交换点时,它们可以交换数据,并继续执行。

Exchanger 提供了以下主要方法:

  1. exchange(V x) 该方法用于交换数据。当某个线程调用此方法时,它将阻塞,直到另一个线程也调用了相同的 exchange() 方法。然后两个线程交换数据,并继续执行。

  2. exchange(V x, long timeout, TimeUnit unit) 类似于上面的方法,但允许设置一个超时时间。如果超过指定的时间,交换操作还未完成,则会解除阻塞。

Exchanger 主要用于两个线程之间的数据传输,可以用于解决生产者-消费者模型等问题。例如,一个线程可以用于生成数据,另一个线程用于处理数据,它们可以通过 Exchanger 在合适的时间点进行数据交换。

下面是一个简单的示例,展示了如何使用 Exchanger 进行数据交换:

java
import java.util.concurrent.Exchanger; public class ExchangerExample { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); Thread producerThread = new Thread(() -> { try { String data = "Hello from Producer"; System.out.println("Producer is producing: " + data); exchanger.exchange(data); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { String receivedData = exchanger.exchange(null); System.out.println("Consumer received: " + receivedData); } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); try { producerThread.join(); consumerThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }

在此示例中,生产者线程生成数据并调用 exchange() 方法,然后阻塞。消费者线程调用 exchange() 方法等待交换,一旦生产者线程调用了 exchange(),两个线程会交换数据并继续执行。

AQS(AbstractQueuedSynchronizer)同步器

AbstractQueuedSynchronizer(简称为AQS)是Java中用于实现同步器的抽象基类。它为实现各种同步机制(如ReentrantLock、CountDownLatch、Semaphore等)提供了基础框架。AQS通过内部维护一个FIFO的等待队列来管理线程的等待和唤醒,使得开发者能够相对容易地实现自定义的同步机制。

以下是关于AbstractQueuedSynchronizer的详细解释:

  1. 同步器基础: AQS 提供了一个底层的框架,用于实现各种同步器。它通过提供一组状态变量和操作来管理线程的访问权限和等待状态。

  2. 状态变量: AQS 的核心是状态变量,通过维护不同的状态来表示不同的同步状态。这个状态变量的具体含义和使用方式是由子类去定义和实现的。

  3. 队列管理: AQS 内部维护了一个等待队列,用于存放等待获取同步状态的线程。这个队列是一个FIFO队列,也就是先等待的线程先被唤醒。

  4. 模板方法: AQS 提供了一些模板方法,这些方法在子类中可以被覆写以实现特定的同步逻辑。这些方法包括tryAcquiretryReleasetryAcquireSharedtryReleaseShared等。

  5. 独占和共享模式: AQS 支持两种同步模式,独占模式(exclusive)和共享模式(shared)。独占模式是一次只能有一个线程获取同步状态,而共享模式允许多个线程同时获取同步状态。

  6. 独占模式实现: 如果一个子类希望实现独占模式的同步,需要覆写tryAcquiretryRelease方法。这些方法负责获取和释放同步状态。

  7. 共享模式实现: 如果一个子类希望实现共享模式的同步,需要覆写tryAcquireSharedtryReleaseShared方法。这些方法负责获取和释放共享的同步状态。

  8. 阻塞与唤醒: AQS 提供了方法用于线程的阻塞和唤醒,这些方法包括acquirereleaseacquireSharedreleaseShared等。

  9. CAS(比较并交换)操作: AQS 使用CAS操作来更新状态变量,确保线程之间的原子性操作。

等待队列是 AbstractQueuedSynchronizer (AQS) 中的一个重要数据结构,用于管理等待获取同步资源的线程。等待队列通常采用双向链表的形式来组织,它包括以下关键部分:

FIFO等待队列的数据结构

  1. Node 节点:等待队列中的每个线程都被封装成一个 Node 节点。Node 节点通常包含以下几个字段:

    • Thread 对象:表示等待线程的引用。
    • int waitStatus:表示节点的等待状态,用于标识节点的状态。等待状态的值可以表示不同的含义,例如,表示线程是否被取消、是否需要排队、是否已经获取锁等。
    • Node prev:指向前一个节点的引用,用于构成双向链表。
    • Node next:指向下一个节点的引用,也用于构成双向链表。
  2. 头节点(Head):等待队列中的头节点通常是一个虚拟节点,不代表任何一个等待线程,主要用于简化队列操作。头节点的 prev 字段指向自己,next 字段指向队列中的第一个等待线程节点。

  3. 尾节点(Tail):等待队列中的尾节点是一个指向队列中最后一个等待线程节点的引用。

  4. 入队操作:线程在等待获取同步资源时,会通过入队操作将自己封装成一个节点添加到等待队列的尾部。这通常是一个原子操作,确保线程被正确地添加到队列中。

  5. 出队操作:当线程获取到同步资源时,会执行出队操作,将自己从等待队列中移除。这同样是一个原子操作,确保线程的状态正确地变为已获取锁。

  6. 等待与唤醒机制:等待队列还包括了线程等待和唤醒的机制。当一个线程无法获取同步资源时,它会被阻塞并加入到等待队列中,等待其他线程释放资源或者唤醒它。唤醒操作通常由持有资源的线程执行。

  7. 队列的顺序:队列的顺序通常是先进先出(FIFO)的,也就是等待时间最长的线程在队列的前面。但在非公平锁的情况下,队列顺序可能会更灵活,等待时间短的线程有可能先被唤醒。

相关信息

等待队列是 AQS 中用于管理等待线程的数据结构,它采用双向链表的形式,每个节点代表一个等待线程。等待队列的操作是 AQS 实现同步的关键之一,通过合理管理等待队列,AQS 可以实现多种同步器,如独占锁、共享锁、信号量等。

总的来说,AbstractQueuedSynchronizer为实现自定义同步器提供了一个强大的框架。虽然它的理解和使用可能相对复杂,但它为开发者提供了在多线程环境下构建高效同步机制的基础工具。同时,Java并发包中的许多同步类,如ReentrantLockCountDownLatchSemaphore等,都是基于AQS实现的。