《Java并发编程实战》学习笔记(3)

ゝ一世哀愁。 2023-07-08 07:59 32阅读 0赞

第五章:构建块

平台类库包含了一个并发构建块的丰富集合,比如线程安全容器和多种同步工具(Synchronizer)。

Synchronizer用来调节相互协作的线程间的控制流。

同步容器

同步容器类包括两部分,一个 是VectorHashTable,它们是早期JDK的一部分;另一个是它们的同系容器,在JDK1.2中才被加入的同步包装(wrapper)类。

这些类是由Collections.synchronizedXxx工厂方法创建的。

这些类通过封装它们的状态,并对每一个公共方法进行同步而实现了线程安全,这样一次只有一个线程能访问容器的状态。

同步容器中出现的问题

同步容器都是线程安全的。但是对于复合操作,有时你可能需要使用额外的客户端加锁(client-side locking)进行保护。

通常对容器的复合操作包括:迭代(反复获取元素,直到获得容器中的最后一个元素)、导航(根据一定的顺序寻找下一个元素)以及条件运算,比如“缺少即加入”(put-if-absent),检查Map中是否存在关键字K,如果没有,就加入mapping (K,V)。

在一个同步的容器中,这些复合操作即使没有客户端加锁的保护,技术上也是线程安全的,但是当其他线程能够并发修改容器的时候,它们就可能不会按照你期望的方式工作了。

  1. /** * 操作Vector的复合操作可能导致混乱的结果 */
  2. public static Object getLast(Vector list) {
  3. int lastIndex = list.size() - 1;
  4. return list.get(lastIndex);
  5. }
  6. public static void deleteLast(Vector list) {
  7. int lastIndex = list.size() - 1;
  8. list.remove(lastIndex);
  9. }

在这里插入图片描述

  1. /** * 使用客户端加锁,对Vector进行复合操作 */
  2. public static Object getLast(Vector list) {
  3. synchronized (list) {
  4. int lastIndex = list.size() - 1;
  5. return list.get(lastIndex) ;
  6. }
  7. }
  8. public static void deleteLast(Vector list) {
  9. synchronized (list) {
  10. int lastIndex = list.size() - 1;
  11. list.remove(lastIndex);
  12. }
  13. }

并发容器

Java5.0通过提供几种并发的容器类来改进同步容器。并发容器是为多线程并发访问而设计的。

  • Java 5.0添加了ConcurrentHashMap,来替代同步的哈希Map实现
  • 当多数操作为读取操作时,CopyOnWriteArrayList来替代List相应的同步实现

新的ConcurrentMap接口加入了对常见复合操作的支持,比如“缺少即加入(put-if-absent) ”、替换和条件删除。

用并发容器替换同步容器,这种作法以有很小风险带来了可扩展性显著的提高。

Java 5.0同样添加了两个新的容器类型:QueueBlockingQueue

  • Queue用来临时保存正在等待被进一步处理的一系列元素。基于Queue,有一系列的具体实现。
  • BlockingQueue扩展了Queue,增加了可阻塞的插入和获取操作。阻塞队列在生产者消费者设计中非常有用。

正像ConcurrentHashMap作为同步的哈希Map的一个并发替代品,Java 6加入了ConcurrentSkipListMapConcurrentSkipListset,用来作为同步的SortedMapSortedSet的并发替代品(比如用synchronizedMap包装的TreeMapTreeSet)。

ConcurrentHashMap

ConcurrentHashMapHashMap一样是一个哈希表,但是它使用完全不同的锁策略,可以提供更好的并发性和可伸缩性。

ConcurrentHashMap使用一个更加细化的锁机制,名叫分离锁。这个机制允许更深层次的共享访问。任意数量的读线程可以并发访问Map,读者和写者也可以并发访问Map,并且有限数量的写线程还可以并发修改Map。结果是,为并发访问带来更高的吞吐量,同时几乎没有损失单个线程访问的性能。

阻塞队列和生产者-消费者模式

阻塞队列(BlockingQueue)提供了可阻塞的puttake方法,它们与可定时的offerpoll是等价的。

阻塞队列支持生产者-消费者设计模式。一个生产者-消费者设计分离了“识别需要完成的工作”和“执行工作”。该模式不会发现一个工作便立即处理,而是把工作置入一个任务(“to do”)清单中,以备后期处理。

生产者和消费者以不同的或者变化的速度生产和消费着数据,生产者-消费者模式将这些活动解耦,因而简化了工作负荷的管理。

双端队列和工作窃取

Java 6同样新增了两个容器类型,Deque(发音是deck)和BlockingDeque,它们分别扩展了QueueBlockingQueue

Deque是一个双端队列,允许高效地在头和尾分别进行插入和移除。实现它们的是ArrayDequeLinkedBlockingDeque

正如阻塞队列适用于生产者-消费者模式一样,双端队列使它们自身与一种叫做工作窃取(work stealing) 的模式相关联。

一个消费者生产者设计中,所有的消费者只共享一个工作队列;在窃取工作的设计中,每一个消费者都有一个自己的双端队列。如果一个消费者完成了自己双端队列中的全部工作,它可以偷取其他消费者的双端队列中的末尾任务。

因为工作者线程并不会竞争一个共享的任务队列,所以窃取工作模试比传统的生产者-消费者设计有更佳的可伸缩性;大多数时候它们访问自己的双端队列,减少竞争。

当一个工作者必须要访问另一个队列时, 它会从尾部截取,而不是从头部,从而进一步降低对双端队列的争夺。

Synchronizer

Synchronizer是一个对象,它根据本身的状态调节线程的控制流。

阻塞队列可以扮演一个Synchronizer的角色;其他类型的Synchronizer包括信号量( semaphore)、关卡( barrier)以及闭锁(latch)。

在平台类库中存在一些Synchronizer类;如果这些不能满足你的需要,你同样可以按照第14章里描述的那样,创建一个你自己的Synchronizer

所有Synchronizer都享有类似的结构特性:它们封装状态,而这些状态决定着线程执行到在某一点时是通过还是被迫等待;它们还提供操控状态的方法,以及高效地等待Synchronizer进入到期望状态的方法。

闭锁

闭锁(latch)是一种Synchronizer,它可以延迟线程的进度直到线程到达终止(terminal)状态。

一个闭锁工作起来就像一道大门:直到闭锁达到终点状态之前,门一直是关闭的,没有线程能够通过,在终点状态到来的时候,门开了,允许所有线程都通过。

闭锁可以用来确保特定活动直到其他的活动完成后才发生,比如:

  • 确保一个计算不会执行,直到它需要的资源被初始化。一个二元闭锁(两个状态)可以用来表达“资源R已经被初始化”,并且所有需要用到R的活动首先都要在闭锁中等待。
  • 确保一个服务不会开始,直到它依赖的其他服务都已经开始。每一个服务会包含一个相关的二元闭锁;开启服务S会首先开始等待闭锁s中所依赖的其他服务, 在启动结束后,会释放闭锁S,这样所有依赖S的服务也可以开始处理了。
  • 等待,直到活动的所有部分都为继续处理作好充分准备,比如在多玩家的游戏中的所有玩家是否都准备就绪。这样的闭锁会在所有玩家准备就绪时,达到终点状态。

CountDownlatch是一个灵活的闭锁实现,用于上述各种情况;允许一个或多个线程等待一个事件集的发生。

闭锁的状态包括一个计数器, 初始化为一个正数,用来表现需要等待的事件数。

countDown方法对计数器做减操作,表示一个事件已经发生了,而await方法等待计数器达到零,此时所有需要等待的事件都已发生。如果计数器入口时值为非零,await方法会一直阻塞直到计数器为零,或者等待线程中断以及超时。

FutureTask

FutureTask同样可以作为闭锁。(FutureTask的实现描述了一个抽象的可携带结果的计算)。

FutureTask的计算是通过Callable实现的,它等价于一个可携带结果的Runnable,并且有3个状态:等待、运行和完成。

完成包括所有计算以任意的方式结束,包括正常结束、取消和异常。一旦FutureTask进入完成状态,它会永远停止在这个状态上。

Future.get的行为依赖于任务的状态。如果它已经完成,get可以立刻得到返回结果,否则会被阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。

FutureTask把计算的结果从运行计算的线程传送到需要这个结果的线程:FutureTask的规约保证了这种传递建立在结果的安全发布基础之上。

  1. /** * 使用FutureTask预载稍后需要的数据 */
  2. public class Preloader {
  3. private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
  4. public ProductInfo call() throws DataLoadException {
  5. return loadProductInfo();
  6. }
  7. });
  8. private final Thread thread = new Thread(future);
  9. public void start() {
  10. thread.start();
  11. }
  12. public ProductInfo get() throws DataLoadException, InterruptedException {
  13. try {
  14. return future.get();
  15. } catch (ExecutionException e) {
  16. Throwable cause = e.getCause();
  17. if (cause instanceof DataLoadException)
  18. throw (DataLoadException) cause;
  19. else
  20. throw launderThrowable(cause);
  21. }
  22. }
  23. }

关卡

关卡(barrier)类似于闭锁,它们都能够阻塞一组线程, 直到某些事件发生。

其中关卡与闭锁关键的不同在于,所有线程必须同时到达关卡点,才能继续处理。

闭锁等待的是事件;关卡等待的是其他线程。

关卡实现的协议,就像一些家庭成员指定商场中的集合地点:”我们每个人6:00在麦当劳见,到了以后不见不散,之后我们再决定接下来做什么。“

CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点,这在并行迭代算法中非常有用,这个算法会把一个问题拆分成一系列相互独立的子问题。

当线程到达关卡点时,调用await,await会被阻塞,直到所有线程都到达关卡点。如果所有线程都到达了关卡点,关卡就被成功地突破,这样所有线程都被释放,关卡会重置以备下一次使用。

如果对await的调用超时,或者阻塞中的线程被中断,那么关卡就被认为是失败的,所有对await未完成的调用都通过BrokenBarrierException终止。

如果成功地通过关卡,await为每一个线程返回一个唯一的到达索引号, 可以用它来“选举”产生一个领导, 在下一次迭代中承担一些特殊工作。

  1. /** * 在一个细胞的 自动系统中用CyclicBarrier协调计箅 */
  2. public class CellularAutomata {
  3. private final Board mainBoard;
  4. private final CyclicBarrier barrier;
  5. private final Worker[] workers;
  6. public CellularAutomata(Board board) {
  7. this.mainBoard = board;
  8. int count = Runtime.getRuntime().availableProcessors();
  9. this.barrier = new CyclicBarrier(count, () -> mainBoard.commitNewValues());
  10. this.workers = new Worker[count];
  11. for (int i = 0; i < count; i++) {
  12. workers[i] = new Worker(mainBoard.getSubBoard(count, i));
  13. }
  14. }
  15. private class Worker implements Runnable {
  16. private final Board board;
  17. public Worker(Board board) {
  18. this.board = board;
  19. }
  20. @Override
  21. public void run() {
  22. while (!board.hasConverged()) {
  23. for (int x = 0; x < board.getMaxX(); x++) {
  24. for (int y = 0; y < board.getMaxY(); y++) {
  25. board.setNewValue(x, y, computeValue(x, y));
  26. }
  27. }
  28. try {
  29. barrier.await();
  30. } catch (InterruptedException ex) {
  31. return;
  32. } catch (BrokenBarrierException ex) {
  33. return;
  34. }
  35. }
  36. }
  37. public void start() {
  38. for (int i = 0; i < workers.length; i++) {
  39. new Thread(workers[i]).start();
  40. }
  41. mainBoard.waitForConvergence();
  42. }
  43. }
  44. }

发表评论

表情:
评论列表 (有 0 条评论,32人围观)

还没有评论,来说两句吧...

相关阅读

    相关 java并发编程笔记day3

    第四章 组合对象 4.1 设计线程安全的类 在没有进行全局检查的情况下,封装能保证线程的安全性。 设计线程安全类的过程包括一下三个基本要素:

    相关 JAVA并发编程学习笔记

    线程安全: 就是多线程访问时,采用了加锁机制,当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问直到该线程读取完,其他线程才可使用。不会出现数据不一致或者数据