4 同步器

Wu Jun 2020-02-21 20:40:12
05 Java > 00 Java 基础 > 07 并发

java.util.concurrent 包包含了几个能帮助人们管理相互合作的线程集的类。这些机制具有为线程之间的共用集结点模式提供的“预置功能”。如果有一个相互合作的线程集满足这些行为模式之一,那么应该直接重用合适的库类而不要试图提供手工的锁与条件的集合。

它能做什么 说明
CyclicBarrier 允许线程集等待直至其中预定数目的线程到达一个公共障栅,然后可以选择执行一个处理障栅的工作 当大量的线程需要在它们的结果可用之前完成时
Phaser 类似于循环障栅,不过有一个可变的计数 Java SE 7 中引入
CountDownLatch 允许线程集等待直到计数器减为 0 当一个或多个线程需要等待直到指定数目的事件发生
Exchanger 允许两个线程在要交换的对象准备好时交换对象 当两个线程工作在同一个数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清除数据
Semaphore 允许线程集等待直到被允许继续运行为止 限制访问资源的线程总数。如果许可数是 1,常常阻塞线程直到另一个线程给出许可为止
SynchronousQueue 允许一个线程把对象交给另一个线程 在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个时

1 信号量(Semaphore)

信号量(Semaphore)可以控同时访问的线程个数。

方法
private Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
Thread.currentThread().getName()
semaphore.release();

2 闭锁(CountDownLatch)

CountDownLatch 让一个线程集等待直到计数变为 0。倒计时门栓是一次性的。一旦计数为 0,就不能在重用了。

方法
final CountDownLatch latch = new CountDownLatch(2);
latch.countDown();
latch.await();

3 栅栏(CyclicBarrier)

CyclicBarrier 类实现让一组线程等待至某个状态(barrier)之后再全部同时执行。当所有等待线程都被释放以后,CyclicBarrier可以被重用。

方法
//构造一个障栅,并给出参与的线程数  
CyclicBarrier barrier = new CyclicBarrier(nthreads);
//每一个线程做一些工作,完成后在障栅上调用 await
public void run(){
    doWork();
    brrier.await();
    ...
}

如果任何一个在障栅上等待的线程离开了障栅,那么障栅就被破坏了。在这种情况下,所有其他线程的 await 方法抛出 BrokenBarrierException 异常。那些已经在等待的线程立即终止 await 的调用。

3.1 可选的障栅

可以提供一个可选的障栅(barrier action),当所有线程到达障栅的时候就会执行这一动作。

Runnable barrierAction = ...;
CyclicBarrier barrier = new CyclicBarrier(nthreads, barrierAction);

该动作可以收集那些单个线程的运行结果。

Phaser 类增加了更大的灵活性,允许改变不同阶段中参与线程的个数。

4 交换器(Exchanger)

当两个线程在同一数据缓冲区的两个实例上工作的时候,就可以使用交换器(Exchanger)。典型的情况是,一个线程向缓冲区填入数据,另一个线程消耗这些数据。当它们都完成以后,相互交换缓冲区。

5 同步队列(SynchronousQueue)

同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用 SynchronousQueue 的 put 方法时,它会阻塞直到另一个线程调用 take 方法为止,反之亦然。与 Exchanger 的情况不同,数据仅仅沿一个方向传递,从生产者到消费者。

即使 SynchronousQueue 类实现了 BlockingQueue 接口,概念上讲,它依然不是一个队列。它没有包含任何元素,它的 size 方法总是返回 0。 对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。

5.1 队列方法

5.2 八种队列

5.3 阻塞队列

阻塞队列(BlockingQueue)提供了可阻塞的puttake方法,以及支持定时的offerpoll方法

1)串行线程封闭
2)双端队列与工作密取

6 自定义同步工具

创建状态依赖类的最简单方法通常是在类库中现有状态依赖类的基础上进行构造。但如果类库没有提供你需要的功能,那么还可以使用 Java 语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列、显式的 Condition 对象以及 AbstractQueuedSynchronizer 框架。

6.1 状态依赖性的管理

并发程序中,依赖状态的操作可以一直阻塞直到可以继续执行,这比使它们先失败再实现起来要更为方便且更不易出错。

条件队列

条件队列使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。

正如每个 Java 对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且 Object 中的 wait、not 的和 not 的 A11 方法就构成了内部条件队列的 API。

如果某个功能无法通过“轮询和休眠”来实现,那么使用条件队列也无法实现,但条件队列使得在表达和管理状态依赖性时更加简单和高效。

6.2 使用条件队列

条件队列使构建高效以及高可响应性的状态依赖类变得更容易,但同时也很容易被不正确地使用。要尽量基于 LinkedBlockingQueue、Latch、Semaphore 和 FutureTask 等类来构造程序。

1)条件谓词

将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档。

每一次 wait 调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的 wait 时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件为此的状态变量。

2)过早唤醒

每当线程从 wait 中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败)。

当使用条件等待时(例如 Object.wait 或 Condition.await):

3)丢失的信号

丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。

4)通知

每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知。

大多数情况下应该优先选择 notifyAll 而不是单个的 notify。

只有同时满足一下两个条件时,才能用单一的 notify 而不是 notufyAll:

5)子类的安全问题

要想支持子类化,那么在设计类时需要保证:如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。

对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写人正式文档),要么完全阻止子类参与到等待和通知等过程中。另外一种选择就是完全禁止子类化。

6)封装条件队列

通常,应该把条件队列封装起来

而线程安全类的最常见设计模式,建议使用对象的内置锁来保护对象自身的状态

可设计为使用私有的锁对象和条件队列

7)入口协议与出口协议

入口协议就是该操作的条件谓词,出口协议则包括,检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列。

6.3 显式的 Condition 对象

1)内置条件队列的缺陷

每个内置锁只能有一个相关联的条件队列,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。无法满足在使用 notifyAll 时所有等待线程为同一类型的需求

2)Condition

对于每个 Lock,可以有任意数量的 Condition 对象。调用 Lock 的 Lock.newCondition 方法创建Condition。

Condition 比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待、条件等待可以使可中断的或不可中断的、基于时限的等待,以及公平或非公平的队列操作

在 Condition 对象中,与 wait、notify 和 notifyAll 方法对应的分别是 await、signal 和 signalAll。

3)选择

在使用显式的 Condition 和内置条件队列之间进行选择时,与在 ReentrantLock 和 synchronized 之间进行选择是一样的:如果需要一些高级功能,例如使用公平的队列操作或者 在每个锁上对应多个等待线程集,那么应该优先使用 Condition 而不是内置条件队列。

6.4 Synchronizer 剖析

AQS(AbstractQueuedSynchronizer),是 JDK 下提供的一套用于实现基于 FIFO 等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子 int 值来表示状态的同步器的基类。

基于 AQS 构建的有 ReentrantLock 、Semaphore、 CountDownLatch、 ReentrantReadWriteLock、Synchronousueue 和 FutureTask。

基于 AQS 来构建同步器能带来许多好处。它不仅能极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。

6.5 AbstractQueuedSynchronizer(AQS)

大多数开发者都不会直接使用 AQS,标准同步器类的集合能够满足绝大多数情况的需求。但如果能了解标唯同步器类的实现方式,那么对于理解它们的工作原理是非常有帮助的。

在基于 AQS 构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。其次,就是更新同步器的状态。

java.util.concurrent 中的所有同步器类都没有直接扩展 AQS,而是都将它们的相应功能委托给私有的 AQS 子类来实现。

6.6 java.util.concurrent 同步器类中的 AQS

1)ReentrantLock

ReentrantLock 将同步状态用于保存锁获取操作的次数,并且还维护一个 owner 变量来保存当前所有者线程的标识符。

2)Semaphore 与 CountDownLatch

Semaphore 将 AQS 的同步状态用于保存当前可用许可的数量。

CountDownLatch 在同步状态中保存的是当前的计数值。

3)FutureTask

在 FutureTask 中,AQS 同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。

FutureTask 还维护一些额外的状态变量,用来保存计算结果或者抛出的异常。

此外,它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。

4)ReentrantReadWriteLock

基于 AQS 实现的 ReentrantReadWriteLock,单个 AQS 子类同时管理读取加锁和写入加锁。分别使用了一个 16 位的状态来表示写入锁和读取锁的计数。写入锁为独占锁,读取锁为共享锁。