Condition的应用 àì夳堔傛蜴生んèń 2022-04-03 16:39 151阅读 0赞 JDK原话,假定有一个绑定的缓冲区,它支持 put 和 take 方法。如果试图在空的缓冲区上执行 take 操作,则在某一个项变得可用之前,线程将一直阻塞;如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前,线程将一直阻塞。我们喜欢在单独的等待 set 中保存 put 线程和 take 线程,这样就可以在缓冲区中的项或空间变得可用时利用最佳规划,一次只通知一个线程。可以使用两个 Condition 实例来做到这一点。 这里为什么要写上notFull和notEmpty,而不是直接使用一个变量来进行操作,因为两个notFull和notEmpty分别控制生产线程和消费线程,这样在唤醒的时候就会唤醒其中一类线程,如果只有一个实例的话那会唤醒阻塞队列中的一个线程,那么会让两类线程都进行争夺资源 import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //有限缓存使用显示的条件变量, public class ConditionBoundedBuffer<T> { protected final Lock lock = new ReentrantLock(); //条件谓词,notFull(count < items.length) private final Condition notFull = lock.newCondition(); //条件谓词,notEmpty(count > 0) private final Condition notEmpty = lock.newCondition(); private final int BUFFER_SIZE = 10; //基于队列的循环缓存 private final T[] items = (T[])new Object[BUFFER_SIZE]; private int tail,head,count; //阻塞,直到:notFull public void put(T x) throws InterruptedException{ lock.lock(); try{ while(count == items.length){ notFull.await();//这个对象锁是lock,而非是调用该方法的对象了,如果不使用lock.lock(),而直接在该方法上加上synchronized会抛出异常 } items[tail] = x; if(++tail == items.length){ tail = 0; } ++count; notEmpty.signal(); }finally{ lock.unlock(); } } //阻塞,直到:notEmpty public T take() throws InterruptedException{ lock.lock(); try{ while(count == 0){ notEmpty.await(); } T x = items[head]; items[head] = null; if(++head == items.length){ head = 0; } --count; notFull.signal(); return x; }finally{ lock.unlock(); } } } import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //使用lock实现的计数信号量 //这并不是Semaphore的真正实现,而是继承AbstractQueuedSynchronizer实现的 public class SemaphoreOnLock { private final Lock lock = new ReentrantLock(); //条件谓词:permitsAvailable (permits > 0) private final Condition permitsAvailable = lock.newCondition(); private int permits; SemaphoreOnLock(int initialPermits){ lock.lock(); try{ permits = initialPermits; }finally{ lock.unlock(); } } //阻塞,直到:permitsAvailable public void acquire() throws InterruptedException{ lock.lock(); try{ while(permits <= 0){ permitsAvailable.await(); } --permits; }finally{ lock.unlock(); } } public void release(){ lock.lock(); try{ ++permits; permitsAvailable.signal(); }finally{ lock.unlock(); } } } 对SemaphoreOnLock类进行测试 import java.util.concurrent.TimeUnit; public class SemaphoreTest implements Runnable{ SemaphoreOnLock sema = new SemaphoreOnLock(3); public static void main(String[] args) { SemaphoreTest test = new SemaphoreTest(); for(int i=0; i<10; i++){ new Thread(test).start(); } } @Override public void run() { test(); } private void test() { try { sema.acquire(); } catch (InterruptedException e1) { e1.printStackTrace(); } for(int i=1; i<=3; i++){ System.out.println(Thread.currentThread().getName()+"#执行:"+i); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } sema.release(); } /** * 运行结果: * 当前还有3个许可.. 当前还有2个许可.. Thread-2#执行:1 当前还有1个许可.. Thread-3#执行:1 Thread-0#执行:1 Thread-2#执行:2 Thread-0#执行:2 Thread-3#执行:2 Thread-2#执行:3 Thread-0#执行:3 Thread-3#执行:3 当前还有2个许可.. Thread-5#执行:1 当前还有1个许可.. Thread-7#执行:1 当前还有1个许可.. Thread-9#执行:1 Thread-5#执行:2 Thread-7#执行:2 Thread-9#执行:2 */ }
还没有评论,来说两句吧...