并发包-Countdownlatch

并发包-Countdownlatch

CountDownLatch 的定位是一个多线程中的同步辅助类。它可以让一个或多个线程等待,直到其他线程完成系列操作。


一、使用

场景一:分解任务,等待,汇总

在 Java Doc 中给出的一个典型场景:利用多线程来进行任务分解,主线程必须在所有子线程执行完成后才能继续执行。

主线程必须在 10个 子线程执行完成后,才能继续执行。

/**
 * CountDownLatch 初始化为10,启动10个工作现场,主线程等待所有工作线程结束后才能执行。
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(10);
        ExecutorService e = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; ++i) {
            e.execute(new WorkerRunnable(doneSignal, i));
        }
        doneSignal.await();
        System.out.println("所有 Worker 完成工作");
    }
}

class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;

    WorkerRunnable(CountDownLatch doneSignal, int i) {
        this.doneSignal = doneSignal;
        this.i = i;
    }

    @Override
    public void run() {
        try {
            doWork(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        doneSignal.countDown();
    }

    void doWork(int i) throws InterruptedException {
        System.out.println(i + "子线程工作中");
        Thread.sleep(1000);
    }
}

场景二 :on/off latch, or gate

Java Doc中给出的另一个场景是,作为简单开关锁或者门:所有线程调用等待在门处等待,直到由调用countDown() 的线程打开它 (一批线程等待到信号才开始执行)。

/**
 * 现在工厂有一个工头和5个工人,工人们必须等待开始工头信号才能开始工作,工头等待所有工人工作完成后。
 */
public class CountDownLatchDemo2 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(5);

        for (int i = 0; i < 5; ++i) {
            new Thread(new Worker(startSignal, doneSignal)).start();
        }
        doSomethingElse();
        System.out.println("所有 worker 可以开始工作了");
        startSignal.countDown();
        System.out.println("工头等待 worker 工作完成");
        doneSignal.await();
        System.out.println("所有 worker 工作完成,工头工作总结");
    }

    private static void doSomethingElse() {
        System.out.println("工头进行工作安排,所有 worker 都等待中");
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    @Override
    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    void doWork() throws InterruptedException {
        System.out.println("子线程工作中");
        Thread.sleep(1000);
    }
}

二、分析

CountDownLatch的 API中,重要操作包括构造函数、await()和countDown()操作。

CountDownLatch是通过一个计数器来实现的,其初始参数必须大于 0,当调用 countDown()时,计数器减一,知道计数器到达 0 ,await() 的线程才会被唤醒。

CountDownLatch中只有一个 Sync 属性,Sync 继承自 AbstractQueuedSynchronizer。

AQS是一个构建锁或者其他同步组件的基础框架,AQS内部维护了一个 state变量,CountDownLatch中构造函数的count就是就是赋值state的值。AQS的子类需要根据自己的功能重写tryAcquireShared(),去定义如何获取共享锁;重写tryReleaseShared() 定义释放共享锁需要做什么。

CountDownLatch中 state==0 才能拿到共享锁,countDown()操作本质就是讲 state减1,直到0的时候,其他await()的线程才能拿到共享锁继续往下执行。

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
​        // 直到 state==0,才能拿到共享锁
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

​        //通过CAS操作state,失败则不断从尝试,直到CAS操作成功。
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;
    
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注