构建自定义的同步工具
类库中包含许多状态依赖的类,例如FutureTask、Semaphore和BlockingQueue等。在这些类的一些操作中有着基于状态的前提条件,例如,不能从一个空的队列中删除元素,或者获取一个尚未结束的任务的计算结果,在这些操作可以执行之前,必须等到队列进入“非空”状态,或者任务进入“已完成”状态。
创建状态依赖类最简单的方法是在类库中现有状态类的基础上进行构造。但如果类库没有提供你需要的功能,那么还可以使用Java语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列、显式的Condition对象以及AbstractQueuedSynchronizer框架。
状态依赖性的管理
在单线程程序中调用一个方法时,如果某个基于状态的前提条件未得到满足(例如“连接池必须非空”),那么这个条件将永远无法成真。因此,在编写顺序程序中的类时,要使得这些类在它们的前提条件未被满足时就失败。但在并发程序中,基于状态的条件可能会由于其他线程的操作而改变:一个资源池可能在几条指令之前还是空的,但现在却变为非空的,因为另一个线程可能会返回一个元素到资源池。对于并发对象上依赖状态的方法,虽然有时候在前提条件不满足的情况下不会失败,但通常有一种更好的选择,即等待前提条件变为真。
依赖状态的操作可以一直阻塞直到可以执行,这比使它们先失败在实现起来要更为方便且更不易出错内置的条件队列可以使线程一直阻塞,直到对象进入某个进程可以继续执行的状态,并且当被阻塞的线程可以执行时再唤醒它们。
下面的伪代码的加锁模式有些不同寻常,因为锁是在操作的执行过程中被释放与重新获取的。构成前提条件的状态变量必须由对象的锁来保护,从而使得它们在测试前提条件的同时保持不变。如果前提条件尚未满足,就必须释放锁,以便其他线程可以修改对象的状态,否则,前提条件就永远无法成真。在再次测试前提条件之前,必须重新获得锁。
acquire lock on object state
while(precondition does not hold) {
release lock
wait until precondition might hold
optionally fail if interrupted or timeout expires
reacquire lock
}
perform action
release lock
在生产者——消费者的设计中经常会使用像ArrayBlockingQueue这样的有界缓存。在有界缓存提供的put和take操作中都包含一个前提条件:不能从空缓存中获取元素,也不能将元素放入已满的缓存中。当前提条件未满足时,依赖状态的操作可以抛出一个异常或返回一个错误状态,也可以保持阻塞直到对象进入正确的状态。
接下来介绍有界缓存的几种实现,其中将采用不同的方法来处理前提条件失败的问题,在每种实现中都扩展了下面程序的BaseBoundedBuffer,在这个类中实现了一个基于数组的循环缓存,其中各个缓存状态变量(buf、head、tail和count)均由缓存的内置锁来保护。它还提供了同步的doPut和doTake方法,并在子类中通过这些方法来实现put和take操作,底层的状态将对子类隐藏。
@ ThreadSafe
public abstract class BaseBoundedBuffer<E> {
@GuardeBy( "this" ) private final E[] buf;
@GuardeBy( "this" ) private int tail;
@GuardeBy( "this" ) private int head;
@GuardeBy( "this" ) private int count;
protected BaseBoundedBuffer( int capacity) {
this.buf = (E[]) new Object[capacity];
}
protected synchronized final void doPut(E e) {
buf[tail] = e;
if (++tail == buf.length) {
tail = 0;
}
++count;
}
protected synchronized final E doTake() {
E E = buf[head];
buf[head] = null ;
if (++head == buf.length) {
head = 0;
}
--count;
return E;
}
public synchronized final boolean isFull() {
return count == buf.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}
示例:将前提条件的失败传递给调用者
当不满足前提条件时,有界缓存不会执行相应的操作
@ ThreadSafe
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
public GrumpyBoundedBuffer( int size){
super (size);
}
public synchronized void put(V v){
if (isFull()){
throw new BufferFullException ();
}
doPut(v);
}
public synchronized V take(){
if (isEmpty())
throw new BufferEmptyExeption ();
return doTake();
}
}
尽管这种方式实现起来很简单,但使用起来却并非如此。异常应该用于发生异常条件的情况中。“缓存已满”并不是有界缓存的一个异常条件,就像“红灯”并不表示交通信号灯出现了异常。在实现缓存时得到的简化(使调用者管理状态依赖性)并不能抵消在使用时存在的复杂性,因为现在调用者必须做好捕捉异常的准备,并且在每次缓存操作时都需要重试。比如:
while (true ){
try {
V item = buffer.take();
// 对于item执行一些操作
break ;
} catch (BufferEmptyException e) {
Thread. sleep(SLEEP_GRANULARITY );
}
}
这种方法的一种变化形式是,当缓存处于某种错误的状态时返回一个错误值。这是一种改进,因为并没有放弃异常机制,抛出的异常意味着“对不起,请再试一次”,但这种方法并没有解决根本问题:调用者必须自行处理前提条件失败的情况。
除此之外,调用者可以不进入休眠状态,而直接重新调用take方法,这种方法被称为忙等或者自旋等待。调用者也可以sleep一段时间。除了忙等待和休眠之外,还有一种选择就是调用Thread.yield,这相当于给调度器一个提示:现在需要让出一定的时间使另一个线程运行。假如正在等待另一个线程执行工作,那么如果选择让出处理器而不是消耗整个CPU调度时间片,那么可以让整体的执行过程变快。
条件队列
条件队列就好像烤面包机中通知“面包已烤好”的铃声。条件队列中的元素是一个个正在等待相关条件的线程。
正如每个Java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait、notify和notifyAll方法就构成了内部条件队列的API。对象的内置锁与其内部条件队列是相互关联的,要调用对象X中条件队列的任何一个方法,必须持有对象X上的锁。这是因为“等待由状态构成的条件”与“维护状态一致性”这两种机制必须被紧密地绑定在一起:只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程。
Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当被挂起的线程醒来时,它将在返回之前重新获得锁。从直观上来理解,调用wait意味着“我要去休息了,但当发生特定的事情的时候唤醒我”,而调用通知方法就意味着“特定额事情发生了”。
下面的程序中使用wait、notifyAll来实现一个有界缓存。这比使用“休眠”的有界缓存更简单,而且高效,响应性也更高。但要注意:与使用“休眠”的有界缓存相比,条件队列并没有改变原来的语义,它只是在过个方面进行了优化:CPU效率、上下文切换开销和响应性等。但条件队列使得在表达和管理状态依赖性时更加简单和高效。
@ ThreadSafe
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
// 条件谓词:not-full (!isFull())
// 条件谓词:not-empty (!isEmpty())
public BoundedBuffer( int size) {
super (size);
}
// 阻塞并直道:not-full
public synchronized void put(V v) throws InterruptedException{
while (isFull()){
wait();
}
doPut(v);
notifyAll();
}
// 阻塞并直道:not-empty
public synchronized V take() throws InterruptedException{
while (isEmpty()){
wait();
}
V v = doTake();
notifyAll();
return v;
}
}
使用条件队列
条件谓词
要想正确地使用条件队列,关键是找出对象在哪个条件谓词上等待。 没有条件谓词,条件等待机制就无法发挥作用。比如说“缓存不能为空”。
将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档。
过早唤醒
内置条件队列中有多个条件谓语,此时如果调用notifyAll其含义是通知所有wait,但是并不一定所有条件谓语都满足执行条件。也就是说不该被通知的wait也被通知了。基于这个原因,每当线程从wait中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真就继续等待(或者失败)。所以最好在循环中调用wait。
当使用条件等待时(例如Object.wait或Condition.await):
- 通常都有一个条件谓词--包括一些对象状态的测试,线程在执行前必须首先通过这些测试。
- 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试。
- 在一个循环中调用wait。
- 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量。
- 当调用wait、notify或notifyAll等方法时,一定要持有与条件队列相关的锁。
- 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。
丢失的信号
已经满足通知的条件发出通知,但是之后才进入阻塞wait状态,所以wait永远等不到在其前面发出的notify。如果没有在调用wait之前检测条件谓词就会导致信号的丢失。
通知
每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知。
调用notify时,JVM会从这个条件队列上等待的多个线程中选择一个来唤醒,而nitifyAll是唤醒所有在这个条件队列上等待的线程。由于调用wait和nitify等方法都是要加锁的,所以为了wait方法能够执行,需要调用nitify方法后尽快释放锁。
只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:
- 所有等待线程的类型都相同。只有一个条件谓词与队列相关,并且每个线程在从wait返回后将执行相同的操作。
- 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。
示例:阀门值
使用wait和notifyAll来实现可重新关闭的阀门。
@ThreadSafe
public class ThreadGate {
// 条件谓词:opened-since(n) (isOpen || generation>n)
@GuardedBy("this") private boolean isOpen;
@GuardedBy("this") private int generation;
public synchronized void close() {
isOpen = false;
}
public sychronized void open() {
++generation;
isOpen = true;
notifyAll();
}
// 阻塞并直到:opened-since(generation on entry)
public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
while(!isOpen && arrivalGeneration == generation)
wait();
}
}
在await中使用的条件谓词比测试isOpen复杂的多。这种条件谓词是必须的,因为如果当阀门打开时有N个线程正在等待它,那么这些线程都应该被允许执行。然而,如果阀门在打开后又非常快速地关闭了,并且await方法只检查isOpen,那么所有线程都可能无法释放:当所有线程收到通知时,将重新请求锁并退出wait,而此时的阀门可能已经再次关闭了。因此,在ThreadGate中使用了一个更复杂的条件谓词:每次阀门关闭时,递增一个“Generation”计数器,如果阀门现在是打开的,或者阀门自从该线程到达后就一直是打开的,那么线程就可以通过await。
由于ThreadGate只支持等待打开阀门,因此它只在open中执行通知。要想既支持“等待打开”又支持“等待关闭”,那么ThreadGate必须在open和close中都进行通知。这很好地说明了为什么在维护状态依赖的类时是非常困难的——当增加一个新的状态依赖操作时,可能需要对多条修改对象的代码路径进行改动,才能正确地执行通知。
子类的安全问题
在使用条件通知或单次通知时,一些约束条件使得子类化过程变得更加复杂。要想支持子类化,那么在设计类时需要保证:如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以添加合适的通知机制来代表基类。
对于状态依赖的类,那么将其等待和通知协议完全向子类公开(并且写入正式文档),要么完全阻止子类参与到等待和通知等过程中。当设计一个可被继承的状态依赖类时,至少需要公开条件队列和锁,并且将条件谓词和同步策略都写入文档。此外,还可能需要公开一些底层的状态变量。
另外一种选择就是完全禁止子类化,例如将类声明为final类型,或者将条件队列、锁和状态变量等隐藏起来,使子类看不见他们。
封装条件队列
通常,我们应该把条件队列封装起来,因而除了使用条件队列的类,就不能在其他地方访问它。否则,调用者会自以为理解了在等待和通知上使用的协议,并且采用一种违背设计的方式来使用条件队列。
不幸的是,这条建议——将条件队列对象封装起来,与线程安全类的最常见设计模式并不一致,在这种模式中建议使用对象的内置锁来保护对象自身的状态。
入口协议与出口协议
入口协议就是该操作的条件谓词,出口协议则包括,检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列。
在AbstractQueuedSynchronizer(java.util.concurrent包中大多数依赖状态的类都是基于这个类构建的)中使用出口协议。这个类并不是由同步器类执行自己的通知,而是要求同步器方法返回一个值来表示该类的操作是否已经解除了一个或多个等待线程的阻塞。这种明确的API调用需求使得更难以“忘记”在某些状态转换发生时进行通知。
显式的Condition对象
之前提过,在某些情况下,当内置锁过于灵活时,可以使用显式锁。正如Lock是一种广义的内置锁,Condition也是一种广义的内置条件队列。
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
内置条件队列存在一些缺陷。每个内置锁都只能有一个相关联的条件队列,因而在像BoundedBuffer这种累中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。这些因素都使得无法满足在使用notifyAll时所有等待线程为同一类型的需求。如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多控制权,就可以使用显式的Lock和Condition而不是内置锁和条件队列,这是一种更灵活的选择。
一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition方法。正如Lock比内置加锁提供了更为丰富的功能,Condition同样比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作。
与内置条件队列不同的是,对于每个Lock,可以有任意数量的Condition对象。Condition对象继承了相关的Lock对象的公平性,对于公平的锁,线程会依照FIFO顺序从Condition.await中释放。
特别注意:在Condition对象中,与wait、notify和notifyAll方法对应的分别是await、signal和signalAll。但是,Condition对Object进行了扩展,因而它也包含wait和notify方法。一定要确保使用正确的版本——await和signal。
使用显式条件变量的有界缓存:
@ThreadSafe
public class ConditionBoundedBuffer<T> {
protected final Lock = new ReentrantLock();
// 条件谓词:notFull(count<items.length)
private final Condition notFull = lock.newCondition();
// 条件谓词:notEmpty(count>0)
private final Condition notEmpty = lock.newCondtion();
@GuardedBy("lock")
private final T[] items = (T[]) new Object[BUFFER_SIZE];
@GuardedBy("lock") private int tail,head, count;
// 阻塞并直到:notFull
public void put(T x) throws InterruptedException {
lock.lock();
try {
while(count==items.length)
notFull.await();
items[tail]=x;
if(++tail==items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
//阻塞并直到:notEmpty
public void take() throws InterruptedException {
lock.lock();
try {
while(count==0)
notEmpty.await();
T x = items[head];
items[head] = null;
if(++head==items.length)
head = 0;
--count;
notFull.signal();
} finally {
lock.unlock();
}
}
}
ConditionBoundedBuffer的行为和BoundedBuffer相同,但它对条件队列的使用方式更容易理解——在分析使用多个Condition的类时,比分析一个使用单一内部队列加多个条件谓词的类简单得多。通过将两个条件谓词分开并放到两个等待线程集中,Condition使其更容易满足单次通知的需求。signal比signalAll更高效,他能极大地减少在每次缓存操作中发生的上下文切换与锁请求的次数。
在使用显式的Condition和内置条件队列之间进行选择时,与在ReentrantLock和synchronized之间进行选择时一样的:如果需要一些高级功能,例如使用公平的队列操作或者在每个锁上对应多个等待线程集,那么应该优先使用Condition而不是内置条件队列。
Synchronizer剖析
在ReentrantLock和Semaphore这两个接口之间存在许多共同点。这两个类都可以用做一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过,也可以等待,还可以取消。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。
实施上,它们在实现时都使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。不仅ReenstrantLock和Semaphore是基于AQS构建的,还有CountDownLatch、ReenstrantReadWriteLock、SynchronousQueue和FutureTask。
AQS解决了在实现同步器时涉及的大量细节问题,例如等待线程采用FIFO队列操作顺序。在不同的同步器中还可以定义一些灵活的标准来判断某个线程是应该通过还是需要等待。
基于AQS来构建同步器能带来许多好处。它不仅能极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题(这是在没有使用AQS来构建同步器时的情况)。在SemaphoreOnLock中,获取许可的操作可能在两个时刻阻塞——当锁保护信号量状态时,以及当许可不可用时。在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。
AbstractQueuedSynchronizer
在基于AQS构建的同步器类中,最基本的操纵包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,“获取”操作的含义就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。在使用CountDownLatch时,“获取”操作意味着“等待并直到闭锁到达结束状态”,而在使用FutureTask时,则意味着“等待并直到任务已经完成”。
具体的内容还是查看原书吧!这里就不再赘述了,也可以查看源代码!
java.util.concurrent同步器类中的AQS
java.util.concurrent中的许多可阻塞类,例如ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch、SynchronousQueue和FutureTash等,都是基于AQS搭建的。我们快速地浏览一下每个类是如何使用AQS的,不需要过于地深入了解细节。
ReentrantLock
ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively。ReentrantLock将同步状态用于保存锁获取操作的次数,并且还维护一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁:在tryAcquire中将使用这个域来区分获取操作是重入的还是竞争的。
当一个线程尝试获取锁时,tryAcquire将首先检查锁的状态。如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此tryAcquire使用compareAndSetState来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过。
ReentrantLock还利用了AQS对多个条件变量和多个等待线程集的内置支持。
简单介绍,具体可以查看原文或者阅读源码,以下均是。
Semaphore与CountDownLatch
Semaphore将AQS的同步状态用于保存当前可用许可的数量。tryAcquireShared方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示操作失败。如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState以原子的方式来降低许可的计数。如果这个操作成功,那么就返回一个值表示获取操作成功。
FutureTask
初看,它不是一个同步器,但Future.get的语义非常类似于闭锁的语义——如果发生了某个事件,那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。
在FutureTask中,AQS同步状态被用来保存任务的状态,例如:正在运行、已完成或已取消。FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常。此外,它还维护了一个引用,纸箱正在执行任务的线程,因而如果任务取消,该线程就会中断。
ReentrantReadWriteLock
其中存在两个锁:一个读取锁和一个写入锁,但在基于AQS实现的ReentrantReadWriteLock中,单个AQS子类将同时管理读取加锁和写入加锁。ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法和释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。
AQS内部维护了一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。