欢迎光临seo外链专员,seo网络推广专员网站

[Java源码][并发J.U.C]---并发工具类CountDownLatch

作者:jcmp      发布时间:2021-05-11      浏览量:0
一、例子package com.sour

一、例子

package com.sourcecode.concurrencytools;public class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { System.out.println(1); c.countDown(); System.out.println(2); //c.countDown(); /** * 打开注释 会依次打印1,2,3 * 关闭注释 会依次打印1,2 Main线程会阻塞在await()方法 */ } }).start(); c.await(); System.out.println("3"); }}

二、实现思路

package com.sourcecode.concurrencytools;import com.sourcecode.reentrantreadwritelock.AbstractQueuedSynchronizer;import java.util.concurrent.TimeUnit;public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } // 返回当前AQS的状态值 int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { // 其实跟传入的参数acquires没有什么实质的作用 // 根据当前AQS的状态值是否为0,如果为0就获得锁,如果不为0会进入到AQS中的acquireSharedInterruptibly方法中 // 具体的操作需要了解AQS return (getState() == 0) ? 1 : -1; } // 释放 逻辑非常简单 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; }}

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }

private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 记录一下旧的头节点 setHead(node); // 将当前节点设置为头节点 /** * 如果propagate > 0 说明锁还可以被别的线程拿到 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }

package com.sourcecode.concurrencytools;import java.util.concurrent.TimeUnit;public class CountDownLatchTest3 { static CountDownLatch c = new CountDownLatch(1); public static void main(String[] args) throws InterruptedException { Thread thread = new MyThread(); thread.start(); TimeUnit.SECONDS.sleep(1); thread.interrupt(); //c.countDown(); System.out.println(Thread.currentThread() + "----->finished!"); } static class MyThread extends Thread { public void run() { try { System.out.println(Thread.currentThread() + "----->before await"); c.await(); System.out.println(Thread.currentThread() + "----->after await"); } catch (InterruptedException e) { System.out.println(Thread.currentThread() + "----->in interrupted exception."); } System.out.println(Thread.currentThread() + "----->finished!"); } }}

Thread[Thread-0,5,main]----->before awaitThread[main,5,main]----->finished!Thread[Thread-0,5,main]----->in interrupted exception.Thread[Thread-0,5,main]----->finished!

三、参考