条件锁
type Cond struct {L Lockernotify notifyList
}
type notifyList struct {wait uint32 //表示当前 Wait 的最大 ticket 值notify uint32 //表示目前已唤醒的 goroutine 的 ticket 的最大值lock uintptr // key field of the mutexhead unsafe.Pointer //链表头tail unsafe.Pointer //链表尾
}
相关函数
func (c *Cond) Wait() {// 获取当前ticket,即wait的值,实际是atomic.Xadd(&l.wait, 1) - 1,这样wait原子加1,t := runtime_notifyListAdd(&c.notify)// 注意这里,必须先解锁,因为 runtime_notifyListWait 要切走 goroutine// 所以这里要解锁,要不然其他 goroutine 没法获取到锁了//功能是将当前 goroutine 插入到 notifyList 链表c.L.Unlock()runtime_notifyListWait(&c.notify, t)// 这里已经唤醒了,因此需要再度锁上c.L.Lock()
}
func (c *Cond) Signal() {runtime_notifyListNotifyOne(&c.notify)
}
//notifyList 是一个链表结构,我们为何不直接取链表最头部唤醒呢
//因为 notifyList 会有乱序的可能,获取 ticket 和加入 notifyList
//是两个独立的行为,并发操作有可能的乱序,大部分场景下比较一两次之后就会很快停止
func notifyListNotifyOne(l *notifyList) {***for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {if s.ticket == t {n := s.nextif p != nil {p.next = n} else {l.head = n}if n == nil {l.tail = p}s.next = nilreturn}}***
}
func (c *Cond) Broadcast() {runtime_notifyListNotifyAll(&c.notify)
}
//唤醒所有等待的goruntine
fuc notifyListNotifyAll(l *notifyList) {// Go through the local list and ready all waiters.for s != nil {next := s.nexts.next = nilreadyWithTime(s, 4)s = next}
}
互斥锁 mutex
自旋:新goroutine或被唤醒的goroutine首次获取不到锁,就会自旋(spin,通过循环不断尝试,在runtime实现的)的方式尝试检查锁,不需要上下文切换,提供性能。
饥饿模式:高并发情况下,新来的g一直自旋,waiter可能悲剧地获取不到锁。故waiter获取不到锁的时间超过阈值1毫秒,它会被插入到队列的前面,这个Mutex进入饥饿模式。
在饥饿模式下,Mutex的拥有者将直接把锁交给队列最前面的waiter。新的g不会尝试获取锁,不抢也不spin,乖乖地加入到等待队列的尾部。
如果拥有Mutex的waiter发现下面两种情况的其中之一,它就会把这个Mutex转换成正常模式:
- 此waiter已经是队列中的最后一个waiter了,没有其它的等待锁的goroutine了;
- 此waiter的等待时间小于1毫秒。
// A Locker represents an object that can be locked and unlocked.
type Locker interface { //锁接口Lock()Unlock()
}
type Mutex struct {//互斥锁实例state int32 //状态sema uint32 //信号量
}
const (mutexLocked = 1 << iota //0x0000 0001mutexWoken //0x0000 0010mutexStarving //0x0000 0100mutexWaiterShift = iota 3 //等待者数量的标志,最多可以阻塞2^29个goroutine。starvationThresholdNs = 1e6
)
mutexLocked代表锁的状态,如果为1代表已加锁
mutexWoken代表是否唤醒,如果为 1代表已唤醒
mutexStarving代表是否处于饥饿模式,如果为1 代表是
mutexWaiterShift 值是3,有人会问这里为什么不左移,而是直接是3,这是因为3这个会有1<<mutexWaiterShift代表1个waiter数量,也有右移代表获取mutex上面的waiter数量。
starvationThresholdNs代表判断饥饿模式的时间,如果等待时间超过这个时间就判断为饥饿
func (m *Mutex) Lock() {// Slow path (outlined so that the fast path can be inlined)m.lockSlow() //补贴源码,看图了
}
解锁,别贴代码,上图
读写锁
type RWMutex struct {w Mutex writerSem uint32 // 用于writer等待,wait读完成排队的信号量readerSem uint32 // 用于reader等待,wait写完成排队的信号量readerCount atomic.Int32 // 读锁的计数器readerWait atomic.Int32 // 有写锁时,等待读锁释放的数量
}
const rwmutexMaxReaders = 1 << 30 //go支持的最高加读锁的数量
func (rw *RWMutex) RLock() {// reader计数器+1// 如果小于0则表明有写锁在占有if rw.readerCount.Add(1) < 0 {// 等待写锁释放,runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)}
}
func (rw *RWMutex) RUnlock() {// reader计数器-1,// 如果小于0,说明有读锁在等待if r := rw.readerCount.Add(-1); r < 0 {rw.rUnlockSlow(r)}
}
func (rw *RWMutex) rUnlockSlow(r int32) {// 等待的读书数量-1,等于0说明已经没有等待的读锁if rw.readerWait.Add(-1) == 0 {// 最后一名读锁了,唤醒写锁runtime_Semrelease(&rw.writerSem, false, 1)}
}
func (rw *RWMutex) Lock() {// 与写锁竞争,获取互斥锁rw.w.Lock()// 把readerCount- rwmutexMaxReaders,表示有写锁r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders// 等待读锁数量readerwait+readerCount的数量,等待读锁完成.if r != 0 && rw.readerWait.Add(r) != 0 {//等待写锁信号量runtime_SemacquireRWMutex(&rw.writerSem, false, 0)}
}
func (rw *RWMutex) Unlock() {// 告诉读锁,写锁已完成,可以读了.r := rw.readerCount.Add(rwmutexMaxReaders)//唤醒所有读锁for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)}// 允许其他写锁获取锁rw.w.Unlock()
}