4. Removing contention with state tracking
Here, state tracking is used to remove the contention among readers (poll) and among writers (offer).
```java
public interface Queue {
    boolean offer(Object obj) throws InterruptedException;

    Object poll() throws InterruptedException;
}

class FairnessBoundedBlockingQueue implements Queue {
    protected final int capacity;

    // Linked list with a dummy head node: head is only touched under pollLock,
    // tail only under offerLock.
    protected Node head;
    protected Node tail;

    // Read side: elements available to take, and threads waiting to take.
    protected final Object pollLock = new Object();
    protected int canPollCount;
    protected int waitPollCount;

    // Write side: free slots, and threads waiting for a slot.
    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected int waitOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.waitPollCount = 0;
        this.waitOfferCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    public boolean offer(Object obj) throws InterruptedException {
        synchronized (offerLock) {
            while (canOfferCount <= 0) {
                waitOfferCount++;
                offerLock.wait();
                waitOfferCount--;
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            // Only signal when some poller is actually waiting.
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
        return true;
    }

    public Object poll() throws InterruptedException {
        Object result;
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                waitPollCount++;
                pollLock.wait();
                waitPollCount--;
            }
            result = head.next.value;
            head.next.value = null;   // the taken node becomes the new dummy head
            head = head.next;
            canPollCount--;
        }
        synchronized (offerLock) {
            canOfferCount++;
            // Only signal when some offerer is actually waiting.
            if (waitOfferCount > 0) {
                offerLock.notify();
            }
        }
        return result;
    }

    class Node {
        Object value;
        Node next;

        Node(Object obj) {
            this.value = obj;
            next = null;
        }
    }
}
```
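In the code above:
- Tracking waitOfferCount and waitPollCount resolves the contention within the read side and within the write side;
- When the queue changes, the tracked state decides whether to send a notification that lifts a blocked thread;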
However, this implementation fails in some scenarios and suffers from liveness problems. Consider:
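Case 1:
- The queue starts empty. Thread A calls poll and blocks on pollLock, so waitPollCount == 1;
- Thread A is interrupted while in wait; wait throws, the decrement is skipped, and waitPollCount stays at 1;
- The queue is empty and nobody is waiting, yet waitPollCount == 1: the object's state is corrupted;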
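Case 1 can be replayed deterministically. The sketch below is a hypothetical, stripped-down copy of the first version's poll-side wait loop (LeakyPollSide and its methods are illustrative names, not part of the original code): thread A waits on an empty queue, is interrupted, and the increment it made to waitPollCount is never undone.

```java
// Hypothetical, stripped-down copy of the poll-side wait loop from the first
// version, kept only to show what an interrupt does to waitPollCount.
class LeakyPollSide {
    private final Object pollLock = new Object();
    private int canPollCount = 0;   // the queue starts empty
    private int waitPollCount = 0;

    void awaitItem() throws InterruptedException {
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                waitPollCount++;
                pollLock.wait();    // an interrupt makes wait() throw here...
                waitPollCount--;    // ...so this decrement is skipped
            }
            canPollCount--;
        }
    }

    int waitPollCount() {
        synchronized (pollLock) {
            return waitPollCount;
        }
    }

    /** Replays case 1 and returns waitPollCount once thread A has given up. */
    static int replayCaseOne() {
        LeakyPollSide q = new LeakyPollSide();
        Thread a = new Thread(() -> {
            try {
                q.awaitItem();
            } catch (InterruptedException expected) {
                // thread A leaves poll; nobody restores the counter
            }
        });
        a.start();
        try {
            while (a.getState() != Thread.State.WAITING) {
                Thread.sleep(1);    // wait until A is parked in pollLock.wait()
            }
            a.interrupt();
            a.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return q.waitPollCount();   // no thread is waiting, yet this is 1
    }

    public static void main(String[] args) {
        System.out.println("waitPollCount = " + replayCaseOne());
    }
}
```

Running main prints `waitPollCount = 1` even though no thread is waiting any more.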
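Case 2:
- The queue starts empty. Threads A and B call poll and block on pollLock, so waitPollCount == 2;
- Thread C calls offer, which succeeds immediately and then calls pollLock.notify to wake one waiting thread;
- Which waiting thread notify wakes is arbitrary and up to the JVM implementation; suppose it is thread A;
- Suppose A had already been interrupted while blocked. When it wakes, its interrupt status is set, so wait throws InterruptedException instead of returning. The notification meant to hand over the new element has been spent on a thread that leaves poll without taking it, so the wake-up is lost;
- The queue now holds one element, yet thread B stays blocked on pollLock, possibly forever;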
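This is an example of a lost wake-up notification, and the root cause is the exception handling. (Strictly, JLS §17.2.4 states that notifications cannot be lost due to interrupts, so a conforming JVM should not strand B in exactly this way; the counter corruption of case 1, however, happens regardless.)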
5. Fixing the exception problem
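To fix threads leaving on interrupt, first consider when a thread's interrupt status is actually checked:
- The JVM checks the interrupt status only in a limited number of places, mainly the blocking calls Object.wait, Thread.join and Thread.sleep;
- When such a call detects that the thread is interrupted, it clears the interrupt flag and throws InterruptedException (Thread.interrupted() likewise returns the flag and clears it);
- To respond to interrupts promptly, a thread's run method should therefore usually check the interrupt flag itself and stop, particularly in code that is sensitive to interrupts and must preserve the class's invariants;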
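A small sketch of the three rules above, using only the standard `Thread` API (InterruptRules and its method names are illustrative):

```java
// Demonstrates the interrupt-status rules listed above.
class InterruptRules {
    /** Thread.interrupted() reports the flag and clears it. */
    static boolean[] interruptedReadsAndClears() {
        Thread.currentThread().interrupt();
        boolean first = Thread.interrupted();    // true; the flag is now cleared
        boolean second = Thread.interrupted();   // false
        return new boolean[] {first, second};
    }

    /** A blocking call that finds the flag set throws and clears the flag. */
    static boolean sleepThrowsAndClearsFlag() {
        Thread.currentThread().interrupt();      // flag set before we block
        try {
            Thread.sleep(1_000);
            return false;                        // not expected: sleep did not throw
        } catch (InterruptedException e) {
            return !Thread.currentThread().isInterrupted();
        }
    }

    /** Code that never calls wait/sleep/join must poll the flag itself. */
    static boolean busyLoopStopsOnInterrupt() {
        Thread worker = new Thread(() -> {
            long work = 0;
            while (!Thread.currentThread().isInterrupted()) {
                work++;                          // stand-in for real CPU-bound work
            }
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        boolean[] r = interruptedReadsAndClears();
        System.out.println(r[0] + " " + r[1]);
        System.out.println(sleepThrowsAndClearsFlag());
        System.out.println(busyLoopStopsOnInterrupt());
    }
}
```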
```java
class FairnessBoundedBlockingQueue implements Queue {
    protected final int capacity;
    protected Node head;
    protected Node tail;

    protected final Object pollLock = new Object();
    protected int canPollCount;
    protected int waitPollCount;

    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected int waitOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.waitPollCount = 0;
        this.waitOfferCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    public boolean offer(Object obj) throws InterruptedException {
        // Respond to an interrupt that arrived before we started.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (offerLock) {
            while (canOfferCount <= 0) {
                waitOfferCount++;
                try {
                    offerLock.wait();
                } catch (InterruptedException e) {
                    // We may have consumed a notify meant for a live waiter: pass it on.
                    offerLock.notify();
                    throw e;
                } finally {
                    // Restore the counter on both normal and exceptional exit.
                    waitOfferCount--;
                }
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
        return true;
    }

    public Object poll() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Object result = null;
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                waitPollCount++;
                try {
                    pollLock.wait();
                } catch (InterruptedException e) {
                    pollLock.notify();
                    throw e;
                } finally {
                    waitPollCount--;
                }
            }
            result = head.next.value;
            head.next.value = null;   // clear the reference (not 0) so it can be collected
            head = head.next;
            canPollCount--;
        }
        synchronized (offerLock) {
            canOfferCount++;
            if (waitOfferCount > 0) {
                offerLock.notify();
            }
        }
        return result;
    }

    class Node {
        Object value;
        Node next;

        Node(Object obj) {
            this.value = obj;
            next = null;
        }
    }
}
```
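In the code above:
- When a waiting thread exits because of an interrupt, it catches the InterruptedException and forwards the notification with pollLock.notify or offerLock.notify;
- The tracking counters are restored in a finally block;
State tracking thus resolves the lock contention among readers and among writers.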
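The same replay as for case 1, this time against a hypothetical stripped-down copy of the second version's poll side (RestoringPollSide is an illustrative name). Threads A and B both wait; A is interrupted and leaves, after which the counter matches the one thread that is really waiting, and a single added item then releases B.

```java
// Hypothetical, stripped-down copy of the poll side from the second version:
// finally restores the counter, and catch forwards a possibly consumed notify.
class RestoringPollSide {
    private final Object pollLock = new Object();
    private int canPollCount = 0;
    private int waitPollCount = 0;

    void awaitItem() throws InterruptedException {
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                waitPollCount++;
                try {
                    pollLock.wait();
                } catch (InterruptedException e) {
                    pollLock.notify();   // pass on a signal we may have swallowed
                    throw e;
                } finally {
                    waitPollCount--;     // runs on normal and exceptional exit
                }
            }
            canPollCount--;
        }
    }

    void addItem() {
        synchronized (pollLock) {
            canPollCount++;
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
    }

    int waitPollCount() {
        synchronized (pollLock) {
            return waitPollCount;
        }
    }

    private static void awaitWaiting(Thread t) throws InterruptedException {
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
    }

    /** Returns {count after A left, 1 if B still blocked else 0, final count}. */
    static int[] replay() {
        RestoringPollSide q = new RestoringPollSide();
        Runnable poller = () -> {
            try {
                q.awaitItem();
            } catch (InterruptedException e) {
                // this poller gives up
            }
        };
        Thread a = new Thread(poller);
        Thread b = new Thread(poller);
        try {
            a.start();
            awaitWaiting(a);
            b.start();
            awaitWaiting(b);
            a.interrupt();
            a.join();
            int afterA = q.waitPollCount();   // only B is still counted
            q.addItem();
            b.join(5_000);
            return new int[] {afterA, b.isAlive() ? 1 : 0, q.waitPollCount()};
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(replay()));
    }
}
```

main prints `[1, 0, 0]`: one waiter counted after A left, B no longer blocked, and the counter back at zero.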
6. Achieving fairness
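Fairness requires turning the tracking of state variables into the tracking of per-request monitors:
- Each request gets its own monitor;
- An internal FIFO queue of those monitors provides fairness;
- Whenever the queue's state changes, monitors are released from that FIFO queue in order;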
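```java
// Pattern sketch: calculateNeedToWait(), enqueue() and monitor are placeholders.
boolean needToWait;
synchronized (this) {
    needToWait = calculateNeedToWait();  // decided under the guard
    if (needToWait) {
        enqueue(monitor);                // register this request in FIFO order
    }
}
if (needToWait) {
    monitor.doWait();                    // block outside the guard on this
}
```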
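To make the pattern concrete, here is a minimal sketch of a FIFO semaphore built on it. FifoGate, Monitor, acquire and release are hypothetical names, and unlike the queue below, this sketch assumes waiters are never interrupted: an interrupted waiter would leave its monitor in the queue and the permit handed to it would be lost. Each waiting request gets its own Monitor, whose released flag makes a notify that arrives before doWait() stick.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the per-request monitor pattern as a FIFO semaphore.
// Interrupts are deliberately NOT handled (assumes waiters are never interrupted).
class FifoGate {
    /** One monitor per waiting request; 'released' makes an early notify stick. */
    static final class Monitor {
        private boolean released;

        synchronized void doWait() throws InterruptedException {
            while (!released) {
                wait();
            }
        }

        synchronized void doNotify() {
            released = true;
            notify();
        }
    }

    private int permits;
    private final ArrayDeque<Monitor> waiters = new ArrayDeque<>();

    FifoGate(int permits) {
        this.permits = permits;
    }

    void acquire() throws InterruptedException {
        Monitor monitor = null;
        synchronized (this) {
            // calculateNeedToWait(), evaluated inside the guard
            if (permits > 0 && waiters.isEmpty()) {
                permits--;
            } else {
                monitor = new Monitor();
                waiters.addLast(monitor);        // enqueue(monitor)
            }
        }
        if (monitor != null) {
            monitor.doWait();                    // outside the guard: 'this' is free
        }
    }

    synchronized void release() {
        Monitor next = waiters.pollFirst();
        if (next == null) {
            permits++;
        } else {
            next.doNotify();                     // hand the permit to the oldest waiter
        }
    }

    /** Three threads queue up one after another; returns the order they get through. */
    static List<Integer> demoOrder() {
        FifoGate gate = new FifoGate(0);
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        try {
            for (int i = 1; i <= 3; i++) {
                final int id = i;
                Thread t = new Thread(() -> {
                    try {
                        gate.acquire();
                        order.add(id);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                t.start();
                while (t.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);             // thread i is queued before i + 1 starts
                }
                threads.add(t);
            }
            for (int k = 1; k <= 3; k++) {
                gate.release();
                while (order.size() < k) {
                    Thread.sleep(1);             // let the released thread record itself
                }
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(demoOrder());
    }
}
```

demoOrder() queues three threads one after another and releases three permits; they get through in arrival order, so main prints `[1, 2, 3]`.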
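Points to note:
- monitor.doWait() must be called outside the guarded block on this: called inside it, monitor.doWait() would release only the monitor's own lock, not the lock on this, so no other thread could enter to release it;
- calculateNeedToWait() must be evaluated inside the guard on this, to avoid races;
- Interrupts (InterruptedException) must be handled;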
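The first note can be observed directly. In the hypothetical sketch below, outer stands in for this and inner for the per-request monitor: a thread calls inner.wait() while still holding outer, and the thread that would notify it blocks on outer, a nested monitor lockout. The waiter uses a timed wait and is interrupted by main only so that the demo terminates.

```java
// Shows why monitor.doWait() must be called outside synchronized(this):
// Object.wait() releases only the monitor it is called on.
// 'outer' stands in for 'this' and 'inner' for the per-request monitor.
class NestedMonitorLockout {
    /** Returns the state of the thread that tries to enter 'outer' (expected BLOCKED). */
    static Thread.State observe() {
        final Object outer = new Object();
        final Object inner = new Object();
        Thread waiter = new Thread(() -> {
            synchronized (outer) {
                synchronized (inner) {
                    long deadline = System.currentTimeMillis() + 10_000; // safety net
                    try {
                        long left;
                        while ((left = deadline - System.currentTimeMillis()) > 0) {
                            inner.wait(left);    // releases 'inner' but still holds 'outer'
                        }
                    } catch (InterruptedException e) {
                        // main interrupts us once the lockout has been observed
                    }
                }
            }
        });
        // The thread that would release the waiter must first enter 'outer'.
        Thread releaser = new Thread(() -> {
            synchronized (outer) {
                synchronized (inner) {
                    inner.notify();
                }
            }
        });
        try {
            waiter.start();
            while (waiter.getState() != Thread.State.TIMED_WAITING) {
                Thread.sleep(1);
            }
            releaser.start();
            Thread.State seen = releaser.getState();
            while (seen != Thread.State.BLOCKED && seen != Thread.State.TERMINATED) {
                Thread.sleep(1);
                seen = releaser.getState();
            }
            waiter.interrupt();                 // break the lockout so the demo can end
            waiter.join();
            releaser.join();
            return seen;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

main prints `BLOCKED`: the would-be notifier cannot even reach inner.notify().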
Building on this abstraction, the fair queue is implemented as follows:
```java
class FairnessBoundedBlockingQueue implements Queue {
    protected final int capacity;
    protected Node head;
    protected Node tail;

    // Number of elements that can be taken without waiting.
    protected final Object pollLock = new Object();
    protected int canPollCount;

    // Number of free slots that can be filled without waiting.
    protected final Object offerLock = new Object();
    protected int canOfferCount;

    // FIFO queues of per-request monitors, one node per waiting thread.
    protected final WaitQueue pollQueue = new WaitQueue();
    protected final WaitQueue offerQueue = new WaitQueue();

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canOfferCount = capacity;
        this.canPollCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    public boolean offer(Object obj) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        WaitNode wait = null;
        synchronized (offerLock) {
            if (canOfferCount > 0 && offerQueue.isEmpty()) {
                canOfferCount--;            // claim a free slot inside the guard
            } else {
                wait = new WaitNode();      // otherwise queue up behind earlier offerers
                offerQueue.enq(wait);
            }
        }
        if (wait != null) {
            // Returns once a slot has been handed to this thread; throws,
            // without owning a slot, if interrupted before that.
            wait.doWait();
        }
        if (Thread.interrupted()) {
            releaseSlot();                  // pass the slot we own to the next offerer
            throw new InterruptedException();
        }
        synchronized (offerLock) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
        }
        releaseItem();                      // announce the new element to pollers
        return true;
    }

    public Object poll() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        WaitNode wait = null;
        synchronized (pollLock) {
            if (canPollCount > 0 && pollQueue.isEmpty()) {
                canPollCount--;             // claim an element inside the guard
            } else {
                wait = new WaitNode();
                pollQueue.enq(wait);
            }
        }
        if (wait != null) {
            wait.doWait();
        }
        if (Thread.interrupted()) {
            releaseItem();                  // pass the claimed element to the next poller
            throw new InterruptedException();
        }
        Object result;
        synchronized (pollLock) {
            result = head.next.value;
            head.next.value = null;         // the taken node becomes the new dummy head
            head = head.next;
        }
        releaseSlot();                      // announce the free slot to offerers
        return result;
    }

    // Hand one free slot to the oldest live offerer, or return it to the pool.
    private void releaseSlot() {
        synchronized (offerLock) {
            if (!offerQueue.doNotify()) {
                canOfferCount++;
            }
        }
    }

    // Hand one element to the oldest live poller, or make it available to newcomers.
    private void releaseItem() {
        synchronized (pollLock) {
            if (!pollQueue.doNotify()) {
                canPollCount++;
            }
        }
    }

    class WaitQueue {
        WaitNode head;
        WaitNode tail;

        WaitQueue() {
            head = new WaitNode();
            tail = head;
        }

        // Releases the oldest node that is still waiting, skipping nodes whose
        // threads already left because of an interrupt. Returns false if none.
        synchronized boolean doNotify() {
            for (;;) {
                WaitNode node = deq();
                if (node == null) {
                    return false;
                }
                if (node.doNotify()) {
                    return true;
                }
            }
        }

        synchronized boolean isEmpty() {
            return head.next == null;
        }

        synchronized void enq(WaitNode node) {
            tail.next = node;
            tail = tail.next;
        }

        synchronized WaitNode deq() {
            if (head.next == null) {
                return null;
            }
            WaitNode res = head.next;
            head = head.next;
            if (head.next == null) {
                tail = head;
            }
            return res;
        }
    }

    class WaitNode {
        boolean released;
        WaitNode next;

        WaitNode() {
            released = false;
            next = null;
        }

        synchronized void doWait() throws InterruptedException {
            try {
                while (!released) {
                    wait();
                }
            } catch (InterruptedException e) {
                if (!released) {
                    released = true;        // give up our place; doNotify will skip us
                    throw e;
                } else {
                    Thread.currentThread().interrupt(); // already released: keep the permit
                }
            }
        }

        synchronized boolean doNotify() {
            if (!released) {
                released = true;
                notify();
                return true;
            } else {
                return false;
            }
        }
    }

    class Node {
        Object value;
        Node next;

        Node(Object obj) {
            this.value = obj;
            next = null;
        }
    }
}
```
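In the code above:
- The core change is replacing the tracking counters with synchronization nodes, WaitNode;
- WaitNodes are organized in a simple synchronized queue that implements a FIFO protocol, and each thread waits on its own WaitNode monitor;
- Each WaitNode keeps a released flag recording whether its thread's block has been lifted, mainly to handle interrupts;
- WaitQueue itself is fully synchronized; since the contention between readers and writers, as well as within readers and within writers, has already been resolved, synchronizing WaitQueue causes no problems;
- WaitQueue is unbounded, which is a potential issue; but since it only tracks synchronization, and what it tracks is usually threads, this is normally not a problem;
- In the final fair bounded queue, both offer and poll first use a guarded check to decide whether they must enqueue and wait; if so, they wait according to the fairness protocol.
When released, they update the queue under the corresponding lock (pollLock or offerLock); finally, again under the lock, they signal the other side that the queue has changed;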
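For comparison only (not part of the design described here): the JDK's `java.util.concurrent.ArrayBlockingQueue` is also a bounded blocking queue, and its `(int capacity, boolean fair)` constructor enables a fairness policy under which blocked producers and consumers are served in FIFO order. A minimal usage sketch (JdkFairQueueDemo is an illustrative name):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// For comparison only: the JDK's bounded queue with its fairness flag set.
class JdkFairQueueDemo {
    static String run() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(2, true); // capacity 2, fair
        try {
            q.put("a");
            q.put("b");
            // The queue is full, so a timed offer gives up and returns false.
            boolean accepted = q.offer("x", 10, TimeUnit.MILLISECONDS);
            // A producer blocks in put() until a slot frees up.
            Thread producer = new Thread(() -> {
                try {
                    q.put("c");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            String first = q.take();  // frees a slot and wakes the producer
            producer.join();
            return accepted + " " + first + " " + q;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

main prints `false a [b, c]`. The JDK class implements fairness with a fair ReentrantLock rather than per-request monitors, so it trades the design above for a single lock shared by both sides.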
References
Alibaba Tech Blog (《阿里技术博客》)