Golang锁原理如何实现

发布时间:2023-03-15 14:20:49 作者:iii
来源:亿速云 阅读:125

这篇文章主要介绍了Golang锁原理如何实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Golang锁原理如何实现文章都会有所收获,下面我们一起来看看吧。

什么是锁

为什么使用锁

并发编程中保证数据一致性和安全性的

Golang中的锁

Golang锁原理如何实现

Golang的提供的同步机制有sync模块下的Mutex、WaitGroup以及语言自身提供的chan等。 这些同步的方法都是以runtime中实现的底层同步机制(cas、atomic、spinlock、sem)为基础的

1. cas、atomic

cas(Compare And Swap)和原子运算是其他同步机制的基础

2. 自旋锁(spinlock)

自旋锁是指当一个线程在获取锁的时候,如果锁已经被其他线程获取,那么该线程将循环等待,然后不断地判断是否能够被成功获取,知直到获取到锁才会退出循环。获取锁的线程一直处于活跃状态
Golang中的自旋锁用来实现其他类型的锁,与互斥锁类似,不同点在于,它不是通过休眠来使进程阻塞,而是在获得锁之前一直处于活跃状态(自旋)

3. 信号量

实现休眠和唤醒协程的一种方式

信号量有两个操作P和V
P(S):分配一个资源
1. 资源数减1:S=S-1
2. 进行以下判断
    如果S<0,进入阻塞队列等待被释放
    如果S>=0,直接返回

V(S):释放一个资源
1. 资源数加1:S=S+1
2. 进行如下判断
    如果S>0,直接返回
    如果S<=0,表示还有进程在请求资源,释放阻塞队列中的第一个等待进程
    
golang中信号量操作:runtime/sema.go
P操作:runtime_Semacquire
V操作:runtime_Semrelease

mutex的使用

package main

import (
    "fmt"
    "sync"
)

var num int
var mtx sync.Mutex
var wg sync.WaitGroup

func add() {
    mtx.Lock()  //mutex实例无需实例化,声明即可使用

    defer mtx.Unlock()
    defer wg.Done()

    num += 1
}

func main() {
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go add()
    }

    wg.Wait()

    fmt.Println("num:", num)
}

mutex的必要性

锁在高度竞争时会不断挂起恢复线程从而让出cpu资源,原子变量在高度竞争时会一直占用cpu;原子操作时线程级别的,不支持协程

mutex演进

1. 互斥锁

type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota
    mutexWoken
    mutexWaiterShift = iota  //根据 mutex.state >> mutexWaiterShift 得到当前等待的 goroutine 数目
)

state表示当前锁的状态,是一个共用变量

state:  |32|31|....|3|2|1|
         \__________/ | |
               |      | |
               |      | 当前mutex是否加锁
               |      |
               |      当前mutex是否被唤醒
               |
               等待队列的goroutine协程数

Lock 方法申请对 mutex 加锁的时候分两种情况

//如果已经加锁,那么当前协程进入休眠阻塞,等待唤醒
func (m *Mutex) Lock() {

    // 快速加锁:CAS更新state为locked
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false //当前goroutine是否被唤醒
    for {
        old := m.state // 保存当前state的状态
        new := old | mutexLocked // 新值locked位设置为1
        // 如果当前处于加锁状态,新到来的goroutine进入等待队列
        if old&mutexLocked != 0 {
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            //如果被唤醒,新值需要重置woken位为 0
            new &^= mutexWoken
        }
        
        // 两种情况会走到这里:1.休眠中被唤醒 2.加锁失败进入等待队列
        // CAS 更新,如果更新失败,说明有别的协程抢先一步,那么重新发起竞争。
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果更新成功,有两种情况
            // 1.如果为 1,说明当前 CAS 是为了更新 waiter 计数
            // 2.如果为 0,说明是抢锁成功,那么直接 break 退出。
            if old&mutexLocked == 0 { 
                break
            }
            runtime_Semacquire(&m.sema) // 此时如果 sema <= 0 那么阻塞在这里等待唤醒,也就是 park 住。走到这里都是要休眠了。
            awoke = true  // 有人释放了锁,然后当前 goroutine 被 runtime 唤醒了,设置 awoke true
        }
    }

    if raceenabled {
        raceAcquire(unsafe.Pointer(m))
    }
}

UnLock 解锁分两步

//锁没有和某个特定的协程关联,可以由一个协程lock,另一个协程unlock
func (m *Mutex) Unlock() {
    if raceenabled {
        _ = m.state
        raceRelease(unsafe.Pointer(m))
    }

    // CAS更新state的状态为locked 注意:解锁的瞬间可能会有新的协程到来并抢到锁
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 释放了一个没上锁的锁会panic:原先的lock位为0
    if (new+mutexLocked)&mutexLocked == 0 { 
        panic("sync: unlock of unlocked mutex")
    }
    
    //判断是否需要释放资源
    old := new
    for {
        /**
         * 不需要唤醒的情况
         * 1.等待队列为0
         * 2.已经有协程抢到锁(上面的瞬间抢锁)
         * 3.已经有协程被唤醒
         */
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        //将waiter计数位减一,并设置state为woken(唤醒)
        //问:会同时有多个被唤醒的协程存在吗
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema) // cas成功后,再做sema release操作,唤醒休眠的 goroutine
            return
        }
        old = m.state
    }
}

知识点

使用&来判断位值,使用|来设置位值,使用&^来清空位置(内存对齐)

一代互斥锁的问题

处于休眠中的goroutine优先级低于当前活跃的,unlock解锁的瞬间最新的goroutine会抢到锁
大多数果锁的时间很短,所有的goroutine都要休眠,增加runtime调度开销

2. 自旋锁

Lock 方法申请对 mutex 加锁的时候分三种情况

func (m *Mutex) Lock() {
    //快速加锁,逻辑不变
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    awoke := false
    iter := 0
    for {
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 { // 如果当前己经上锁,那么判断是否可以自旋
            //短暂的自旋过后如果无果,就只能通过信号量让当前goroutine进入休眠等待了
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                /**
                 * 自旋的操作:设置state为woken,这样在unlock的时候就不会唤醒其他协程.
                 * 自旋的条件:
                 * 1.当前协程未被唤醒 !awoke
                 * 2.其他协程未被唤醒 old&mutexWoken == 0
                 * 3.等待队列大于0
                 */
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                //进行自旋操作
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShit
        }
        if awoke {
            //todo 为什么加这个判断
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime_Semacquire(&m.sema)
            awoke = true
            iter = 0
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

path: runtime/proc.go

const (
        mutex_unlocked = 0
        mutex_locked   = 1
        mutex_sleeping = 2

        active_spin     = 4
        active_spin_cnt = 30
        passive_spin    = 1
)

/**
 * 有四种情况会返回false
 * 1.已经执行了很多次 iter >= active_spin 默认为4。避免长时间自旋浪费CPU
 * 2.是单核CPU ncpu <= 1 || GOMAXPROCS < 1 保证除了当前运行的Goroutine之外,还有其他的Goroutine在运行
 * 3.没有其他正在运行的p
 * 4 当前P的G队列为空 避免自旋锁等待的条件是由当前p的其他G来触发,这样会导致再自旋变得没有意义,因为条件永远无法触发
 */
func sync_runtime_canSpin(i int) bool {
        // sync.Mutex is cooperative, so we are conservative with spinning.
        // Spin only few times and only if running on a multicore machine and
        // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
        // As opposed to runtime mutex we don't do passive spinning here,
        // because there can be work on global runq or on other Ps.
        if i >= active_spin || ncpu <= 1 || gomaxprocs <=
        int32(sched.npidle+sched.nmspinning)+1 {
                return false
        }
        if p := getg().m.p.ptr(); !runqempty(p) {
                return false
        }
        return true
}

// 自旋逻辑
// procyeld函数内部循环调用PAUSE指令,PAUSE指令什么都不做,但是会消耗CPU时间
// 在这里会执行30次PAUSE指令消耗CPU时间等待锁的释放;
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ again
    RET

问题:

3. 公平锁

基本逻辑

LOCK流程:

type Mutex struct {
        state int32 
        sema  **uint32**
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
        Lock()
        Unlock()
}

//为什么使用位掩码表达式
//第3位到第32位表示等待在mutex上协程数量
const (
        mutexLocked = 1 << iota // mutex is locked 
        mutexWoken                                  
        mutexStarving           //新增饥饿状态
        mutexWaiterShift = iota                     
        starvationThresholdNs = 1e6 //饥饿状态的阈值:等待时间超过1ms就会进入饥饿状态
)


func (m *Mutex) Lock() {
        //快速加锁:逻辑不变
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
                if race.Enabled {
                        race.Acquire(unsafe.Pointer(m))
                }
                return
        }

        var waitStartTime int64 //等待时间
        starving := false   //饥饿标记
        awoke := false      //唤醒标记
        iter := 0           //循环计数器
        old := m.state      //保存当前锁状态
        for {
            // 自旋的时候增加了一个判断:如果处于饥饿状态就不进入自旋,因为饥饿模式下,释放的锁会直接给等待队列的第一个,当前协程直接进入等待队列
                if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                                awoke = true
                        }
                        runtime_doSpin()
                        iter++
                        old = m.state
                        continue
                }
                new := old
                // 当mutex不处于饥饿状态的时候,将new值设置为locked,也就是说如果是饥饿状态,新到来的goroutine直接排队
                if old&mutexStarving == 0 {
                        new |= mutexLocked
                }
                // 当mutex处于加锁锁或者饥饿状态时,新到来的goroutine进入等待队列
                if old&(mutexLocked|mutexStarving) != 0 {
                        new += 1 << mutexWaiterShift
                }
                // 当等待时间超过阈值,当前goroutine切换mutex为饥饿模式,如果未加锁,就不需要切换
                if starving && old&mutexLocked != 0 {
                        new |= mutexStarving
                }
                if awoke {
                        if new&mutexWoken == 0 {
                                throw("sync: inconsistent mutex state")
                        }
                        new &^= mutexWoken
                }
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    // mutex 处于未加锁,正常模式下,当前 goroutine 获得锁
                        if old&(mutexLocked|mutexStarving) == 0 {
                                break // locked the mutex with CAS
                        }
                        // 如果已经在排队了,就排到队伍的最前面
                        queueLifo := waitStartTime != 0
                        if waitStartTime == 0 {
                                waitStartTime = runtime_nanotime()
                        }
                        // queueLifo 为真的时候,当前goroutine会被放到队头,
                        // 也就是说被唤醒却没抢到锁的goroutine放到最前面
                        runtime_SemacquireMutex(&m.sema, queueLifo)
                        // 当前goroutine等待时间超过阈值,切换为饥饿模式,starving设置为true
                        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                        old = m.state
                        //如果当前是饥饿模式
                        if old&mutexStarving != 0 {
                                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                                        throw("sync: inconsistent mutex state")
                                }
                                // 如果切换为饥饿模式,等待队列计数减1
                                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                                // 如果等待时间小于1ms或者自己是最后一个被唤醒的,退出饥饿模式
                                if !starving || old>>mutexWaiterShift == 1 {
                                        delta -= mutexStarving
                                }
                                atomic.AddInt32(&m.state, delta)
                                break
                        }
                        awoke = true
                        iter = 0
                } else {
                        old = m.state
                }
        }

        if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
        }
}

UnLock 解锁分两步

func (m *Mutex) Unlock() {
        if race.Enabled {
                _ = m.state
                race.Release(unsafe.Pointer(m))
        }

        new := atomic.AddInt32(&m.state, -mutexLocked)
        if (new+mutexLocked)&mutexLocked == 0 {
                throw("sync: unlock of unlocked mutex")
        }

        if new&mutexStarving == 0 {
        // 正常模式
                old := new
                for {
                    /**
             * 不需要唤醒的情况
             * 1.等待队列为0
             * 2.已经有协程抢到锁(上面的瞬间抢锁)
             * 3.已经有协程被唤醒
             * 4.处于饥饿模式 在饥饿模式获取到锁的协程仍然处于饥饿状态,新的goroutine无法获取到锁
             */
                        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                                return
                        }
                        // Grab the right to wake someone.
                        new = (old - 1<<mutexWaiterShift) | mutexWoken
                        if atomic.CompareAndSwapInt32(&m.state, old, new) {
                                runtime_Semrelease(&m.sema, false)
                                return
                        }
                        old = m.state
                }
        } else {
            // 饥饿模式
                runtime_Semrelease(&m.sema, true)
        }
}

关于“Golang锁原理如何实现”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Golang锁原理如何实现”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注亿速云行业资讯频道。

推荐阅读:
  1. Java Synchronized 锁的实现原理与应用 (偏向锁,轻量锁,重量锁)
  2. 了解mysql锁实现机制原理

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

golang

上一篇:mybatis一对多嵌套查询怎么实现

下一篇:python项目如何打包成exe和安装包

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》