区块链技术博客
www.b2bchain.cn

【Java并发编程实战】5.5.4 栅栏 CyclicBarrier

这篇文章主要介绍了【Java并发编程实战】5.5.4 栅栏 CyclicBarrier的讲解,通过具体代码实例进行17699 讲解,并且分析了【Java并发编程实战】5.5.4 栅栏 CyclicBarrier的详细步骤与相关技巧,需要的朋友可以参考下https://www.b2bchain.cn/?p=17699

本文实例讲述了2、树莓派设置连接WiFi,开启VNC等等的讲解。分享给大家供大家参考文章查询地址https://www.b2bchain.cn/7039.html。具体如下:

文章目录

    • 1. 什么是栅栏
    • 2. CyclicBarrier的使用
      • 2.1 构造方法
      • 2.2 重要方法
      • 2.3 基本使用
    • 3. CyclicBarrier 使用场景
    • 4. CyclicBarrier源码解析
      • 4.1 构造函数
      • 4.2 await()方法
      • 4.3 其他相关方法和成员变量解读

1. 什么是栅栏

栅栏(Barrier)类似于闭锁,他能阻塞一组线程直到某个事件发生后再全部同时执行。CyclicBarrier 字面意思是回环栅栏,回环的意思是它能够被重复利用,当然前提是在所有线程释放了以后。

举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。

2. CyclicBarrier的使用

2.1 构造方法

public CyclicBarrier(int parties) public CyclicBarrier(int parties, Runnable barrierAction) 

解析:

  • parties 是参与线程的个数
  • 第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务

2.2 重要方法

public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException 

解析:

线程调用 await() 表示自己已经到达栅栏
BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时

2.3 基本使用

我们将展示基本用法,演示一组线程需要等待所有线程到达执行点后再继续执行,并且可以重复进入执行点:

import java.util.Random; import java.util.concurrent.CyclicBarrier;  public class CyclicBarrierDemo {     static class TaskThread extends Thread {          CyclicBarrier barrier;          public TaskThread(CyclicBarrier barrier) {             this.barrier = barrier;         }          @Override         public void run() {             try {                 Thread.sleep(new Random().nextInt(5000)); // 模拟线程在不同时刻到执行点(栅栏)                 System.out.println(getName() + " 到达栅栏 A");                 barrier.await(); // 阻塞                 System.out.println(getName() + " 冲破栅栏 A");                  Thread.sleep(new Random().nextInt(5000));                 System.out.println(getName() + " 到达栅栏 B");                 barrier.await(); // 重复进入执行点,并阻塞                 System.out.println(getName() + " 冲破栅栏 B");             } catch (Exception e) {                 e.printStackTrace();             }         }     }      public static void main(String[] args) {         // 创建线程数         int threadNum = 5;         // 线程数和满足冲破栅栏的线程相同,方便理解         CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {              @Override             public void run() {                 // 当最后一个线程进入栅栏时,触发冲破栅栏,会额外执行run方法体                 System.out.println(Thread.currentThread().getName() + " 完成最后任务");             }         });          for (int i = 0; i < threadNum; i++) {             new TaskThread(barrier).start();         }     }  }  

结果分析:

Thread-2 到达栅栏 A Thread-3 到达栅栏 A Thread-4 到达栅栏 A Thread-0 到达栅栏 A Thread-1 到达栅栏 A      //`5个线程先后到达栅栏处阻塞` Thread-1 完成最后任务    //`最后到达的是 Thread-1,并触发栅栏的run()方法体` Thread-1 冲破栅栏 A      //`集体冲破栅栏` Thread-2 冲破栅栏 A Thread-3 冲破栅栏 A Thread-4 冲破栅栏 A Thread-0 冲破栅栏 A Thread-3 到达栅栏 B     //`再次到达栅栏处阻塞,重复之前的逻辑` Thread-0 到达栅栏 B Thread-1 到达栅栏 B Thread-4 到达栅栏 B Thread-2 到达栅栏 B Thread-2 完成最后任务   //`最后到达的是 Thread-2,并触发栅栏的run()方法体` Thread-2 冲破栅栏 B     //`集体冲破栅栏` Thread-3 冲破栅栏 B Thread-0 冲破栅栏 B Thread-1 冲破栅栏 B Thread-4 冲破栅栏 B  

从打印结果可以看出,所有线程会等待全部线程到达栅栏之后才会继续执行,并且最后到达的线程会完成 Runnable 的任务。

3. CyclicBarrier 使用场景

可以用于多线程计算数据,最后合并计算结果的场景。

4. CyclicBarrier源码解析

4.1 构造函数

CyclicBarrier的类图如下:
【Java并发编程实战】5.5.4 栅栏 CyclicBarrier
通过类图我们可以看到,CyclicBarrier内部使用了ReentrantLock和Condition两个类,它有两个构造函数:

public CyclicBarrier(int parties) {     this(parties, null); }   public CyclicBarrier(int parties, Runnable barrierAction) {     if (parties <= 0) throw new IllegalArgumentException();     this.parties = parties;     this.count = parties;     this.barrierCommand = barrierAction; } 
  • CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数parties表示屏障拦截的线程数量,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

    parties表示如果想突破栅栏,进入阻塞状态的线程的个数必须达到parties定义的值,这就产生一个很有意思的问题,当parties值大于线程总数或小于线程总数时,是什么情况呢?可以参考《栅栏 CyclicBarrier的构造函数入参parties详解》

  • CyclicBarrier的另一个构造函数CyclicBarrier(int parties, Runnable barrierAction),用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

    参考前文的例子,当触发冲破屏障的线程,也就是最后一个达到栅栏处的线程会触发barrierAction。

4.2 await()方法

调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法:

public int await() throws InterruptedException, BrokenBarrierException {     try {         // 不超时等待,如果线程不够parties,会一直等待         return dowait(false, 0L);     } catch (TimeoutException toe) {         throw new Error(toe); // cannot happen     } } 
public int await(long timeout, TimeUnit unit)     throws InterruptedException,             BrokenBarrierException,             TimeoutException {     // 超时等待,如果线程不够parties,超时后会退出等待     return dowait(true, unit.toNanos(timeout)); } 

这两个方法最终都会调用dowait(boolean, long)方法,它也是CyclicBarrier的核心方法,该方法定义如下:

private int dowait(boolean timed, long nanos)     throws InterruptedException, BrokenBarrierException,             TimeoutException {     // 获取独占锁     final ReentrantLock lock = this.lock;     lock.lock();     try {         // 当前代         final Generation g = generation;         // 如果这代损坏了,抛出异常         if (g.broken)             throw new BrokenBarrierException();           // 如果线程中断了,抛出异常         if (Thread.interrupted()) {             // 将损坏状态设置为true             // 并通知其他阻塞在此栅栏上的线程             breakBarrier();             throw new InterruptedException();         }           // 获取下标         int index = --count;         //核心代码, 如果是 0,说明最后一个线程调用了该方法         if (index == 0) {  // tripped             boolean ranAction = false;             try {                 final Runnable command = barrierCommand;                 // 执行栅栏任务,构造删除传入的                 if (command != null)                     command.run();                 ranAction = true;                 // 更新一代,将count重置,将generation重置                 // 唤醒之前等待的线程                 nextGeneration();                 return 0;             } finally {                 // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true                 if (!ranAction)                     breakBarrier();             }         }           // loop until tripped, broken, interrupted, or timed out         for (;;) {             try {                  // 如果没有时间限制,则直接等待,直到被唤醒                 if (!timed)                     trip.await();                 // 如果有时间限制,则等待指定时间                 else if (nanos > 0L)                     nanos = trip.awaitNanos(nanos);             } catch (InterruptedException ie) {                 // 当前代没有损坏                 if (g == generation && ! g.broken) {                     // 让栅栏失效                     breakBarrier();                     throw ie;                 } else {                     // 上面条件不满足,说明这个线程不是这代的                     // 就不会影响当前这代栅栏的执行,所以,就打个中断标记                     Thread.currentThread().interrupt();                 }             }               // 当有任何一个线程中断了,就会调用breakBarrier方法             // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常             if (g.broken)                 throw new BrokenBarrierException();               // g != generation表示正常换代了,返回当前线程所在栅栏的下标             // 如果 g == generation,说明还没有换代,那为什么会醒了?             // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。             // 正是因为这个原因,才需要generation来保证正确。             if (g != generation)                 return index;                          // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常             if (timed && nanos <= 0L) {                 breakBarrier();                 throw new TimeoutException();             }         }     } finally {         // 释放独占锁         lock.unlock();     } } 

dowait(boolean, long)方法的主要逻辑处理比较简单,如果该线程不是最后一个调用await方法的线程,则它会一直处于等待状态,除非发生以下情况:

  • 最后一个线程到达,即index == 0
    调用nextGeneration()方法,冲破栅栏,并唤醒等待的线程
  • 某个参与线程等待超时
  • 某个参与线程被中断
  • 调用了CyclicBarrier的reset()方法。该方法会将屏障重置为初始状态

4.3 其他相关方法和成员变量解读

Generation 类,CyclicBarrier的静态内部类,拥有broken 属性,表示barrier的状态:

     /**          * barrier每一次使用都代表了一个generation实例.          * 当barrier发生trip或者reset时,对应的generation会发生改变.          * 由于非确定性,锁可能会分配给等待线程,因此可能会存在许多和使用barrier的线程相关的generation.          * 但是每次只能**这些线程中的一个(使用计数的那个),并且其他的线程要么broken要么trip.          * 如果出现了一个暂停,但并未reset,则不需要一个**的generation.          */      private static class Generation {             boolean broken = false;      } 
          //barrier入口的保护锁     private final ReentrantLock lock = new ReentrantLock();      //trip前的等待条件     private final Condition trip = lock.newCondition();      //同时使用这个barrier的线程个数     private final int parties;      //当trip时,barrier定义的屏障操作.     private final Runnable barrierCommand;      //当前generation     private Generation generation = new Generation();      //等待线程数.每一个generation上,count值都会从parties降为0.     //针对每一个新的generation或者当前barrier被broken时,count值都会被reset.     private int count;       

nextGeneration()方法表示冲破栅栏,重置栅栏状态,并唤醒所有其他等待的线程:

//当barrier发生trip时,用于更新状态并唤醒每一个线程. //这一方法只在持有lock时被调用.  private void nextGeneration() {      // signal completion of last generation      //标志最后一个generation完成.      trip.signalAll();      // set up next generation      //为下一个generation赋初值      count = parties;      generation = new Generation();  } 

breakBarrier()方法表示栅栏需要被提前打开要做的事情,比如超时会触发该方法:

 //为当前barrier的generation类的唯一变量broken赋值,并唤醒所有线程.  //这一方法只在持有lock时被调用.  private void breakBarrier() {      generation.broken = true;      count = parties;      trip.signalAll();//唤醒所有线程.  } 

参考:

《Java并发编程之CyclicBarrier详解》 参考源码解析
《CyclicBarrier源码分析-java8》

《CyclicBarrier 使用详解》 参考可重入的例子

本文转自互联网,侵权联系删除【Java并发编程实战】5.5.4 栅栏 CyclicBarrier

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » 【Java并发编程实战】5.5.4 栅栏 CyclicBarrier
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们