/**
* 一个计数信号量。从概念上讲,信号量维护了一个许可集。
* 如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。
* 每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。
* 但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
* Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
* 例如,下面的类使用信号量控制对内容池的访问:
* class Pool {
* private static final int MAX_AVAILABLE = 100;
* private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
* public Object getItem() throws InterruptedException {
* available.acquire();
* return getNextAvailableItem();
* }
* public void putItem(Object x) {
* if (markAsUnused(x))
* available.release();
* }
* // Not a particularly efficient data structure; just for demo
* protected Object[] items = ... whatever kinds of items being managed
* protected boolean[] used = new boolean[MAX_AVAILABLE];
* protected synchronized Object getNextAvailableItem() {
* for (int i = 0; i < MAX_AVAILABLE; ++i) {
* if (!used[i]) {
* used[i] = true;
* return items[i];
* }
* }
* return null; // not reached
* }
* protected synchronized boolean markAsUnused(Object item) {
* for (int i = 0; i < MAX_AVAILABLE; ++i) {
* if (item == items[i]) {
* if (used[i]) {
* used[i] = false;
* return true;
* } else
* return false;
* }
* }
* return false;
* }
* }
* 获得一项前,每个线程必须从信号量获取许可,从而保证可以使用该项。
* 该线程结束后,将项返回到池中并将许可返回到该信号量,从而允许其他线程获取该项。
*
* 注意,调用 acquire() 时无法保持同步锁,因为这会阻止将项返回到池中。
* 信号量封装所需的同步,以限制对池的访问,这同维持该池本身一致性所需的同步是分开的。
* 将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。
* 这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。
* 按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,
* 而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。
* 此类的构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序做任何保证。
* 特别地,闯入 是允许的,也就是说可以在已经等待的线程前为调用 acquire() 的线程分配一个许可,
* 从逻辑上说,就是新线程将自己置于等待线程队列的头部。当公平设置为 true 时,
* 信号量保证对于任何调用获取方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)
* 来选择线程、获得许可。
*
* 注意,FIFO 排序必然应用到这些方法内的指定内部执行点。 所以,可能某个线程先于另一个线程调用了 acquire,
* 但是却在该线程之后到达排序点,并且从方法返回时也类似。
*
* 还要注意,非同步的 tryAcquire 方法不使用公平设置,而是使用任意可用的许可。
* 通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。
* 为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
* 此类还提供便捷的方法来同时 acquire 和释放多个许可。小心,在未将公平设置为 true 时使用这些方法会
* 增加不确定延期的风险。
* 内存一致性效果:线程中调用“释放”方法(比如 release())之前的操作 happen-before
* 另一线程中紧跟在成功的“获取”方法(比如 acquire())之后的操作。
*/
public class Semaphore {
private static final long serialVersionUID = -3222578661600680210L;
/**
* 所有机制都通过AbstractQueuedSynchronizer子类实现
*/
private final Sync sync;
/**
* 信号量的同步实现。使用AQS状态表示许可证。子类化为公平和非公平版本。
*
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits); // 设置同步状态的值。
}
/**
* 返回此信号量中当前可用的许可数。
* @return 此信号量中的可用许可数
*/
final int getPermits() {
return getState();
}
/**
* 非公平锁
* 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
* @param acquires 要获取的许可数
* @return 获取给定数目的许可(如果提供了)并立即返回
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); // 返回同步状态的当前值。
int remaining = available - acquires; // 正在同步-要获取的许可数
// 有许可的数量 或者 当前状态值等于期望值(CAS)
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* 如果允许释放许可,则返回 true;否则返回 false。
* @param releases 要释放的许可数
* @return 如果允许释放许可,则返回 true;否则返回 false。
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();// 返回同步状态的当前值。
int next = current + releases; // 同步数+释放数
if (next < current) // 溢出
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // 当前状态值等于期望值(CAS)
return true;
}
}
/**
* 根据指定的缩减量减小可用许可的数目。
* 此方法在使用信号量来跟踪那些变为不可用资源的子类中很有用。
* @param reduction 要移除的许可数
*/
final void reducePermits(int reductions) {
for (;;) {
int current = getState(); // 返回同步状态的当前值。
int next = current - reductions; // 同步数-释放数
if (next > current) // 下溢
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))// 当前状态值等于期望值(CAS)
return;
}
}
/**
* 获取并返回立即可用的所有许可。
* @return 获取的许可数
*/
final int drainPermits() {
for (;;) {
int current = getState(); // 返回同步状态的当前值。
// 同步数为0或者当前状态值等于期望值(CAS)
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* 非公平的版本
*
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
/**
* 创建具有给定的许可数和非公平设置的 Semaphore。
* @param permits 初始的可用许可数目。此值可能为负数,在这种情况下,必须在授予任何获取前进行释放。
*/
NonfairSync(int permits) {
super(permits);
}
/**
* 非公平
* 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
* @param acquires 要获取的许可数
* @return 获取给定数目的许可(如果提供了)并立即返回
*/
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平的版本
*
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
/**
* 建具有给定的许可数和公平设置的 Semaphore。
* @param permits 初始的可用许可数目。此值可能为负数,在这种情况下,必须在授予任何获取前进行释放。
*/
FairSync(int permits) {
super(permits);
}
/**
* 公平
* 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
* @param acquires 要获取的许可数
* @return 获取给定数目的许可(如果提供了)并立即返回
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) // 如果在当前线程之前有一个排队的线程
return -1;
int available = getState(); // 返回同步状态的当前值。
int remaining = available - acquires; // 同步数-许可数
if (remaining < 0 || // 同步数多于许可数或者当前状态值等于期望值(CAS)
compareAndSetState(available, remaining))
return remaining;
}
}
}
/**
* 创建具有给定的许可数和非公平的公平设置的 Semaphore。
* @param permits 初始的可用许可数目。此值可能为负数,在这种情况下,必须在授予任何获取前进行释放。
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* 创建具有给定的许可数和给定的公平设置的 Semaphore。
* @param permits 初始的可用许可数目。此值可能为负数,在这种情况下,必须在授予任何获取前进行释放。
* @param fair 如果此信号量保证在争用时按先进先出的顺序授予许可,则为 true;否则为 false。
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
* 获取一个许可(如果提供了一个)并立即返回,将可用的许可数减 1。
*
* 如果没有可用的许可,则在发生以下两种情况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
* 某些其他线程调用此信号量的 release() 方法,并且当前线程是下一个要被分配许可的线程;
* 或者其他某些线程中断当前线程。
*
* 如果当前线程:
* 被此方法将其已中断状态设置为 on ;
* 或者 在等待许可时被中断。
* 则抛出 InterruptedException,并且清除当前线程的已中断状态。
* @throws InterruptedException 如果当前线程被中断
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 从此信号量中获取许可,在有可用的许可前将其阻塞。
* 获取一个许可(如果提供了一个)并立即返回,将可用的允许数减 1。
*
* 如果没有可用的许可,则在其他某些线程调用此信号量的 release() 方法,
* 并且当前线程是下一个要被分配许可的线程前,禁止当前线程用于线程安排目的并使其处于休眠状态。
*
* 如果当前线程在等待许可时被中断,那么它将继续等待,但是与没有发生中断,
* 其将接收允许的时间相比,为该线程分配许可的时间可能改变。
* 当线程确实从此方法返回后,将设置其中断状态。
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
* 获取一个许可(如果提供了一个)并立即返回,其值为 true,将可用的许可数减 1。
*
* 如果没有可用的许可,则此方法立即返回并且值为 false。
* 即使已将此信号量设置为使用公平排序策略,但是调用 tryAcquire() 也将 立即获取许可(如果有一个可用),
* 而不管当前是否有正在等待的线程。在某些情况下,此“闯入”行为可能很有用,即使它会打破公平性也如此。
*
* 如果希望遵守公平设置,则使用 tryAcquire(0, TimeUnit.SECONDS) ,它几乎是等效的(它也检测中断)。
* @return 如果获取了许可,则返回 true;否则返回 false。
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
* 获取一个许可(如果提供了一个)并立即返回,其值为 true,将可用的许可数减 1。
*
* 如果没有可用的允许,则在发生以下三种情况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
* 其他某些线程调用此信号量的 release() 方法并且当前线程是下一个被分配许可的线程;
* 或者其他某些线程中断当前线程;
* 或者已超出指定的等待时间。
*
* 如果获取了许可,则返回值为 true。
*
* 如果当前线程:
* 被此方法将其已中断状态设置为 on ;
* 或者在等待获取许可的同时被中断。
* 则抛出 InterruptedException,并且清除当前线程的已中断状态。
*
* 如果超出了指定的等待时间,则返回值为 false。如果该时间小于等于 0,则方法根本不等待。
* @param timeout 等待许可的最多时间
* @param unit timeout 参数的时间单位
* @return 如果获取了许可,则返回 true;如果获取许可前超出了等待时间,则返回 false
* @throws InterruptedException 如果当前线程是已中断的
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 释放一个许可,将其返回给信号量。
* 释放一个许可,将可用的许可数增加 1。如果任意线程试图获取许可,则选中一个线程并将刚刚释放的许可给予它。
* 然后针对线程安排目的启用(或再启用)该线程。
* 不要求释放许可的线程必须通过调用 acquire() 来获取许可。通过应用程序中的编程约定来建立信号量的正确用法。
*/
public void release() {
sync.releaseShared(1);
}
/**
* 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
* 获取给定数目的许可(如果提供了)并立即返回,将可用的许可数减去给定的量。
*
* 如果没有足够的可用许可,则在发生以下两种情况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
* 其他某些线程调用此信号量的某个释放方法,当前线程是下一个被分配允许的线程并且可用许可的数目满足此请求;
* 或者其他某些线程中断当前线程。
*
* 如果当前线程:
* 被此方法将其已中断状态设置为 on ;
* 或者在等待许可时被中断。
* 则抛出 InterruptedException,并且清除当前线程的已中断状态。
*
* 任何原本应该分配给此线程的许可将被分配给其他试图获取许可的线程,
* 就好像已通过调用 release() 而使许可可用一样。
* @param permits 要获取的许可数
* @throws InterruptedException 如果当前线程已被中断
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException(); // 如果 permits 为负
sync.acquireSharedInterruptibly(permits);
}
/**
* 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
* 获取给定数目的许可(如果提供了)并立即返回,将可用的许可数减去给定的量。
*
* 如果没有足够的可用许可,则在其他某些线程调用此信号量的某个释放方法,
* 当前线程是下一个要被分配许可的线程,并且可用的许可数目满足此请求前,
* 禁止当前线程用于线程安排目的并使其处于休眠状态。
*
* 如果当前的线程在等待许可时被中断,则它会继续等待并且它在队列中的位置不受影响。
* 当线程确实从此方法返回后,将其设置为中断状态。
* @param permits 要获取的许可数
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException(); // 如果 permits 为负
sync.acquireShared(permits);
}
/**
* 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
* 获取给定数目的许可(如果提供了)并立即返回,其值为 true,将可用的许可数减去给定的量。
*
* 如果没有足够的可用许可,则此方法立即返回,其值为 false,并且不改变可用的许可数。
* 即使已将此信号量设置为使用公平排序策略,但是调用 tryAcquire 也将 立即获取许可(如果有一个可用),
* 而不管当前是否有正在等待的线程。在某些情况下,此“闯入”行为可能很有用,即使它会打破公平性也如此。
*
* 如果希望遵守公平设置,则使用 tryAcquire(permits, 0, TimeUnit.SECONDS) ,它几乎是等效的(它也检测中断)。
* @param permits 要获取的许可数
* @return 如果获取了许可,则返回 true;否则返回 false
*/
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException(); // 如果 permits 为负
return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
* 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
* 获取给定数目的许可(如果提供了)并立即返回,其值为 true,将可用的许可数减去给定的量。
*
* 如果没有足够的可用许可,则在发生以下三种情况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
* 其他某些线程调用此信号量的某个释放方法,当前线程是下一个被分配许可的线程,并且可用许可的数目满足此请求;
* 或者其他某些线程中断当前线程;
* 或者已超出指定的等待时间。
*
* 如果获取了许可,则返回值为 true。
*
* 如果当前线程:
* 被此方法将其已中断状态设置为 on ;
* 或者在等待获取允许的同时被中断。
* 则抛出 InterruptedException,并且清除当前线程的已中断状态。
* 任何原本应该分配给此线程的许可将被分配给其他试图获取许可的线程,
* 就好像已通过调用 release() 而使许可可用一样。
*
* 如果超出了指定的等待时间,则返回值为 false。如果该时间小于等于 0,则方法根本不等待。
* 任何原本应该分配给此线程的许可将被分配给其他试图获取许可的线程,
* 就好像已通过调用 release() 而使许可可用一样。
* @param permits 要获取的许可数
* @param timeout 等待许可的最多时间
* @param unit timeout 参数的时间单位
* @return 如果获取了许可,则返回 true;如果获取所有许可前超出了等待时间,则返回 false
* @throws InterruptedException 如果当前线程是已中断的
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException(); // 如果 permits 为负
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
* 释放给定数目的许可,将其返回到信号量。
* 释放给定数目的许可,将可用的许可数增加该量。
*
* 如果任意线程试图获取许可,则选中某个线程并将刚刚释放的许可给予该线程。
*
* 如果可用许可的数目满足该线程的请求,则针对线程安排目的启用(或再启用)该线程;
* 否则在有足够的可用许可前线程将一直等待。如果满足此线程的请求后仍有可用的许可,
* 则依次将这些许可分配给试图获取许可的其他线程。
* 不要求释放许可的线程必须通过调用获取来获取该许可。通过应用程序中的编程约定来建立信号量的正确用法。
* @param permits 要释放的许可数
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException(); // 如果 permits 为负
sync.releaseShared(permits);
}
/**
* 返回此信号量中当前可用的许可数。
* 此方法通常用于调试和测试目的。
* @return 此信号量中的可用许可数
*/
public int availablePermits() {
return sync.getPermits();
}
/**
* 获取并返回立即可用的所有许可。
* @return 获取的许可数
*/
public int drainPermits() {
return sync.drainPermits();
}
/**
* 根据指定的缩减量减小可用许可的数目。此方法在使用信号量来跟踪那些变为不可用资源的子类中很有用。
* 此方法不同于 acquire,在许可变为可用的过程中,它不会阻塞等待。
* @param reduction 要移除的许可数
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException(); // 如果 reduction 是负数
sync.reducePermits(reduction);
}
/**
* 如果此信号量的公平设置为 true,则返回 true。
* @return 如果此信号量的公平设置为 true,则返回 true
*/
public boolean isFair() {
return sync instanceof FairSync;
}
/**
* 查询是否有线程正在等待获取。注意,因为同时可能发生取消,
* 所以返回 true 并不保证有其他线程等待获取许可。此方法主要用于监视系统状态。
* @return 如果可能有其他线程正在等待获取锁,则返回 true
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* 返回正在等待获取的线程的估计数目。该值仅是估计的数字,
* 因为在此方法遍历内部数据结构的同时,线程的数目可能动态地变化。
* 此方法用于监视系统状态,不用于同步控制。
* @return 正在等待此锁的线程的估计数目
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* 返回一个 collection,包含可能等待获取的线程。因为在构造此结果的同时实际的线程 set 可能动态地变化,
* 所以返回的 collection 仅是尽力的估计值。所返回 collection 中的元素没有特定的顺序。
* 此方法用于加快子类的构造速度,提供更多的监视设施。
* @return 线程 collection
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
/**
* 返回标识此信号量的字符串,以及信号量的状态。括号中的状态包括 String 类型的 "Permits =",后跟许可数。
* 覆盖:类 Object 中的 toString
* @return 标识此信号量的字符串,以及信号量的状态
*/
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}
还没有评论,来说两句吧...