4. Removing contention with state tracking

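State tracking can be used here to remove the contention between concurrent reads (poll against poll) and between concurrent writes (offer against offer).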

public interface Queue {

    boolean offer(Object obj) throws InterruptedException;

    Object poll() throws InterruptedException;

}

class FairnessBoundedBlockingQueue implements Queue {
    // Capacity
    protected final int capacity;

    // Head pointer (sentinel node); empty: head == tail, so head.next == null
    protected Node head;

    // Tail pointer
    protected Node tail;

    // guard: canPollCount, head
    protected final Object pollLock = new Object();
    protected int canPollCount;
    protected int waitPollCount;

    // guard: canOfferCount, tail
    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected int waitOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.waitPollCount = 0;
        this.waitOfferCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // Blocks while the queue is full; returns true once the element is enqueued
    public boolean offer(Object obj) throws InterruptedException {
        synchronized (offerLock) {
            while (canOfferCount <= 0) {
                waitOfferCount++;
                offerLock.wait();
                waitOfferCount--;
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            // Notify only if some thread is actually waiting to poll
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
        return true;
    }

    // Blocks while the queue is empty
    public Object poll() throws InterruptedException {
        Object result;
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                waitPollCount++;
                pollLock.wait();
                waitPollCount--;
            }

            result = head.next.value;
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }
        synchronized (offerLock) {
            canOfferCount++;
            // Notify only if some thread is actually waiting to offer
            if (waitOfferCount > 0) {
                offerLock.notify();
            }
        }
        return result;
    }

    class Node {
        Object value;
        Node next;
        Node(Object obj) {
            this.value = obj;
            next = null;
        }
    }
}

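In the code above:

  1. State tracking through waitOfferCount and waitPollCount resolves the contention among reads and among writes;
  2. When the queue changes, the tracked state decides whether a message is sent to release a blocked thread;

However, this implementation fails in some scenarios and has a liveness problem. Consider the following.

Case 1:

  1. The queue is initially empty. Thread A calls poll and blocks on pollLock, so waitPollCount == 1;
  2. Thread A is interrupted while in wait and throws an exception; waitPollCount == 1 is never reset;
  3. The queue is empty and no thread is waiting, yet waitPollCount == 1: the state of the class is now inconsistent;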
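
Case 1 can be reproduced in isolation. The following is a minimal sketch, not part of the original queue: StaleWaitCountDemo, awaitForever and run are hypothetical names, and only the wait-counting loop is copied from poll. It waits until the worker is parked in wait, interrupts it, and reads back the counter.

```java
// Hypothetical demo class: reproduces only the wait-counting loop of poll().
class StaleWaitCountDemo {
    private final Object lock = new Object();
    private int waitCount = 0;

    // Same shape as the loop in poll(): the decrement is skipped when wait() throws.
    private void awaitForever() throws InterruptedException {
        synchronized (lock) {
            while (true) {
                waitCount++;
                lock.wait();
                waitCount--;
            }
        }
    }

    private int currentCount() {
        synchronized (lock) {
            return waitCount;
        }
    }

    // Interrupts a thread parked in wait() and returns the counter it leaves behind.
    static int run() {
        StaleWaitCountDemo demo = new StaleWaitCountDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitForever();
            } catch (InterruptedException e) {
                // The waiter exits here without having restored waitCount.
            }
        });
        waiter.start();
        // Seeing 1 under the lock means the waiter has released the lock inside wait().
        while (demo.currentCount() != 1) {
            Thread.yield();
        }
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return demo.currentCount();
    }

    public static void main(String[] args) {
        System.out.println("waitCount after interrupt = " + run()); // prints 1
    }
}
```

No thread is waiting any more when run returns, but the counter still reports one waiter.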

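Case 2:

  1. The queue is initially empty. Threads A and B call poll and block on pollLock, so waitPollCount == 2;
  2. Thread C calls offer, which can proceed immediately; when it finishes, it calls notify on pollLock to release one waiting thread;
  3. Which thread is released depends on the JVM implementation and is effectively arbitrary; suppose thread A is released;
  4. Suppose thread A was interrupted while it was blocked. Once it is released, the JVM checks the interrupted status and throws InterruptedException. (Thread A has consumed the wake-up, but it exits with an exception instead of taking the element, so this wake-up is lost.)
  5. The queue now holds one element, but thread B is still blocked on pollLock and may stay blocked indefinitely;

These are examples of a lost unblocking message; the root of the problem lies in exception handling.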

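5. Handling the exception problem

To solve the problem of threads exiting on interruption, first note where a thread's interrupted status is checked:

  1. The JVM normally checks a thread's interrupted status in only a few places: wait, Thread.join, and Thread.sleep;
  2. When the JVM detects that the thread has been interrupted (the status tested by Thread.interrupted()), it clears the interrupt flag and throws InterruptedException;
  3. To make sure a thread responds to interruption promptly, the run method usually has to check the interrupt flag itself and stop the thread, especially where interruption is sensitive and the class invariants must be preserved;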
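
The flag behaviour described above can be checked directly on the current thread. This is a small sketch with a hypothetical class name (InterruptFlagDemo); it uses only Thread.interrupt, Thread.interrupted, Thread.isInterrupted and Thread.sleep.

```java
// Hypothetical demo class: observes how the interrupt flag is set and cleared.
class InterruptFlagDemo {
    // Returns four observations joined by commas.
    static String run() {
        Thread self = Thread.currentThread();

        self.interrupt();                        // set the flag
        boolean first = Thread.interrupted();    // reads the flag and clears it
        boolean second = Thread.interrupted();   // flag is already cleared

        self.interrupt();                        // set the flag again
        boolean thrown = false;
        try {
            Thread.sleep(1000);                  // throws at once: an interrupt is pending
        } catch (InterruptedException e) {
            thrown = true;
        }
        boolean after = self.isInterrupted();    // cleared when the exception was thrown

        return first + "," + second + "," + thrown + "," + after;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true,false,true,false
    }
}
```

Because the flag is cleared when InterruptedException is thrown, code that catches the exception and wants callers to see the interrupt has to rethrow it or call Thread.currentThread().interrupt() again.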
class FairnessBoundedBlockingQueue implements Queue {
    // Capacity
    protected final int capacity;

    // Head pointer (sentinel node); empty: head == tail, so head.next == null
    protected Node head;

    // Tail pointer
    protected Node tail;

    // guard: canPollCount, head, waitPollCount
    protected final Object pollLock = new Object();
    protected int canPollCount;
    protected int waitPollCount;

    // guard: canOfferCount, tail, waitOfferCount
    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected int waitOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.waitPollCount = 0;
        this.waitOfferCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // Blocks while the queue is full; returns true once the element is enqueued
    public boolean offer(Object obj) throws InterruptedException {
        if (Thread.interrupted()) {
            // Already interrupted: exit right away so an interrupted thread does not compete for the lock
            throw new InterruptedException();
        }
        synchronized (offerLock) {
            while (canOfferCount <= 0) {
                waitOfferCount++;
                try {
                    offerLock.wait();
                } catch (InterruptedException e) {
                    // Pass the wake-up on to another waiting thread
                    offerLock.notify();
                    throw e;
                } finally {
                    // Restore the tracking counter on every exit path
                    waitOfferCount--;
                }
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
        return true;
    }

    // Blocks while the queue is empty
    public Object poll() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Object result = null;
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                waitPollCount++;
                try {
                    pollLock.wait();
                } catch (InterruptedException e) {
                    // Pass the wake-up on to another waiting thread
                    pollLock.notify();
                    throw e;
                } finally {
                    waitPollCount--;
                }
            }

            result = head.next.value;
            // Clear the reference; the dequeued node becomes the new sentinel
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }
        synchronized (offerLock) {
            canOfferCount++;
            if (waitOfferCount > 0) {
                offerLock.notify();
            }
        }
        return result;
    }
    // Definition of Node omitted
}

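In the code above:

  1. When a waiting thread exits because of an interrupt, the InterruptedException is caught and the message is forwarded through pollLock.notify and offerLock.notify;
  2. The state-tracking variables are restored in finally;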
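
The two points above can be sketched on their own. The class below is a hypothetical reduction (RestoredWaitCountDemo, awaitForever and run are illustrative names, not part of the queue): it keeps only the counted wait loop with the catch and finally blocks, interrupts a parked thread, and reads the counter afterwards.

```java
// Hypothetical demo class: counted wait loop with relay in catch and restore in finally.
class RestoredWaitCountDemo {
    private final Object lock = new Object();
    private int waitCount = 0;

    private void awaitForever() throws InterruptedException {
        synchronized (lock) {
            while (true) {
                waitCount++;
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    lock.notify();   // relay: hand a possibly consumed wake-up to another waiter
                    throw e;
                } finally {
                    waitCount--;     // runs on normal wake-up and on interrupt alike
                }
            }
        }
    }

    private int currentCount() {
        synchronized (lock) {
            return waitCount;
        }
    }

    // Interrupts a thread parked in wait() and returns the counter it leaves behind.
    static int run() {
        RestoredWaitCountDemo demo = new RestoredWaitCountDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitForever();
            } catch (InterruptedException e) {
                // Expected: the waiter leaves through the exception path.
            }
        });
        waiter.start();
        // Seeing 1 under the lock means the waiter has released the lock inside wait().
        while (demo.currentCount() != 1) {
            Thread.yield();
        }
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return demo.currentCount();
    }

    public static void main(String[] args) {
        System.out.println("waitCount after interrupt = " + run()); // prints 0
    }
}
```

The relay in catch may wake a thread that then finds nothing to do; that is harmless here because every waiter rechecks its condition in the while loop.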

Tracking state variables in this way resolves the lock contention among reads and among writes.

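6. Achieving fairness

To achieve fairness, the tracking of state variables has to be replaced by tracking of per-request monitors:

  1. Each request has its own monitor;
  2. An internally maintained FIFO queue provides fairness;
  3. When the queue state changes, a monitor in the FIFO queue is released;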
boolean needToWait;
synchronized (this) {
    needToWait = calculateNeedToWait();
    if (needToWait) {
        enqueue(monitor); // the monitor that belongs to this request
    }
}
if (needToWait) {
    monitor.doWait();
}

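Points to note:

  1. monitor.doWait() must be called outside the block guarded by this, because if it were called inside, monitor.doWait would not release the lock on this;
  2. calculateNeedToWait() must run inside the block guarded by this to avoid synchronization problems;
  3. Interruption has to be taken into account;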
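
The first point can be observed with a small experiment. This is a sketch under one loudly stated assumption: the outer lock is a java.util.concurrent.locks.ReentrantLock standing in for the intrinsic lock on this, chosen only because its isLocked() method lets another thread see whether it is held. The class name NestedWaitDemo and its variables are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: waiting on an inner monitor does not release the outer lock.
class NestedWaitDemo {
    // Returns true if the outer lock was still held while the thread was parked in wait().
    static boolean run() {
        ReentrantLock outer = new ReentrantLock(); // assumption: stands in for the lock on `this`
        Object monitor = new Object();             // the per-request monitor
        boolean[] released = {false};              // guarded by monitor
        CountDownLatch insideMonitor = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            outer.lock();
            try {
                synchronized (monitor) {
                    insideMonitor.countDown();
                    while (!released[0]) {
                        monitor.wait();            // releases monitor only, not outer
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                outer.unlock();
            }
        });
        waiter.start();

        boolean outerStillHeld = false;
        try {
            insideMonitor.await();
            synchronized (monitor) {
                // Acquiring monitor here is only possible once the waiter is inside wait().
                outerStillHeld = outer.isLocked();
                released[0] = true;
                monitor.notify();
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outerStillHeld;
    }

    public static void main(String[] args) {
        System.out.println("outer lock held during wait: " + run()); // prints true
    }
}
```

Any other request that needs the outer lock would be stuck for as long as the waiter is parked, which is why the wait has to happen after the guarded block has been left.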

Based on this abstraction, the fair queue is implemented as follows:

// Interface definition omitted
class FairnessBoundedBlockingQueue implements Queue {
    // Capacity
    protected final int capacity;

    // Head pointer (sentinel node); empty: head == tail, so head.next == null
    protected Node head;

    // Tail pointer
    protected Node tail;

    // guard: canPollCount, head, pollQueue
    protected final Object pollLock = new Object();
    protected int canPollCount;

    // guard: canOfferCount, tail, offerQueue
    protected final Object offerLock = new Object();
    protected int canOfferCount;

    protected final WaitQueue pollQueue = new WaitQueue();
    protected final WaitQueue offerQueue = new WaitQueue();

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canOfferCount = capacity;
        this.canPollCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // Blocks while the queue is full; returns true once the element is enqueued
    public boolean offer(Object obj) throws InterruptedException {
        if (Thread.interrupted()) {
            // Already interrupted: exit right away so an interrupted thread does not compete for the lock
            throw new InterruptedException();
        }
        WaitNode wait = null;
        synchronized (offerLock) {
            // Wait if the queue is full or earlier requests are already waiting
            if (canOfferCount <= 0 || !offerQueue.isEmpty()) {
                wait = new WaitNode();
                offerQueue.enq(wait);
            } else {
                // continue.
            }
        }

        try {
            if (wait != null) {
                wait.doWait();
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        } catch (InterruptedException e) {
            // Pass the message on to the next waiting request
            offerQueue.doNotify();
            throw e;
        }

        // The thread is known not to be interrupted here; nothing below checks for interruption
        synchronized (offerLock) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            pollQueue.doNotify();
        }
        return true;
    }

    // Blocks while the queue is empty
    public Object poll() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Object result = null;
        WaitNode wait = null;
        synchronized (pollLock) {
            // Wait if the queue is empty or earlier requests are already waiting
            if (canPollCount <= 0 || !pollQueue.isEmpty()) {
                wait = new WaitNode();
                pollQueue.enq(wait);
            } else {
                // ignore
            }
        }

        try {
            if (wait != null) {
                wait.doWait();
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        } catch (InterruptedException e) {
            // Pass the message on to the next waiting request
            pollQueue.doNotify();
            throw e;
        }

        // Nothing below checks the thread's interrupted status
        synchronized (pollLock) {
            result = head.next.value;
            // Clear the reference; the dequeued node becomes the new sentinel
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }

        synchronized (offerLock) {
            canOfferCount++;
            offerQueue.doNotify();
        }
        return result;
    }

    class WaitQueue {

        WaitNode head;
        WaitNode tail;

        WaitQueue() {
            head = new WaitNode();
            tail = head;
        }

        synchronized void doNotify() {
            for (;;) {
                WaitNode node = deq();
                if (node == null) {
                    break;
                } else if (node.doNotify()) {
                    // A waiting thread was actually released
                    break;
                } else {
                    // ignore, and retry.
                }
            }
        }

        synchronized boolean isEmpty() {
            return head.next == null;
        }

        synchronized void enq(WaitNode node) {
            tail.next = node;
            tail = tail.next;
        }

        synchronized WaitNode deq() {
            if (head.next == null) {
                return null;
            }
            WaitNode res = head.next;
            head = head.next;
            if (head.next == null) {
                tail = head; // queue is now empty: move tail back to the sentinel
            }
            return res;
        }
    }

    class WaitNode {
        boolean released;
        WaitNode next;
        WaitNode() {
            released = false;
            next = null;
        }

        synchronized void doWait() throws InterruptedException {
            try {
                while (!released) {
                    wait();
                }
            } catch (InterruptedException e) {
                if (!released) {
                    released = true;
                    throw e;
                } else {
                    // Interrupted after being notified: do not throw here;
                    // restore the interrupt flag so the caller can relay the notification
                    Thread.currentThread().interrupt();
                }
            }
        }

        synchronized boolean doNotify() {
            if (!released) {
                released = true;
                notify();
                // A thread was definitely released: return true
                return true;
            } else {
                // No new thread was released: return false
                return false;
            }
        }
    }
    // Definition of Node omitted
}

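In the code above:

  1. The core change is replacing the state-tracking variables with a synchronization node, WaitNode;
  2. WaitNodes are organized in a simple synchronized queue that implements the FIFO protocol; each thread waits on the monitor of its own WaitNode;
  3. WaitNode keeps a released flag that records whether the thread's blocked state has been released, mainly to handle interruption;
  4. WaitQueue itself is fully synchronized; since read-write contention and the contention within reads and within writes have already been resolved, synchronizing WaitQueue does not cause a problem;
  5. WaitQueue is an unbounded queue, which is a potential problem; but since it only tracks synchronization state, and what it tracks is normally threads, this is usually not an issue;
  6. In the final fair bounded queue, both offer and poll first use a guarded block to decide whether the request has to join the wait queue; if it does, it waits according to the fairness protocol;
    when the signal is released, the queue is updated under the read and write locks (pollLock and offerLock); finally, the same locks are used to send the queue-update message;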
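
The role of the released flag in point 3 can be seen in a reduced form. The sketch below uses a hypothetical Gate class that keeps only the flag logic of WaitNode (without the interrupt branch): a release that arrives before the wait is remembered, and a second release reports that it woke nobody.

```java
// Hypothetical demo class: a one-shot gate with the same released flag as WaitNode.
class ReleaseFlagDemo {
    static class Gate {
        private boolean released = false;

        synchronized void doWait() throws InterruptedException {
            while (!released) {   // a release that already happened is not lost
                wait();
            }
        }

        synchronized boolean doNotify() {
            if (released) {
                return false;     // nobody new was released
            }
            released = true;
            notify();
            return true;          // this call released the gate
        }
    }

    // Returns three observations joined by commas.
    static String run() {
        Gate gate = new Gate();
        boolean first = gate.doNotify();    // release before anyone waits
        boolean second = gate.doNotify();   // gate is already released
        boolean returned = false;
        try {
            gate.doWait();                  // returns at once because released is true
            returned = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return first + "," + second + "," + returned;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true,false,true
    }
}
```

The false return value is what lets a queue of such gates skip entries whose threads have already left and move on to the next waiter.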

References

Alibaba Tech Blog (《阿里技术博客》)