1、基础版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
   | interface Queue {          boolean offer(Object obj);
      Object poll();      } class FairnessBoundedBlockingQueue implements Queue {          protected int size;
           protected final int capacity;
           protected Node head;
           protected Node tail;
      public FairnessBoundedBlockingQueue(int capacity) {         this.capacity = capacity;         this.head = new Node(null);         this.tail = head;         this.size = 0;     }
           public boolean offer(Object obj) {         if (size < capacity) {             Node node = new Node(obj);             tail.next = node;             tail = node;             ++size;             return true;         }         return false;     }
           public Object poll() {         if (head.next != null) {             Object result = head.next.value;             head.next.value = null;             head = head.next;              --size;             return result;         }         return null;     }
      class Node {         Object value;         Node next;         Node(Object obj) {             this.value = obj;             next = null;         }     } }
  | 
 
- 定义支持队列的两个基础接口, poll 和 offer;
 
- 队列的实现,采用经典实现;
 
- 考虑在队列空的情况下, poll 返回为空,非阻塞;
 
- 队列在满的情况下, offer 返回 false ,入队不成功,无异常;
 
2、并发版本
如果在并发场景下,上述的实现面临一些问题,同时未实现给定的一些需求。通过添加 synchronized ,保证并发条件下的线程安全问题。注意此处做同步的原因是为了保证类的不变式。
以上,简单粗暴的加 synchronized 可以解决问题,但会引入新的问题:系统活性问题(此问题下文会解决)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
   | public class BoundedBlockingQueue implements Queue {
           protected int size;
           protected final int capacity;
           protected Node head;
           protected Node tail;
      public BoundedBlockingQueue(int capacity) {         this.capacity = capacity;         this.head = new Node(null);         this.tail = head;         this.size = 0;     }
           public synchronized boolean offer(Object obj) {         if (size < capacity) {             Node node = new Node(obj);             tail.next = node;             tail = node;             ++size;             return true;         }         return false;     }
           public synchronized Object poll() {         if (head.next != null) {             Object result = head.next.value;             head.next.value = null;             head = head.next;              --size;             return result;         }         return null;     }
      class Node {         Object value;         BoundedBlockingQueue.Node next;         Node(Object obj) {             this.value = obj;             next = null;         }     } }
  | 
 
同时,简单加 synchronized 同步是无法实现阻塞等待;即
- 如果队列为空,那么出队的动作还是会立即返回,返回为空;
 
- 如果队列已满,那么入队动作还是会立即返回,返回操作不成功;
 
卫式方法
阻塞等待,可以通过简单的卫式方法来实现,此问题本质上可以抽象为:
- 任何一个方法都需要在满足一定条件下才可以执行;
 
- 执行方法前需要首先校验不变式,然后执行变更;
 
- 在执行完成后,校验是否满足后验不变式;
 
代码逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   | 
  synchronized Object action(Object arg) {     while(!condition) {         wait();     }          checkPreCondition();     doAction();          checkPostCondition(); }
 
  synchronized Object notifyAction(Object arg) {     notifyAll(); }
 
  | 
 
需要注意:
- 通常会采用 notifyAll 发送通知,而非 notify ;因为如果当前线程收到 notify 通知后被中断,那么系统将一直等待下去。
 
- 如果使用了 notifyAll 那么卫式语句必须放在 while 循环中;因为线程唤醒后,执行条件已经不满足,虽然当前线程持有互斥锁。
 
- 卫式条件的所有变量,有任何变更都需要发送 notifyAll 不然面临系统活性问题。
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
   | interface Queue {
      boolean offer(Object obj) throws InterruptedException;
      Object poll() throws InterruptedException;
  } class FairnessBoundedBlockingQueue implements Queue {          protected int size;
           protected final int capacity;
           protected Node head;
           protected Node tail;
      public FairnessBoundedBlockingQueue(int capacity) {         this.capacity = capacity;         this.head = new Node(null);         this.tail = head;         this.size = 0;     }
           public synchronized boolean offer(Object obj) throws InterruptedException {         size>=capacity {             wait();         }         Node node = new Node(obj);         tail.next = node;         tail = node;         ++size;         notifyAll();          return true;     }
           public synchronized Object poll() throws InterruptedException {         while (head.next == null) {             wait();         }         Object result = head.next.value;         head.next.value = null;         head = head.next;          --size;         notifyAll();          return result;     }      }
  | 
 
以上,实现了阻塞等待,但也引入了更大的性能问题
- 入队和出队动作阻塞等待同一把锁,恶性竞争;(这里不管入队还是出队操作的都是同一个队列,入队还是出队都操作了size,所以不能同时进行)
 
- 当队列变更时,所有阻塞线程被唤醒,大量的线程上下文切换,竞争同步锁,最终可能只有一个线程能执行;
 
需要注意的点:
- 阻塞等待 wait 会抛出中断异常。关于异常的问题下文会处理;
 
- 接口需要支持抛出中断异常;
 
- 队里变更需要 notifyAll 避免线程中断或异常,丢失消息;
 
3、锁拆分优化
以上第一个问题,可以通过锁拆分来解决,即:定义两把锁,读锁和写锁;读写分离。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
   | 
  class FairnessBoundedBlockingQueue implements Queue {          protected final int capacity;
           protected Node head;
           protected Node tail;
           protected final Object pollLock = new Object();     protected int canPollCount;
           protected final Object offerLock = new Object();     protected int canOfferCount;
      public FairnessBoundedBlockingQueue(int capacity) {         this.capacity = capacity;         this.canPollCount = 0;         this.canOfferCount = capacity;         this.head = new Node(null);         this.tail = head;     }
           public boolean offer(Object obj) throws InterruptedException {         synchronized(offerLock) {             while(canOfferCount <= 0) {                 offerLock.wait();             }             Node node = new Node(obj);             tail.next = node;             tail = node;             canOfferCount--;         }         synchronized(pollLock) {             ++canPollCount;             pollLock.notifyAll();         }         return true;     }
           public Object poll() throws InterruptedException {         Object result = null;         synchronized(pollLock) {             while(canPollCount <= 0) {                 pollLock.wait();             }
              result = head.next.value;             head.next.value = null;             head = head.next;             canPollCount--;         }         synchronized(offerLock) {             canOfferCount++;             offerLock.notifyAll();         }         return result;     }      }
 
  | 
 
以上
- 定义了两把锁, pollLock 和 offerLock 拆分出队和入队竞争;
 
- 入队锁同步的变量为:callOfferCount 和 tail;
 
- 出队锁同步的变量为:canPollCount 和 head;
 
- 出队的动作:首先拿到 pollLock 卫式等待后,完成出队动作;然后拿到 offerLock 发送通知,解除入队的等待线程。
 
- 入队的动作:首先拿到 offerLock 卫式等待后,完成入队的动作;然后拿到 pollLock 发送通知,解除出队的等待线程。
 
以上实现
- 确保通过入队锁和出队锁,分别保证入队和出队的原子性;
 
- 出队动作,通过特别的实现,确保出队只会变更 head ,避免获取 offerLock;
 
- 通过 offerLock.notifyAll 和 pollLock.notifyAll 解决读写竞争的问题;
 
问题:当有多个入队线程等待时,一次出队的动作会触发所有入队线程竞争,大量的线程上下文切换,最终只有一个线程能执行。
即,还有 读与读 和 写与写 之间的竞争问题。
参考
《阿里技术博客》