Go运行时中的 Mutex

我在极客时间上开了一门面向中高级Go程序员的课程:Go 并发编程实战课,有读者问Go channel中的实现中使用了mutex,这个mutex和标准库中的Mutex有什么不同?正好在百度厂内分享Go相关课程中有同事也提出了相同的问题,所以我专门写一篇文章介绍一下。

成都创新互联于2013年开始,先为忻城等服务建站,忻城等地企业,进行企业商务咨询服务。为忻城企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。

sync.Mutex是一个high level的同步原语,是为广大的Go开发者开发应用程序提供的一种数据结构,现在它的内部实现逻辑比较复杂了,包含spin和饥饿处理等逻辑,它底层使用了运行时的low level的一些函数和atomic的一些方法。

而运行时中的mutex是为运行时内部使用互斥锁而提供的一个同步原语,它提供了spin和等待队列,并没有去解决饥饿状态,而且它的实现和sync.Mutex的实现也是不一样的。它并没有以方法的方式提供Lock/Unlock,而是提供lock/unlock函数实现请求锁和释放锁。

Dan Scales 今年年初的时候又为运行时的锁增加了static locking rank的功能。他为运行时的架构无关的锁( architecture-independent locks)定义了rank,并且又定义了一些运行时的锁的偏序(此锁之前允许持有哪些锁)。这是运行时锁的一个巨大改变,但是很遗憾并没有一篇设计文档详细去描述这个功能的设计,你可以通过提交的comment(#0a820007)和代码中的注释去了解runtime内部锁的代码变化。

本质上来说,这个功能用来检查锁的顺序是不是按照文档设计的顺序执行的,如果有违反设定的顺序,就有可能死锁发生。因为缺乏准确的文档说明,并且这个功能主要是用来检查运行时锁的执行顺序的,所以在本文中我把这一段逻辑抹去不介绍了。实际Go运行时要开始这个检查的话,你需要设置变量GOEXPERIMENT=staticlockranking。

那么接下来我们看看运行时的mutex的数据结构的定义以及lock/unlock的实现。

运行时mutex数据结构

运行时的mutex数据结构很简单,如下所示,定义在runtime2.go中:

 
 
 
  1. type mutex struct { 
  2.     lockRankStruct 
  3.     // Futex-based impl treats it as uint32 key, 
  4.     // while sema-based impl as M* waitm. 
  5.     // Used to be a union, but unions break precise GC. 
  6.     key uintptr 

如果不启用lock ranking,其实lockRankStruct就是一个空结构:

 
 
 
  1. type lockRankStruct struct { 

那么对于运行时的mutex,最重要的就是key字段了。这个字段针对不同的架构有不同的含义。

对于dragonfly、freebsd、linux架构,mutex会使用基于Futex的实现, key就是一个uint32的值。 Linux提供的Futex(Fast user-space mutexes)用来构建用户空间的锁和信号量。Go 运行时封装了两个方法,用来sleep和唤醒当前线程:

  • futexsleep(addr uint32, val uint32, ns int64):原子操作`if addr == val { sleep }`。
  • futexwakeup(addr *uint32, cnt uint32):唤醒地址addr上的线程最多cnt次。

对于其他的架构,比如aix、darwin、netbsd、openbsd、plan9、solaris、windows,mutex会使用基于sema的实现,key就是M* waitm。Go 运行时封装了三个方法,用来创建信号量和sleep/wakeup:

  • func semacreate(mp *m):创建信号量
  • func semasleep(ns int64) int32: 请求信号量,请求不到会休眠一段时间
  • func semawakeup(mp *m):唤醒mp

基于这两种实现,分别有不同的lock和unlock方法的实现,主要逻辑都是类似的,所以接下来我们只看基于Futex的lock/unlock。

请求锁lock

如果不使用lock ranking特性,lock的逻辑主要是由lock2实现的。

 
 
 
  1. func lock(l *mutex) { 
  2.     lockWithRank(l, getLockRank(l)) 
  3. func lockWithRank(l *mutex, rank lockRank) { 
  4.     lock2(l) 
  5. func lock2(l *mutex) { 
  6.     // 得到g对象 
  7.     gp := getg() 
  8.     // g绑定的m对象的lock计数加1 
  9.     if gp.m.locks < 0 { 
  10.         throw("runtime·lock: lock count") 
  11.     } 
  12.     gp.m.locks++ 
  13.     // 如果有幸运光环,原来锁没有被持有,一把就获取到了锁,就快速返回了 
  14.     v := atomic.Xchg(key32(&l.key), mutex_locked) 
  15.     if v == mutex_unlocked { 
  16.         return 
  17.     } 
  18.     // 否则原来的可能是MUTEX_LOCKED或者MUTEX_SLEEPING 
  19.     wait := v 
  20.     // 单核不进行spin,多核CPU情况下会尝试spin 
  21.     spin := 0 
  22.     if ncpu > 1 { 
  23.         spin = active_spin 
  24.     } 
  25.      
  26.     for { 
  27.         // 尝试spin,如果锁已经释放,尝试抢锁 
  28.         for i := 0; i < spin; i++ { 
  29.             for l.key == mutex_unlocked { 
  30.                 if atomic.Cas(key32(&l.key), mutex_unlocked, wait) { 
  31.                     return 
  32.                 } 
  33.             } 
  34.             // PAUSE 
  35.             procyield(active_spin_cnt) 
  36.         } 
  37.         // 再尝试抢锁, rescheduling. 
  38.         for i := 0; i < passive_spin; i++ { 
  39.             for l.key == mutex_unlocked { 
  40.                 if atomic.Cas(key32(&l.key), mutex_unlocked, wait) { 
  41.                     return 
  42.                 } 
  43.             } 
  44.             osyield() 
  45.         } 
  46.         // 再尝试抢锁,并把key设置为mutex_sleeping,如果抢锁成功,返回 
  47.         v = atomic.Xchg(key32(&l.key), mutex_sleeping) 
  48.         if v == mutex_unlocked { 
  49.             return 
  50.         } 
  51.          
  52.         // 否则sleep等待 
  53.         wait = mutex_sleeping 
  54.         futexsleep(key32(&l.key), mutex_sleeping, -1) 
  55.     } 

unlock

如果不使用lock ranking特性,unlock的逻辑主要是由unlock2实现的。

 
 
 
  1. func unlock(l *mutex) { 
  2.     unlockWithRank(l) 
  3. func unlockWithRank(l *mutex) { 
  4.     unlock2(l) 
  5. func unlock2(l *mutex) { 
  6.     // 将key的值设置为mutex_unlocked 
  7.     v := atomic.Xchg(key32(&l.key), mutex_unlocked) 
  8.     if v == mutex_unlocked { 
  9.         throw("unlock of unlocked lock") 
  10.     } 
  11.     // 如果原来有线程在sleep,唤醒它 
  12.     if v == mutex_sleeping { 
  13.         futexwakeup(key32(&l.key), 1) 
  14.     } 
  15.     //得到当前的goroutine以及和它关联的m,将锁的计数减1 
  16.     gp := getg() 
  17.     gp.m.locks-- 
  18.     if gp.m.locks < 0 { 
  19.         throw("runtime·unlock: lock count") 
  20.     } 
  21.     if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack 
  22.         gp.stackguard0 = stackPreempt 
  23.     } 

总体来说,运行时的mutex逻辑还不太复杂,主要是需要处理不同的架构的实现,它休眠唤醒的对象是m,而sync.Mutex休眠唤醒的对象是g。

网站栏目:Go运行时中的 Mutex
分享链接:http://www.shufengxianlan.com/qtweb/news45/216745.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联