欢迎光临seo外链专员,seo网络推广专员网站

17.多线程总结(四)-并发工具类-CyclicBarrier(栅栏)的使用及源码探秘

作者:jcmp      发布时间:2021-05-06      浏览量:0
CyclicBarrier同样也是并发编

CyclicBarrier同样也是并发编程中使用很重要的一个并发工具类,它和CountdownLatch有相似的地方,但是又有不同。CyclicBarrier允许一组线程相互等待达到一个共同的点位,然后继续执行。举个例子,在100米跑比赛中,我们假设每一个参赛选手都是一个线程,在开始比赛时,需要等所有选手都到达起跑点时,才能开跑,哪怕有一个没有到达,其他的都要等待他。

和CountdownLatch的区别: CountdownLatch一般用于某个线程等待若干其他线程执行完任务,它再执行,不可重复用 CyclicBarrier一般用于一组线程相互等待至某个状态,然后这一组线程再同时执行,可重用。

public class TestCyclicBarrier { public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(8); for (int i = 0; i < 8; i++) { int finalI = i; new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"就位"); try { Thread.sleep(finalI *1000); } catch (InterruptedException e) { e.printStackTrace(); } try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("全部就位,"+Thread.currentThread().getName()+"起跑"); } }).start(); } }}打印结果:Thread-0就位Thread-2就位Thread-1就位Thread-3就位Thread-4就位Thread-5就位Thread-6就位Thread-7就位全部就位,Thread-7起跑全部就位,Thread-0起跑全部就位,Thread-3起跑全部就位,Thread-2起跑全部就位,Thread-5起跑全部就位,Thread-6起跑全部就位,Thread-1起跑全部就位,Thread-4起跑。

一、源码

CyclicBarrier(int parties)

构造方法很简单,传入了一个int值parties,保存为成员变量 this.parties 和 this.count ,后边会发现主要是用的是count这个值,parties用于保存现场,在执行完成调用nextGeneration()方法的时候,将一切恢复到最初的数据,这就是为什么说CyclicBarrier可重用的原因。

public CyclicBarrier(int parties) { this(parties, null); }

它还有一个构造方法,可以传入一个Runnable,如果传入的话,这个Runnable会在所有线程到到指定的状态的时候开始运行。

public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }

二、await()

await有两个重载方法,可以指定等待的时间,到达时间之后不再进行等待,内部是通过awaitNanos方法实现的。

public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }

CyclicBarrier的核心实现就是在dowait方法中,和ReentrantLock相似,它的内部也是通过维护一个计数值来判断是否达到唤醒线程的条件的,我们来看下边源码。

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //CyclicBarrier内部的ReentrantLock锁 final ReentrantLock lock = this.lock; //锁住,确保释放锁之前其他线程进不来 lock.lock(); try { //Generation可以看作一个boolean值,它的内部维护了一个布尔变量 final Generation g = generation; //当他内部的broken变量被设置为true的时候,抛出异常 if (g.broken) throw new BrokenBarrierException(); //那么他什么时候会被设置为true?就是在这里,线程中断 if (Thread.interrupted()) { //breakBarrier方法中会将g.broken设置为true,并且唤醒所有等待中的 //线程,并抛出异常 breakBarrier(); throw new InterruptedException(); } //关键来了,将count值减1,因为前边是加了锁的,所以同一时刻只有一 //个线程可以操作,所以线程安全 int index = --count; //如果index=0了,那么就说明,所有需要的线程都就位了,那么就唤醒 if (index == 0) { // tripped //最后一个线程进来了,满足了条件,走到了这里 boolean ranAction = false; try { final Runnable command = barrierCommand; //如果在构造方法中传入了runnable,那么就执行它 if (command != null) command.run(); ranAction = true; //创建一个新的Generation对象,唤醒所有的线程,注意,这会导致 //下边的判断条件if (g != generation)满足,从而使被唤醒的线程 // 跳出无限循环 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out //这是一个无限循环,看到这种我们首先要想到的是,退出循环的条件是 //什么?从下边的代码可以看到,除了抛出异常之外,唯一的退出点就是 // if (g != generation),理解这个地方很关键,假设我们设置的parties=3 //那么第一个进入的线程进来之后,肯定就开始进入wait方法了,直到最 //后一个线程进来,满足了index == 0,回去看上边的代码 for (;;) { try { //这是两种挂起线程的方式,默认因为timed=false,走的是第一种 if (!timed) trip.await(); else if (nanos > 0L) //指定超时时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }

private void breakBarrier() { generation.broken = true; count = parties; //唤醒所有通过trip.await()方法进入等待的线程 trip.signalAll(); }

nextGeneration()

private void nextGeneration() { // signal completion of last generation //唤醒所有wait中的线程 trip.signalAll(); // set up next generation //恢复count值 count = parties; //重新new一个Generation generation = new Generation(); }