同步工具类
锁(Lock)
同步锁(synchronized)和可重入锁(ReentrantLock)都是用于并发程序设计必不可少的手段,在JDK 5.0早期版本中,同步锁性能远远低于重入锁,但是在6.0版本之后,jdk对同步锁做了大量的优化,使得同步锁跟重入锁性能差距并不大,并且jdk团队表示,同步锁还有进一步升级优化的空间。
重入的概念是指在同一个线程内部,这种锁是可以反复进入的。
1
2
3
4
5
|
lock.lock();
lock.lock();
// do something
lock.unlock();
lock.unlock();
|
一个线程可多次获取锁,但同时也要释放相同的次数,否则该线程将持续拥有锁,其他线程将无法进入临界区。
重入锁的几个重要方法:
- lock: 获取锁,如果锁被其他线程占用,则休眠等待。
- lockInterruptibly: 获取锁,可以被其他线程所中断。
- tryLock: 尝试获取锁,不等待。
- tryLock(time, timeUnit): 在一定时间内尝试获取锁。
- unlock: 释放锁
synchronized 与 ReentrantLock
对于synchronized来说,一个线程要么获取到锁开始执行,要么继续等待。但是对于重入锁来说,提供了更灵活的一种机制,那就是在等待锁的过程中,可以取消对锁的请求,这样可以有效避免死锁的可能。
公平锁与非公平锁
重入锁默认是非公平锁,可以通过构造函数参数实现公平锁。
1
2
3
4
5
6
7
8
9
10
11
|
// Java source: ReentrantLock.java
// 默认ReentrantLock是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 可以通过参数fair构建公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
|
如果是非公平锁,在并发场景下,系统会随机从等待队列中挑选一个线程。如果是公平锁,系统会维护一个有序队列,会按照进入队列的次序有序执行,因此公平锁虽然避免了饥饿现象,但是会需要更高的成本来维护这个有序队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
// ReentrantLock例子,启动100个线程,每个线程往List中添加100个数,如果不做同步控制,最后得到的SIZE
// 大概率会小于10000
public class ThreadTest9 {
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
List<Integer> arrays = new ArrayList<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
for (int j = 0; j < 100; j++) {
arrays.add(arrays.size() + 1);
}
lock.unlock();
}
});
t.start();
threads.add(t);
}
for(Thread t : threads){
t.join();
}
System.out.println("SIZE: " + arrays.size());
}
}
|
Condition
Condition和wait()和notify()方法的作用是大致相同的。但是wait()和notify()方法是和synchronized关键字合作使用的,而Condtion是与重入锁相关联的。通过Lock接口(重入锁就实现了这一接口)的Condition newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
// Condition的例子
public class ThreadTest10 implements Runnable {
private static Lock lock = new ReentrantLock();
private static Condition cond = lock.newCondition();
@Override
public void run() {
try {
lock.lock();
System.out.println("t1 waiting @ " + System.currentTimeMillis());
cond.await();
System.out.println("t1 running @ " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ThreadTest10 test10 = new ThreadTest10();
Thread t1 = new Thread(test10);
t1.start();
Thread.sleep(2000);
lock.lock();
cond.signal();
lock.unlock();
}
}
|
CountDownLatch
CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// CountDownLatch的例子
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class ThreadTest7 {
static class MyThread extends Thread {
private CountDownLatch cd;
MyThread(CountDownLatch cd){
this.cd = cd;
}
@Override
public void run() {
// 随机休眠 1 ~ 10 秒
try {
Thread.sleep((new Random()).nextInt(10) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " finished @ " + System.currentTimeMillis());
cd.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
int n = 500;
CountDownLatch cd = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
(new MyThread(cd)).start();
}
cd.await();
System.out.println("ALL DONE!");
}
}
|
CyclicBarrier
CyclicBarrier和CountDownLatch非常类似,它也可以实现线程间的技术等待,但是它的功能比CountDownLatch更加复杂和强大。主要应用场景和CountDownLatch类似。CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
// CyclicBarrier
import java.util.Random;
import java.util.concurrent.*;
public class ThreadTest8 {
static class Runner extends Thread {
private CyclicBarrier cb;
private String name;
Runner(CyclicBarrier cb, String name) {
this.cb = cb;
this.name = name;
}
@Override
public void run() {
try {
// 运动员准备
Thread.sleep((new Random()).nextInt(1000));
System.out.println(name + " Ready");
// 等待所有Runner准备好
cb.await();
// 起跑
System.out.println(name + " start running @ " + System.currentTimeMillis());
int result = 9580 + (new Random()).nextInt(1000);
Thread.sleep(result);
// 跑完
System.out.println(name + " finished @ " + (result / 1000.0));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(8, () -> {
// 所有Runner准备好后的回调
System.out.println("Start Running...");
});
String[] names = new String[]{"布雷克", "盖伊", "鲍威尔", "加特林", "卡特尔", "格林", "穆林斯", "博尔特"};
Executor executor = Executors.newFixedThreadPool(names.length);
for (String name : names) {
executor.execute(new Runner(cb, name));
}
}
}
|
Semaphore
信号量(Semaphore)为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。
在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑方法有:
1
2
3
4
5
|
public void acquire();
public void acquireUninterruptibly();
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit);
public void release();
|
acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// semaphore的例子,模拟一个排队参观的场景,100个线程,每次准入5个线程
// 记得在finally中使用release方法释放掉准入凭证,不然会卡死后面的线程
public class ThreadTest11 implements Runnable {
private static Semaphore semaphore = new Semaphore(5);
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
// 等待准入,如果不能进入,则一直阻塞在这里
System.out.println(name + " waiting @ " + System.currentTimeMillis());
semaphore.acquire();
// 参观1 ~ 2秒,结束
System.out.println(name + " start @" + System.currentTimeMillis());
Thread.sleep((new Random()).nextInt(1000) + 1000);
System.out.println(name + " finished @ " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 这里记得要释放,不然会锁死
semaphore.release();
}
}
public static void main(String[] args) {
// 100个线程参观,每次准入5个人
for (int i = 0; i < 100; i++) {
Thread t = new Thread(new ThreadTest11());
t.start();
}
}
}
|
LockSupport
LockSupport是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。和Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// LockSupport的例子
public class ThreadTest12 {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println("t1 waiting @ " + System.currentTimeMillis());
LockSupport.park();
System.out.println("t1 continue @ " + System.currentTimeMillis());
});
t1.start();
// 暂停三秒,释放t1的park
Thread.sleep(3000);
LockSupport.unpark(t1);
}
}
|
抽象队列同步器(AQS)
在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),内部实现都依赖AbstractQueuedSynchronizer类,接下去让我们看看Doug Lea大神是如何使用一个普通类就完成了代码块的并发访问控制。为了方便,本文中使用AQS代替AbstractQueuedSynchronizer。
AQS的代码在java.util.concurrent.locks.AbstractQueuedSynchronizer
文件中。
1
2
3
4
5
6
7
|
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// ...
}
|

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式:
- Exclusive(独占,只有一个线程能执行,如ReentrantLock)。
- Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以ReentrantLock为例,state初始化为 0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
下图展示了AQS在JDK包中的子类的情况:
