1、基础版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
interface Queue {

boolean offer(Object obj);

Object poll();

}
class FairnessBoundedBlockingQueue implements Queue {
// 当前大小
protected int size;

// 容量
protected final int capacity;

// 头指针,empty: head.next == tail == null
protected Node head;

// 尾指针
protected Node tail;

public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}

// 如果队列已满,通过返回值标识
public boolean offer(Object obj) {
if (size < capacity) {
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
return true;
}
return false;
}

// 如果队列为空,head.next == null;返回空元素
public Object poll() {
if (head.next != null) {
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢弃头结点
--size;
return result;
}
return null;
}

class Node {
Object value;
Node next;
Node(Object obj) {
this.value = obj;
next = null;
}
}
}
  1. 定义支持队列的两个基础接口, poll 和 offer;
  2. 队列的实现,采用经典实现;
  3. 考虑在队列空的情况下, poll 返回为空,非阻塞;
  4. 队列在满的情况下, offer 返回 false ,入队不成功,无异常;

2、并发版本

如果在并发场景下,上述的实现面临一些问题,同时未实现给定的一些需求。通过添加 synchronized ,保证并发条件下的线程安全问题。注意此处做同步的原因是为了保证类的不变式。

以上,简单粗暴的加 synchronized 可以解决问题,但会引入新的问题:系统活性问题(此问题下文会解决)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class BoundedBlockingQueue implements Queue {

// 当前大小
protected int size;

// 容量
protected final int capacity;

// 头指针,empty: head.next == tail == null
protected Node head;

// 尾指针
protected Node tail;

public BoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}

// 如果队列已满,通过返回值标识
public synchronized boolean offer(Object obj) {
if (size < capacity) {
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
return true;
}
return false;
}

// 如果队列为空,head.next == null;返回空元素
public synchronized Object poll() {
if (head.next != null) {
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢弃头结点
--size;
return result;
}
return null;
}

class Node {
Object value;
BoundedBlockingQueue.Node next;
Node(Object obj) {
this.value = obj;
next = null;
}
}
}

同时,简单加 synchronized 同步是无法实现阻塞等待;即

  1. 如果队列为空,那么出队的动作还是会立即返回,返回为空;
  2. 如果队列已满,那么入队动作还是会立即返回,返回操作不成功;

卫式方法

阻塞等待,可以通过简单的卫式方法来实现,此问题本质上可以抽象为:

  1. 任何一个方法都需要在满足一定条件下才可以执行;
  2. 执行方法前需要首先校验不变式,然后执行变更;
  3. 在执行完成后,校验是否满足后验不变式;

代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

// 当前线程
synchronized Object action(Object arg) {
while(!condition) {
wait();
}
// 前置条件,不变式
checkPreCondition();
doAction();
// 后置条件,不变式
checkPostCondition();
}

// 其他线程
synchronized Object notifyAction(Object arg) {
notifyAll();
}

需要注意:

  1. 通常会采用 notifyAll 发送通知,而非 notify ;因为如果当前线程收到 notify 通知后被中断,那么系统将一直等待下去。
  2. 如果使用了 notifyAll 那么卫式语句必须放在 while 循环中;因为线程唤醒后,执行条件已经不满足,虽然当前线程持有互斥锁。
  3. 卫式条件的所有变量,有任何变更都需要发送 notifyAll 不然面临系统活性问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
interface Queue {

boolean offer(Object obj) throws InterruptedException;

Object poll() throws InterruptedException;

}
class FairnessBoundedBlockingQueue implements Queue {
// 当前大小
protected int size;

// 容量
protected final int capacity;

// 头指针,empty: head.next == tail == null
protected Node head;

// 尾指针
protected Node tail;

public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}

// 如果队列已满,通过返回值标识
public synchronized boolean offer(Object obj) throws InterruptedException {
size>=capacity {
wait();
}
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
notifyAll(); // 可以出队
return true;
}

// 如果队列为空,阻塞等待
public synchronized Object poll() throws InterruptedException {
while (head.next == null) {
wait();
}
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢弃头结点
--size;
notifyAll(); // 可以入队
return result;
}
// 省略 Node 的定义
}

以上,实现了阻塞等待,但也引入了更大的性能问题

  1. 入队和出队动作阻塞等待同一把锁,恶性竞争;(这里不管入队还是出队操作的都是同一个队列,入队还是出队都操作了size,所以不能同时进行)
  2. 当队列变更时,所有阻塞线程被唤醒,大量的线程上下文切换,竞争同步锁,最终可能只有一个线程能执行;

需要注意的点:

  1. 阻塞等待 wait 会抛出中断异常。关于异常的问题下文会处理;
  2. 接口需要支持抛出中断异常;
  3. 队里变更需要 notifyAll 避免线程中断或异常,丢失消息;

3、锁拆分优化

以上第一个问题,可以通过锁拆分来解决,即:定义两把锁,读锁和写锁;读写分离。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

// 省略接口定义
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;

// 头指针,empty: head.next == tail == null
protected Node head;

// 尾指针
protected Node tail;

// guard: canPollCount, head
protected final Object pollLock = new Object();
protected int canPollCount;

// guard: canOfferCount, tail
protected final Object offerLock = new Object();
protected int canOfferCount;

public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.head = new Node(null);
this.tail = head;
}

// 如果队列已满,通过返回值标识
public boolean offer(Object obj) throws InterruptedException {
synchronized(offerLock) {
while(canOfferCount <= 0) {
offerLock.wait();
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
pollLock.notifyAll();
}
return true;
}

// 如果队列为空,阻塞等待
public Object poll() throws InterruptedException {
Object result = null;
synchronized(pollLock) {
while(canPollCount <= 0) {
pollLock.wait();
}

result = head.next.value;
head.next.value = null;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
offerLock.notifyAll();
}
return result;
}
// 省略 Node 定义
}

以上

  1. 定义了两把锁, pollLock 和 offerLock 拆分出队和入队竞争;
  2. 入队锁同步的变量为:callOfferCount 和 tail;
  3. 出队锁同步的变量为:canPollCount 和 head;
  4. 出队的动作:首先拿到 pollLock 卫式等待后,完成出队动作;然后拿到 offerLock 发送通知,解除入队的等待线程。
  5. 入队的动作:首先拿到 offerLock 卫式等待后,完成入队的动作;然后拿到 pollLock 发送通知,解除出队的等待线程。

以上实现

  1. 确保通过入队锁和出队锁,分别保证入队和出队的原子性;
  2. 出队动作,通过特别的实现,确保出队只会变更 head ,避免获取 offerLock;
  3. 通过 offerLock.notifyAll 和 pollLock.notifyAll 解决读写竞争的问题;

问题:当有多个入队线程等待时,一次出队的动作会触发所有入队线程竞争,大量的线程上下文切换,最终只有一个线程能执行。
即,还有 读与读写与写 之间的竞争问题。

参考

《阿里技术博客》