面试官:啥?SynchronousQueue是钟?点?房?

今天这篇文章,我们继续讲架构师大刘的故事。

大刘有段时间经常会给一些程序员讲课。这一方面是由于团队培训的需要,一方面也是大刘自身想搞搞凡尔赛,嘚瑟一下自身的实力。

大刘讲课是允许公司任何一个人进去听的。提前一个星期把主题公布在公司群里,有人想听到日子直接去就是了。

有一次,大刘在聊并发话题的时候,为了彰显自己确实是个并发达人,用了个 SynchronousQueue 举例子。他说这个队列其实没有容积的概念,就是线程持有数据互相匹配。

嗯,谈到这里还是要说一下,大刘其实也不太懂 SynchronousQueue。只是一来这东西没人用,自然就没人懂;二来它的概念也比较晦涩,有些时候比较违背直觉,所以,即使随口说的一些话可能不太对,也未必会被发现,还能给人一种不明觉厉的感觉。

大刘用过几次,感觉良好。因此没事儿就要秀一下 SynchronousQueue,表示自己这么生僻的也懂,并发达人的名头是没有叫错的。

也就那一次,恰恰被人拆了台。

当时课上来了个新入职的技术,此人长得中等身材,相貌平平,只是脸却长的像种地多年的老农的巴掌。脸上的疙瘩如同老农巴掌上的老茧。这人姓张,这里由于他脸长得像个大巴掌,那就暂且叫他巴掌张。

这个巴掌张打断了大刘的话,言之凿凿说大刘说的是错的,说他看过这个 SynchronousQueue,并不是大刘说的这样。

大刘有点心虚,脖子渗出了一圈汗,但是并发达人的称呼大刘并不想丢掉。于是说了一大堆云里雾里的废话,把话题带偏了开去。并告诉巴掌张,下回要和他在这个舞台上 PK 一二, 要好好看看谁是真正的 SynchronousQueue 的知心朋友。

由于大刘感觉被巴掌张的巴掌糊了脸,便就此下了决心要研究透 SynchronousQueue。

Google 和百度一起查,东西合璧,洋为中用,搞了好是一阵子。最后有个犄角旮旯的小破网站,有人说了这么一句话:

SynchronousQueue 的目的就是为了接头,为了匹配,当接上头了就双方合作愉快,整个工作完成。但是一旦在接头中,任何一方还没到达,那么另一方就必须阻塞着等待。

这句话一下子就敲开了大刘的脑壳,让聪明的智商重新占领了高地。

为啥这句话就点亮了大刘那本来已经像灯泡的脑袋了呢?因为大刘想起了他每次的面试经历,就和这个接头是一样的。

大刘每次去面试,都很规矩的提前赶到新公司。但是大部分情况,时间到了之后都需要等很长时间才开始面试。大刘那时候也年轻,只是以为领导忙,所以倒也恭恭敬敬的等着。

直到大刘自己当了领导,去面试别人的时候,被 HR 委婉的提醒了下,要让候选人等一会儿再过去,显的公司业务很忙,让候选人对公司保持一定的敬畏。那时候,大刘才知道这是一种 PUA 术……

大刘对照着自己的面试经历,一下就理解了 SynchronousQueue 的概念。

SynchronousQueue 本身是为了交接、匹配而存在的。当一个线程往 SynchronousQueue 放东西,发现没线程在等着拿,就给阻塞掉——这就像面试者来早了等面试官。

当一个线程去 SynchronousQueue 拿东西,发现没东西,就去等的时候——就像面试官来早了等面试者。

搞懂 SynchronousQueue 的时候,正是一个冬天,屋外面的寒风在虎虎生威,屋里面的大刘在熠熠生辉。

只是一个堂而皇之摆在 JDK 底层并发包中的队列结构,SynchronousQueue 当然没那么简单,里面还存在着亿点点细节。

所以,大刘在整体方向搞懂之后,开始研究起了细节。他要奋发,狠狠把巴掌张的嚣张气焰压下去,大刘要当公司技术的头牌。

回到现实里,SynchronousQueue 真正的目的就是为了让两个线程的工作结果进行交接。这没什么问题。但是,在这个交接中是需要严格保密的,没有人可以窥视。

嗯,没错,就和你约了女朋友去钟点房那样的不能被窥视。

好,围绕这个 SynchronousQueue 的钟点房,咱们通过源代码,来看这亿点点细节。

首先,钟点房严格保密,里面是多少人,就不能让人知道。所以,就不能让别人通过方法得到具体的数据。对于 SynchronousQueue 来说,自然就是通过 size() 你得不到什么信息。

 
 
 
 
  1. /** 
  2. * Always returns zero. 
  3. * A {@code SynchronousQueue} has no internal capacity. 
  4. * @return zero 
  5. */ 
  6. public int size() { 
  7.   return 0; 
  8.  
  9. /** 
  10. * Always returns {@code true}. 
  11. * A {@code SynchronousQueue} has no internal capacity. 
  12. * @return {@code true} 
  13. */ 
  14. public boolean isEmpty() { 
  15.   return true; 

其次,钟点房也不能随便进去查房,看看都是谁。所以,自然就不能迭代。

 
 
 
 
  1. /** 
  2. * Returns an empty iterator in which {@code hasNext} always returns 
  3. * {@code false}. 
  4. * @return an empty iterator 
  5. */ 
  6. public Iterator iterator() { 
  7.   return Collections.emptyIterator(); 

再次,钟点房保护隐私,它也不能让你钻了漏子,不告诉你 XXX 是不是躲在了钟点房里。所以,你也不能知道钟点房里有没有这个人。

 
 
 
 
  1. /** 
  2. * Always returns {@code false}. 
  3. * A {@code SynchronousQueue} has no internal capacity. 
  4. * @param o the element 
  5. * @return {@code false} 
  6. */ 
  7. public boolean contains(Object o) { 
  8.   return false; 
  9.  
  10. /** 
  11. * Returns {@code false} unless the given collection is empty. 
  12. * A {@code SynchronousQueue} has no internal capacity. 
  13. * @param c the collection 
  14. * @return {@code false} unless given collection is empty 
  15. */ 
  16. public boolean containsAll(Collection c) { 
  17.   return c.isEmpty(); 

自然,钟点房也没什么权力赶人出去。

 
 
 
 
  1. /** 
  2. * Always returns {@code false}. 
  3. * A {@code SynchronousQueue} has no internal capacity. 
  4. * @param o the element to remove 
  5. * @return {@code false} 
  6. */ 
  7. public boolean remove(Object o) { 
  8.   return false; 

当然,作为一个商业化的钟点房,SynchronousQueue 还是很注意安全的,它贴心的提供了紧急转移的手段。

 
 
 
 
  1. /** 
  2. * @throws UnsupportedOperationException {@inheritDoc} 
  3. * @throws ClassCastException            {@inheritDoc} 
  4. * @throws NullPointerException          {@inheritDoc} 
  5. * @throws IllegalArgumentException      {@inheritDoc} 
  6. */ 
  7. public int drainTo(Collection c) { 
  8.   if (c == null) 
  9.     throw new NullPointerException(); 
  10.   if (c == this) 
  11.     throw new IllegalArgumentException(); 
  12.    
  13.   int n = 0; 
  14.     for (E e; (e = poll()) != null;) { 
  15.       c.add(e); 
  16.       ++n; 
  17.     } 
  18.   return n; 
  19.  
  20. /**  
  21. * @throws UnsupportedOperationException {@inheritDoc} 
  22. * @throws ClassCastException            {@inheritDoc} 
  23. * @throws NullPointerException          {@inheritDoc} 
  24. * @throws IllegalArgumentException      {@inheritDoc} 
  25. */ 
  26. public int drainTo(Collection c, int maxElements) { 
  27.   if (c == null) 
  28.     throw new NullPointerException(); 
  29.   if (c == this) 
  30.     throw new IllegalArgumentException(); 
  31.    
  32.   int n = 0; 
  33.     for (E e; n < maxElements && (e = poll()) != null;) { 
  34.       c.add(e); 
  35.       ++n; 
  36.     } 
  37.   return n; 

最后,钟点房就只能搞搞交接工作了。交接吗,自然是有交有接的,交的就得带东西。

 
 
 
 
  1. public void put(E e) throws InterruptedException { 
  2.   if (e == null) throw new NullPointerException(); 
  3.   // put:带着东西进屋子 
  4.   if (transferer.transfer(e, false, 0) == null) { 
  5.     Thread.interrupted(); 
  6.     throw new InterruptedException(); 
  7.   } 

接的肯定不会带着东西,得留地方拿东西。

 
 
 
 
  1. public E take() throws InterruptedException { 
  2.   // take:从屋子里把东西拿出来 
  3.   E e = transferer.transfer(null, false, 0); 
  4.   if (e != null) 
  5.     return e; 
  6.   Thread.interrupted(); 
  7.   throw new InterruptedException(); 

但是呢,这交接工作啊,得在专人安排下进行。

为什么需要专人来帮忙?因为有时候我们的钟点房太受欢迎了,客人多,得排队管管。管这些排队的就是 Transfer,它是钟点房的经理。

 
 
 
 
  1. /** 
  2. * The transferer. Set only in constructor, but cannot be declared 
  3. * as final without further complicating serialization.  Since 
  4. * this is accessed only at most once per public method, there 
  5. * isn't a noticeable performance penalty for using volatile 
  6. * instead of final here. 
  7. */ 
  8. private transient volatile Transferer transferer; 
  9.  
  10. /** 
  11. * Shared internal API for dual stacks and queues. 
  12. */ 
  13. abstract static class Transferer { 
  14.   /** 
  15.   * Performs a put or take. 
  16.   * 
  17.   * @param e if non-null, the item to be handed to a consumer; 
  18.   * if null, requests that transfer return an item 
  19.   * offered by producer. 
  20.   * @param timed if this operation should timeout 
  21.   * @param nanos the timeout, in nanoseconds 
  22.   * @return if non-null, the item provided or received; if null, 
  23.   * the operation failed due to timeout or interrupt -- 
  24.   * the caller can distinguish which of these occurred 
  25.   * by checking Thread.interrupted. 
  26.   */ 
  27.   abstract E transfer(E e, boolean timed, long nanos); 

Transfer 经理每次开门营业的时候,会收到总部给的牌子,告诉他管理工作要注意方式方法,比如公平有效,比如优先服务 VIP 客人之类的。

 
 
 
 
  1. /** 
  2. * 默认给vip客人开点后门 
  3. */ 
  4. public SynchronousQueue() { 
  5.   this(false); 
  6.  
  7. /** 
  8. * 总部递牌子,告诉Transfer到底是公平还是不公平, 
  9. */ 
  10. public SynchronousQueue(boolean fair) { 
  11.   transferer = fair ? new TransferQueue() : new TransferStack(); 

先看看适合劳苦大众的公平模式,先来先享受,晚来没折扣。

 
 
 
 
  1. static final class TransferQueue extends Transferer { 
  2.   static final class QNode{...} 
  3.   transient volatile QNode head;     
  4.   transient volatile QNode tail; 
  5.   transient volatile QNode cleanMe; 
  6.   TransferQueue() { 
  7.  //经典的链表套路,先搞个虚拟的头结点 
  8.     QNode h = new QNode(null, false);  
  9.     head = h; 
  10.     tail = h; 
  11.   } 
  12.   …… 
  13.   …… 

QNode 就是 Transfer 经理需要的牌子,上面记录点信息,别到时候弄错了。

 
 
 
 
  1. static final class QNode { 
  2.   volatile QNode next; // 下一个排队的哥们儿 
  3.   volatile Object item; // 这次哥们带来的要交接的东西 
  4.   volatile Thread waiter; // 交接的线程 
  5.   final boolean isData; // isData == true表示带着东西 
  6.  
  7.   QNode(Object item, boolean isData) { 
  8.     this.item = item; 
  9.     this.isData = isData; 
  10.   } 
  11.   
  12.   // ...省略一系列CAS方法 

怎么搞,秘密都在 transfer() 里。

 
 
 
 
  1. @SuppressWarnings("unchecked") 
  2.   E transfer(E e, boolean timed, long nanos) { 
  3.   //...先省略细节         

transfer 本质就是一直在等待交接完成或者交接被中断,被取消,或者等待超时。

 
 
 
 
  1. for (;;) { 
  2.   QNode t = tail; 
  3.   QNode h = head; 
  4.  //因为初始化是在构造函数里搞得,可能构造函数没有执行完,就被用上了,就会出现t或者h为null的情况 
  5.   if (t == null || h == null)          
  6.     continue; //啥也不能做 
  7.      
  8.  //h==t表示没人,t.isData == isData表示过来的哥们和前面的哥们目的一样,那就只能考虑排队等着了。 
  9.   if (h == t || t.isData == isData) {  
  10.     QNode tn = t.next; 
  11.     //线程不安全需要考虑的,现在的尾巴不对,指错了,重新确认下 
  12.   if (t != tail)                   
  13.       continue; 
  14.        
  15.   //队尾确定了,发现又来了人,把尾巴指向新来的人 
  16.     if (tn != null) {              
  17.       advanceTail(t, tn); 
  18.       continue; 
  19.     } 
  20.    
  21.     //超时了,别等了 
  22.     if (timed && nanos <= 0) 
  23.       return null; 
  24.        
  25.   //总算没事儿了,哥们可以登记进屋了 
  26.     if (s == null) 
  27.       s = new QNode(e, isData); 
  28.        
  29.   //中间可能有人插队,只能再等等 
  30.     if (!t.casNext(null, s))         
  31.       continue; 
  32.      
  33.   //准备进屋等着约的人 
  34.     advanceTail(t, s);               
  35.     Object x = awaitFulfill(s, e, timed, nanos); 
  36.      
  37.   //同一个人出来,那就是任务失败了 
  38.     if (x == s) { 
  39.       //清理下                    
  40.       clean(t, s); 
  41.       return null; 
  42.     } 
  43.      
  44.     if (!s.isOffList()) { //还没脱队 
  45.       advanceHead(t, s); //排前面单独处理 
  46.       if (x != null) //交接成功设一下标记 
  47.         s.item = s; 
  48.         s.waiter = null; 
  49.     } 
  50.      
  51.     return (x != null) ? (E)x : e; 

这段是不是看着很头痛?其实 Transfer 这小子也头痛。

它首先要面临的第一个问题:资源竞争的问题。

客人源源不断的来,由于 Transfer 强迫症,他想每次必须从绝对的队头或者队尾巴开始,所以,每次都要判断下,到底他看到的队头或者队尾,是不是真正的队头、队尾。

确定没问题了,新来的客人就开始被打造成真正的队尾。

然后,成为队尾的哥们就可以等着属于自己的 Mr.Right 过来交接了。等着交接一直到成功或者失败的方法就是 awaitFulfill(t, tn)。

这边有人在等待,同时另外一边,交接的人们也开始陆续过来了。

 
 
 
 
  1. else { // complementary-mode 
  2.   QNode m = h.next; // node to fulfill 
  3.   if (t != tail || m == null || h != head) 
  4.     continue; // inconsistent read 
  5.  
  6.     Object x = m.item; 
  7.     if (isData == (x != null) || // m already fulfilled 
  8.       x == m || // m cancelled 
  9.       !m.casItem(x, e)) { // 交接的核心语句 
  10.         advanceHead(h, m); // dequeue and retry 
  11.         continue; 
  12.       } 
  13.  
  14.   advanceHead(h, m); // successfully fulfilled 
  15.   LockSupport.unpark(m.waiter); 
  16.   return (x != null) ? (E)x : e; 

交接最核心的其实就是 m.casItem(x, e)。交接成功,大家各回各家了。

整体的流程如下:

开始就是个经典链表开局,head = tail

陆续开始有节点链接,put 的时候,isData = true;take 的时候,isData = false

可能会同时有很多的 put 操作,没有对应的 take 操作,他们就按照次序一个个链接起来,形成链表,并通过 awaitFulfill 方法等着对应的 take

也可能同时会有很多的 take 操作,而没有对应的 put 操作,会形成链表,并通过 awaitFulfill 方法等着对应的 put

take 操作会从链表头开始找匹配的 put,然后通过 casItem 方法交接

put 操作会从链表头开始找匹配的 take,然后通过 casItem 方法交接

所以,SynchronousQueue 你可以看到了,专门就是搞交接任务。

  • put 的哥们发现没人 take,就等在那里,等着take操作。
  • take的哥们儿发现没人put,也会等在那里,等着put操作。

这就是我们的 SynchronousQueue 钟点房做的事情。

OK,钟点房既然开门做生意,它也要赚钱的嘛。所以,它还得搞搞 VIP 客户收费,也得为 VIP 客户搞一些优待。

对于这些 VIP 客人,我们的 Transfer 经理会特意安排下,以栈的形式来安排客人,越后来的客人越大牌儿。所以,自然是后来的客人会优先搞定交接了。这里简短的介绍下,就不再赘述了。

Transfer 化身成 TransferStack,后来的优先服务。

开始自然是链表开局,一个无意义的链表头指向了 null

发现链表是空了,二话不说,客官,您进来先啦

和 TransferQueue 一样,如果都是 take 过来,模式就是 REQUEST,就得排队了

交接人出现,哥们可以收摊儿了

其余的不说了,一样的,说多了没劲

话说,大刘搞清楚了这些细节之后,次日,当巴掌张再次进行挑衅时,大刘彻底稳下来了。

当挨个把细节讲的一清二楚之后,看着巴掌张那张落寞的巴掌脸,瞬间也不觉得像巴掌了,而是像是在猜拳中出的石头剪刀布中的布。大刘没忍住,对着这个布比划出了个剪刀,光荣的结束了战斗。

大刘依然在技术流中独占鳌头。

我们下篇大刘的故事见。

本文转载自微信公众号「四猿外」,可以通过以下二维码关注。转载本文请联系四猿外公众号

网站栏目:面试官:啥?SynchronousQueue是钟?点?房?
文章地址:http://www.shufengxianlan.com/qtweb/news30/475780.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联