同步工具类

锁(Lock)

同步锁(synchronized)和可重入锁(ReentrantLock)都是用于并发程序设计必不可少的手段,在JDK 5.0早期版本中,同步锁性能远远低于重入锁,但是在6.0版本之后,jdk对同步锁做了大量的优化,使得同步锁跟重入锁性能差距并不大,并且jdk团队表示,同步锁还有进一步升级优化的空间。

重入的概念是指在同一个线程内部,这种锁是可以反复进入的。

1
2
3
4
5
lock.lock();
lock.lock();
// do something
lock.unlock();
lock.unlock();

一个线程可多次获取锁,但同时也要释放相同的次数,否则该线程将持续拥有锁,其他线程将无法进入临界区。

重入锁的几个重要方法:

  • lock: 获取锁,如果锁被其他线程占用,则休眠等待。
  • lockInterruptibly: 获取锁,可以被其他线程所中断。
  • tryLock: 尝试获取锁,不等待。
  • tryLock(time, timeUnit): 在一定时间内尝试获取锁。
  • unlock: 释放锁

synchronized 与 ReentrantLock

对于synchronized来说,一个线程要么获取到锁开始执行,要么继续等待。但是对于重入锁来说,提供了更灵活的一种机制,那就是在等待锁的过程中,可以取消对锁的请求,这样可以有效避免死锁的可能。

公平锁与非公平锁

重入锁默认是非公平锁,可以通过构造函数参数实现公平锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Java source: ReentrantLock.java

// 默认ReentrantLock是非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
 }

 // 可以通过参数fair构建公平锁
 public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
 }

如果是非公平锁,在并发场景下,系统会随机从等待队列中挑选一个线程。如果是公平锁,系统会维护一个有序队列,会按照进入队列的次序有序执行,因此公平锁虽然避免了饥饿现象,但是会需要更高的成本来维护这个有序队列。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ReentrantLock例子,启动100个线程,每个线程往List中添加100个数,如果不做同步控制,最后得到的SIZE
// 大概率会小于10000
public class ThreadTest9 {

    public static void main(String[] args) throws InterruptedException {

        Lock lock = new ReentrantLock();
        List<Integer> arrays = new ArrayList<>();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    for (int j = 0; j < 100; j++) {
                        arrays.add(arrays.size() + 1);
                    }
                    lock.unlock();
                }
            });
            t.start();
            threads.add(t);
        }

        for(Thread t : threads){
            t.join();
        }

        System.out.println("SIZE: " + arrays.size());
    }
}

Condition

Condition和wait()和notify()方法的作用是大致相同的。但是wait()和notify()方法是和synchronized关键字合作使用的,而Condtion是与重入锁相关联的。通过Lock接口(重入锁就实现了这一接口)的Condition newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Condition的例子
public class ThreadTest10 implements Runnable {

    private static Lock lock = new ReentrantLock();
    private static Condition cond = lock.newCondition();

    @Override
    public void run() {
        try {
            lock.lock();
            System.out.println("t1 waiting @ " + System.currentTimeMillis());
            cond.await();
            System.out.println("t1 running @ " + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadTest10 test10 = new ThreadTest10();

        Thread t1 = new Thread(test10);
        t1.start();

        Thread.sleep(2000);

        lock.lock();
        cond.signal();
        lock.unlock();
    }
}

CountDownLatch

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// CountDownLatch的例子
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class ThreadTest7 {

    static class MyThread extends Thread {

        private CountDownLatch cd;

        MyThread(CountDownLatch cd){
            this.cd = cd;
        }

        @Override
        public void run() {
            // 随机休眠 1 ~ 10 秒
            try {
                Thread.sleep((new Random()).nextInt(10) * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(getName() + " finished @ " + System.currentTimeMillis());
            cd.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int n = 500;
        CountDownLatch cd = new CountDownLatch(n);

        for (int i = 0; i < n; i++) {
            (new MyThread(cd)).start();
        }

        cd.await();
        System.out.println("ALL DONE!");
    }
}

CyclicBarrier

CyclicBarrier和CountDownLatch非常类似,它也可以实现线程间的技术等待,但是它的功能比CountDownLatch更加复杂和强大。主要应用场景和CountDownLatch类似。CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// CyclicBarrier
import java.util.Random;
import java.util.concurrent.*;

public class ThreadTest8 {

    static class Runner extends Thread {
        private CyclicBarrier cb;
        private String name;

        Runner(CyclicBarrier cb, String name) {
            this.cb = cb;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                // 运动员准备
                Thread.sleep((new Random()).nextInt(1000));
                System.out.println(name + " Ready");

                // 等待所有Runner准备好
                cb.await();

                // 起跑
                System.out.println(name + " start running @ " + System.currentTimeMillis());
                int result = 9580 + (new Random()).nextInt(1000);
                Thread.sleep(result);

                // 跑完
                System.out.println(name + " finished @ " + (result / 1000.0));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {

        CyclicBarrier cb = new CyclicBarrier(8, () -> {
            // 所有Runner准备好后的回调
            System.out.println("Start Running...");
        });

        String[] names = new String[]{"布雷克", "盖伊", "鲍威尔", "加特林", "卡特尔", "格林", "穆林斯", "博尔特"};

        Executor executor = Executors.newFixedThreadPool(names.length);
        for (String name : names) {
            executor.execute(new Runner(cb, name));
        }
    }
}

Semaphore

信号量(Semaphore)为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。

在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑方法有:

1
2
3
4
5
public void acquire();
public void acquireUninterruptibly();
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit);
public void release();

acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// semaphore的例子,模拟一个排队参观的场景,100个线程,每次准入5个线程
// 记得在finally中使用release方法释放掉准入凭证,不然会卡死后面的线程
public class ThreadTest11 implements Runnable {

    private static Semaphore semaphore = new Semaphore(5);

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        try {
            // 等待准入,如果不能进入,则一直阻塞在这里
            System.out.println(name + " waiting @ " + System.currentTimeMillis());
            semaphore.acquire();

            // 参观1 ~ 2秒,结束
            System.out.println(name + " start @" + System.currentTimeMillis());
            Thread.sleep((new Random()).nextInt(1000) + 1000);
            System.out.println(name + " finished @ " + System.currentTimeMillis());

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 这里记得要释放,不然会锁死
            semaphore.release();
        }
    }

    public static void main(String[] args) {

        // 100个线程参观,每次准入5个人
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new ThreadTest11());
            t.start();
        }
    }
}

LockSupport

LockSupport是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。和Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// LockSupport的例子
public class ThreadTest12 {

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(() -> {
            System.out.println("t1 waiting @ " + System.currentTimeMillis());
            LockSupport.park();
            System.out.println("t1 continue @ " + System.currentTimeMillis());
        });
        t1.start();

        // 暂停三秒,释放t1的park
        Thread.sleep(3000);
        LockSupport.unpark(t1);
    }
}

抽象队列同步器(AQS)

在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),内部实现都依赖AbstractQueuedSynchronizer类,接下去让我们看看Doug Lea大神是如何使用一个普通类就完成了代码块的并发访问控制。为了方便,本文中使用AQS代替AbstractQueuedSynchronizer。

AQS的代码在java.util.concurrent.locks.AbstractQueuedSynchronizer 文件中。

1
2
3
4
5
6
7
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    // ...
    
}

image-20210808124204271

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源共享方式:

  • Exclusive(独占,只有一个线程能执行,如ReentrantLock)。
  • Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为 0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

下图展示了AQS在JDK包中的子类的情况:

image-20210902205150451