您好,登录后才能下订单哦!
本篇内容介绍了“Semaphore的原理和实现方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
// Go 语言中暴露的 semaphore 实现 // 具体的用法是提供 sleep 和 wakeup 原语 // 以使其能够在其它同步原语中的竞争情况下使用 // 因此这里的 semaphore 和 Linux 中的 futex 目标是一致的 // 只不过语义上更简单一些 // // 也就是说,不要认为这些是信号量 // 把这里的东西看作 sleep 和 wakeup 实现的一种方式 // 每一个 sleep 都会和一个 wakeup 配对 // 即使在发生 race 时,wakeup 在 sleep 之前时也是如此 // // See Mullender and Cox, ``Semaphores in Plan 9,'' // http://swtch.com/semaphore.pdf // 为 sync.Mutex 准备的异步信号量 // semaRoot 持有一棵 地址各不相同的 sudog(s.elem) 的平衡树 // 每一个 sudog 都反过来指向(通过 s.waitlink)一个在同一个地址上等待的其它 sudog 们 // 同一地址的 sudog 的内部列表上的操作时间复杂度都是 O(1)。顶层 semaRoot 列表的扫描 // 的时间复杂度是 O(log n),n 是被哈希到同一个 semaRoot 的不同地址的总数,每一个地址上都会有一些 goroutine 被阻塞。 // 访问 golang.org/issue/17953 来查看一个在引入二级列表之前性能较差的程序样例,test/locklinear.go // 中有一个复现这个样例的测试 type semaRoot struct { lock mutex treap *sudog // root of balanced tree of unique waiters. nwait uint32 // Number of waiters. Read w/o the lock. } // Prime to not correlate with any user patterns. const semTabSize = 251 var semtable [semTabSize]struct { root semaRoot pad [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte } func semroot(addr *uint32) *semaRoot { return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root }
┌─────┬─────┬─────┬─────┬─────┬────────────────────────┬─────┐ │ 0 │ 1 │ 2 │ 3 │ 4 │ ..... │ 250 │ └─────┴─────┴─────┴─────┴─────┴────────────────────────┴─────┘ │ │ │ │ └──┐ └─┐ │ │ │ │ ▼ ▼ ┌─────────┐ ┌─────────┐ │ struct │ │ struct │ ├─────────┴─────────┐ ├─────────┴─────────┐ │ root semaRoot │──┐ │ root semaRoot │──┐ ├───────────────────┤ │ ├───────────────────┤ │ │ pad │ │ │ pad │ │ └───────────────────┘ │ └───────────────────┘ │ │ │ ┌────────────────┘ ┌────────────────┘ │ │ │ │ ▼ ▼ ┌──────────┐ ┌──────────┐ │ semaRoot │ │ semaRoot │ ├──────────┴────────┐ ├──────────┴────────┐ │ lock mutex │ │ lock mutex │ ├───────────────────┤ ├───────────────────┤ │ treap *sudog │ │ treap *sudog │ ├───────────────────┤ ├───────────────────┤ │ nwait uint32 │ │ nwait uint32 │ └───────────────────┘ └───────────────────┘
treap 结构:
┌──────────┐ ┌─┬─▶│ sudog │ │ │ ├──────────┴────────────┐ ┌─────────────────────┼─┼──│ prev *sudog │ │ │ │ ├───────────────────────┤ │ │ │ │ next *sudog │────┐ │ │ │ ├───────────────────────┤ │ │ │ │ │ parent *sudog │ │ │ │ │ ├───────────────────────┤ │ │ │ │ │ elem unsafe.Pointer │ │ │ │ │ ├───────────────────────┤ │ │ │ │ │ ticket uint32 │ │ │ │ │ └───────────────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ ▼ ┌──────────┐ │ │ ┌──────────┐ │ sudog │ │ │ │ sudog │ ├──────────┴────────────┐ │ │ ├──────────┴────────────┐ │ prev *sudog │ │ │ │ prev *sudog │ ├───────────────────────┤ │ │ ├───────────────────────┤ │ next *sudog │ │ │ │ next *sudog │ ├───────────────────────┤ │ │ ├───────────────────────┤ │ parent *sudog │───┘ └─────────────────────────│ parent *sudog │ ├───────────────────────┤ ├───────────────────────┤ │ elem unsafe.Pointer │ │ elem unsafe.Pointer │ ├───────────────────────┤ ├───────────────────────┤ │ ticket uint32 │ │ ticket uint32 │ └───────────────────────┘ └───────────────────────┘
在这个 treap 结构里,从 elem 的视角(其实就是 lock 的 addr)来看,这个结构是个二叉搜索树。从 ticket 的角度来看,整个结构就是一个小顶堆。
所以才叫树堆(treap)。
相同 addr,即对同一个 mutex 上锁的 g,会阻塞在同一个地址上。这些阻塞在同一个地址上的 goroutine 会被打包成 sudog,组成一个链表。用 sudog 的 waitlink 相连:
┌──────────┐ ┌──────────┐ ┌──────────┐ │ sudog │ ┌─────▶│ sudog │ ┌─────▶│ sudog │ ├──────────┴────────────┐ │ ├──────────┴────────────┐ │ ├──────────┴────────────┐ │ waitlink *sudog │─────┘ │ waitlink *sudog │──────┘ │ waitlink *sudog │ ├───────────────────────┤ ├───────────────────────┤ ├───────────────────────┤ │ waittail *sudog │ │ waittail *sudog │ │ waittail *sudog │ └───────────────────────┘ └───────────────────────┘ └───────────────────────┘
中间的元素的 waittail 都会指向最后一个元素:
┌──────────┐ │ sudog │ ├──────────┴────────────┐ │ waitlink *sudog │ ├───────────────────────┤ │ waittail *sudog │───────────────────────────────────────────────────────────┐ └───────────────────────┘ │ ┌──────────┐ │ │ sudog │ │ ├──────────┴────────────┐ │ │ waitlink *sudog │ │ ├───────────────────────┤ │ │ waittail *sudog │─────────────────────────┤ └───────────────────────┘ ▼ ┌──────────┐ │ sudog │ ├──────────┴────────────┐ │ waitlink *sudog │ ├───────────────────────┤ │ waittail *sudog │ └───────────────────────┘
对外封装
在 sema.go 里实现的内容,用 go:linkname 导出给 sync、poll 库来使用,也是在链接期做了些手脚:
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire func sync_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile) } //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire func poll_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile) } //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool) { semrelease1(addr, handoff) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile) } //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease func poll_runtime_Semrelease(addr *uint32) { semrelease(addr) }
sem 本身支持 acquire 和 release,其实就是 OS 里常说的 P 操作和 V 操作。
func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
type semaProfileFlags int const ( semaBlockProfile semaProfileFlags = 1 << iota semaMutexProfile ) // Called from runtime. func semacquire(addr *uint32) { semacquire1(addr, false, 0) } func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) { gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") } // 低成本的情况 if cansemacquire(addr) { return } // 高成本的情况: // 增加 waiter count 的值 // 再尝试调用一次 cansemacquire,成本了就直接返回 // 没成功就把自己作为一个 waiter 入队 // sleep // (之后 waiter 的 descriptor 被 signaler 用 dequeue 踢出) s := acquireSudog() root := semroot(addr) t0 := int64(0) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 for { lock(&root.lock) // 给 nwait 加一,这样后来的就不会在 semrelease 中进低成本的路径了 atomic.Xadd(&root.nwait, 1) // 检查 cansemacquire 避免错过了唤醒 if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } // 在 cansemacquire 之后的 semrelease 都可以知道我们正在等待 // (上面设置了 nwait),所以会直接进入 sleep // 注: 这里说的 sleep 其实就是 goparkunlock root.queue(addr, s, lifo) goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3) } releaseSudog(s) }
func semrelease(addr *uint32) { semrelease1(addr, false) } func semrelease1(addr *uint32, handoff bool) { root := semroot(addr) atomic.Xadd(addr, 1) // 低成本情况: 没有 waiter? // 这个 atomic 的检查必须发生在 xadd 之前,以避免错误唤醒 // (具体参见 semacquire 中的循环) if atomic.Load(&root.nwait) == 0 { return } // 高成本情况: 搜索 waiter 并唤醒它 lock(&root.lock) if atomic.Load(&root.nwait) == 0 { // count 值已经被另一个 goroutine 消费了 // 所以我们不需要唤醒其它 goroutine 了 unlock(&root.lock) return } s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(&root.nwait, -1) } unlock(&root.lock) if s != nil { // 可能会很慢,所以先解锁 acquiretime := s.acquiretime if acquiretime != 0 { mutexevent(t0-acquiretime, 3) } if s.ticket != 0 { throw("corrupted semaphore ticket") } if handoff && cansemacquire(addr) { s.ticket = 1 } readyWithTime(s, 5) } } func readyWithTime(s *sudog, traceskip int) { if s.releasetime != 0 { s.releasetime = cputicks() } goready(s.g, traceskip) }
sudog 按照地址 hash 到 251 个 bucket 中的其中一个,每一个 bucket 都是一棵 treap。而相同 addr 上的 sudog 会形成一个链表。
为啥同一个地址的 sudog 不需要展开放在 treap 中呢?显然,sudog 唤醒的时候,block 在同一个 addr 上的 goroutine,说明都是加的同一把锁,这些 goroutine 被唤醒肯定是一起被唤醒的,相同地址的 g 并不需要查找才能找到,只要决定是先进队列的被唤醒(fifo)还是后进队列的被唤醒(lifo)就可以了。
// queue 函数会把 s 添加到 semaRoot 上阻塞的 goroutine 们中 // 实际上就是把 s 添加到其地址对应的 treap 上 func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) { s.g = getg() s.elem = unsafe.Pointer(addr) s.next = nil s.prev = nil var last *sudog pt := &root.treap for t := *pt; t != nil; t = *pt { if t.elem == unsafe.Pointer(addr) { // Already have addr in list. if lifo { // treap 中在 t 的位置用 s 覆盖掉 t *pt = s s.ticket = t.ticket s.acquiretime = t.acquiretime s.parent = t.parent s.prev = t.prev s.next = t.next if s.prev != nil { s.prev.parent = s } if s.next != nil { s.next.parent = s } // 把 t 放在 s 的 wait list 的第一个位置 s.waitlink = t s.waittail = t.waittail if s.waittail == nil { s.waittail = t } t.parent = nil t.prev = nil t.next = nil t.waittail = nil } else { // 把 s 添加到 t 的等待列表的末尾 if t.waittail == nil { t.waitlink = s } else { t.waittail.waitlink = s } t.waittail = s s.waitlink = nil } return } last = t if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) { pt = &t.prev } else { pt = &t.next } } // 把 s 作为树的新的叶子插入进去 // 平衡树使用 ticket 作为堆的权重值,这个 ticket 是随机生成的 // 也就是说,这个结构以元素地址来看的话,是一个二叉搜索树 // 同时用 ticket 值使其同时又是一个小顶堆,满足 // s.ticket <= both s.prev.ticket and s.next.ticket. // https://en.wikipedia.org/wiki/Treap // http://faculty.washington.edu/aragon/pubs/rst89.pdf // // s.ticket 会在一些地方和 0 相比,因此只设置最低位的 bit // 这样不会明显地影响 treap 的质量? s.ticket = fastrand() | 1 s.parent = last *pt = s // 按照 ticket 来进行旋转,以满足 treap 的性质 for s.parent != nil && s.parent.ticket > s.ticket { if s.parent.prev == s { root.rotateRight(s.parent) } else { if s.parent.next != s { panic("semaRoot queue") } root.rotateLeft(s.parent) } } } // dequeue 会搜索到阻塞在 addr 地址的 semaRoot 中的第一个 goroutine // 如果这个 sudog 需要进行 profile,dequeue 会返回它被唤醒的时间(now),否则的话 now 为 0 func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) { ps := &root.treap s := *ps for ; s != nil; s = *ps { if s.elem == unsafe.Pointer(addr) { goto Found } if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) { ps = &s.prev } else { ps = &s.next } } return nil, 0 Found: now = int64(0) if s.acquiretime != 0 { now = cputicks() } if t := s.waitlink; t != nil { // 替换掉同样在 addr 上等待的 t。 *ps = t t.ticket = s.ticket t.parent = s.parent t.prev = s.prev if t.prev != nil { t.prev.parent = t } t.next = s.next if t.next != nil { t.next.parent = t } if t.waitlink != nil { t.waittail = s.waittail } else { t.waittail = nil } t.acquiretime = now s.waitlink = nil s.waittail = nil } else { // 向下旋转 s 到叶节点,以进行删除,同时要考虑优先级 for s.next != nil || s.prev != nil { if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket { root.rotateRight(s) } else { root.rotateLeft(s) } } // Remove s, now a leaf. // 删除 s,现在是叶子节点了 if s.parent != nil { if s.parent.prev == s { s.parent.prev = nil } else { s.parent.next = nil } } else { root.treap = nil } } s.parent = nil s.elem = nil s.next = nil s.prev = nil s.ticket = 0 return s, now }
“Semaphore的原理和实现方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。