本文围绕 Go 语言中 sync 包展开,对其各类同步原语的使用方法进行介绍。
sync.Mutex
Mutex用于实现互斥锁,用于保护多个 goroutine 并发访问的共享资源。它可以防止数据竞争,确保只有一个 goroutine 能访问临界区代码。
结构
type Mutex struct {state int32sema uint32
}
方法
- Lock() 获取锁
- TryLock() 尝试获取锁
- Unlock() 释放锁
sync.RWMutex
RWMutex是读写互斥锁,允许同时多个读操作或一次写操作(写操作为独占锁,不允许同时有其他读写操作)。
结构
type RWMutex struct {w Mutex // held if there are pending writerswriterSem uint32 // semaphore for writers to wait for completing readersreaderSem uint32 // semaphore for readers to wait for completing writersreaderCount atomic.Int32 // number of pending readersreaderWait atomic.Int32 // number of departing readers
}
方法
- RLock() 获取读锁
- RUnlock() 释放读锁
- Lock() 获取写锁
- Unlock() 释放写锁
使用示例
// Cache 定义一个简单的缓存结构体
type Cache struct {data map[string]string// 使用读写锁来保护缓存数据rwMutex sync.RWMutex
}// Get 从缓存中获取数据
func (c *Cache) Get(key string) string {// 加读锁,允许多个读操作同时进行c.rwMutex.RLock()// 函数返回时自动释放读锁defer c.rwMutex.RUnlock()// 从缓存中获取数据return c.data[key]
}func (c *Cache) Create(book *Object) error {ms.Lock()defer ms.Unlock()//中间省略...return nil
}
sync.Once
Once用于确保某个函数只被执行一次。提供Do方法,接收一个无参数、无返回值的函数f,并保证无论有多少个gorountine同时调用Do方法,函数f都只会被调用一次。适宜单例模式、资源初始化等场景。
方法
- func (o *Once) Do(f func()) 方法接收一个无参数、无返回值的函数 f 作为参数,该方法会确保 f 只被执行一次。
代码实现
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
func (o *Once) Do(f func()) {// Note: Here is an incorrect implementation of Do://// if o.done.CompareAndSwap(0, 1) {// f()// }//// Do guarantees that when it returns, f has finished.// This implementation would not implement that guarantee:// given two simultaneous calls, the winner of the cas would// call f, and the second would return immediately, without// waiting for the first's call to f to complete.// This is why the slow path falls back to a mutex, and why// the o.done.Store must be delayed until after f returns.if o.done.Load() == 0 {// Outlined slow-path to allow inlining of the fast-path.o.doSlow(f)}
}func (o *Once) doSlow(f func()) {o.m.Lock()defer o.m.Unlock()if o.done.Load() == 0 {defer o.done.Store(1)f()}
}
使用示例
package mainimport ("fmt""sync"
)var (dbConnection stringonce sync.Once
)func initDB() {dbConnection = "Connected to database"fmt.Println("Database connection is initialized.")
}func GetDBConnection() string {once.Do(initDB)return dbConnection
}func main() {var wg sync.WaitGroupfor i := 0; i < 4; i++ {wg.Add(1)go func() {defer wg.Done()conn := GetDBConnection()fmt.Println(conn)}()}wg.Wait()
}
sync.Oncefunc
sync.OnceFunc 允许你将一个普通的 func() 变成仅执行一次的函数,而不用显式地使用 sync.Once 结构体。
方法
- func OnceFunc(f func()) func() 传入一个 func(),返回一个新的 func(),这个返回的函数只会执行一次,无论被调用多少次
- func OnceValue[T any](f func() T) func() T sync.OnceValue 用于包装一个计算一次的函数,并返回一个单个值。
- func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2) sync.OnceValues 和 sync.OnceValue 类似,但支持返回多个值(包括 error)
代码示例
OnceFunc
package mainimport ("fmt""sync"
)func main() {// 只执行一次,返回一个值onceGetID := sync.OnceValue(func() int {fmt.Println("Generating ID...")return 42})fmt.Println(onceGetID()) // 生成 IDfmt.Println(onceGetID()) // 直接返回缓存的值
}
OnceValue
package mainimport ("fmt""sync"
)func main() {// 只执行一次,返回两个值onceFetchData := sync.OnceValues(func() (string, error) {fmt.Println("Fetching data from database...")return "User Data", nil})data, err := onceFetchData()fmt.Println(data, err) // 第一次调用,执行函数data, err = onceFetchData()fmt.Println(data, err) // 第二次调用,直接返回缓存值
}
sync.WaitGroup
用于等待一组goroutine完成它们的任务。
方法
- Add(delta int) 用于设置或修改等待的 goroutine 数量,delta 可以是正数、负数或零
- Done goroutine完成任务后,调用Done方法,内部执行了wg.Add(-1)
- Wait 阻塞当前的 goroutine,直到计数器的值变为 0
代码示例
package mainimport ("context""fmt""sync""time"
)var wg sync.WaitGroupfunc workerWithTimeout(ctx context.Context, duration time.Duration) {// 从上下文中获取值if v := ctx.Value("language"); v != nil {fmt.Printf("context language value : %v\n", v)} else {fmt.Printf("no language key found in context\n")}select {case <-time.After(duration):fmt.Printf("任务执行完成\n")case <-ctx.Done():fmt.Printf("任务执行超时被取消,%v\n", ctx.Err())}wg.Done()
}func main() {ctx := context.Background()//设置超时时间5sctx, cancel := context.WithTimeout(ctx, 5*time.Second)defer cancel()// 创建一个带有值的上下文ctx = context.WithValue(ctx, "language", "Go")wg.Add(1)go workerWithTimeout(ctx, 6*time.Second)// 等待子任务完成wg.Wait()
}
sync.Map
sync.Map是并发安全的 map,它可以在 高并发环境下安全地存取键值对,适用于读多写少的共享数据(如果读多写多场景下,sync.Map反而是劣势,可以直接用传统Map+sync.Mutex),而不需要 sync.Mutex 加锁。
方法
- Store(key, value) 存储键值对
- Load(key) (value, bool) 获取键对应的值,返回 bool 表示是否存在
- Delete(key) 删除键
- LoadOrStore(key, value) (actual, loaded bool) 若 key 存在,则返回原值,loaded=true;否则存入新值,loaded=false
- Range(func(key, value interface{}) bool) 遍历 sync.Map,回调函数返回 false 时停止遍历
内部实现原理
sync.Map 内部使用了两个数据结构:read 和 dirty,以及一个 misses 计数器:
- read:是一个 atomic.Value 类型,存储的是一个只读的映射,允许并发的无锁读操作。大多数读操作可以直接在 read 中完成,无需加锁,因此读操作的性能较高。
- dirty:是一个普通的 map,存储了一些可能未同步到 read 中的键值对。写操作会先作用于 dirty,当 misses 计数器达到一定阈值时,会将 dirty 提升为 read,并清空 dirty。
- misses:计数器用于记录从 read 中读取键值对失败的次数,当 misses 达到 dirty 的长度时,会触发 dirty 提升为 read 的操作。
type Map struct {mu Mutex// read contains the portion of the map's contents that are safe for// concurrent access (with or without mu held).//// The read field itself is always safe to load, but must only be stored with// mu held.//// Entries stored in read may be updated concurrently without mu, but updating// a previously-expunged entry requires that the entry be copied to the dirty// map and unexpunged with mu held.read atomic.Pointer[readOnly]// dirty contains the portion of the map's contents that require mu to be// held. To ensure that the dirty map can be promoted to the read map quickly,// it also includes all of the non-expunged entries in the read map.//// Expunged entries are not stored in the dirty map. An expunged entry in the// clean map must be unexpunged and added to the dirty map before a new value// can be stored to it.//// If the dirty map is nil, the next write to the map will initialize it by// making a shallow copy of the clean map, omitting stale entries.dirty map[any]*entry// misses counts the number of loads since the read map was last updated that// needed to lock mu to determine whether the key was present.//// Once enough misses have occurred to cover the cost of copying the dirty// map, the dirty map will be promoted to the read map (in the unamended// state) and the next store to the map will make a new dirty copy.misses int
}func (m *Map) Load(key any) (value any, ok bool) {read := m.loadReadOnly()e, ok := read.m[key]if !ok && read.amended {m.mu.Lock()//双重检查// Avoid reporting a spurious miss if m.dirty got promoted while we were// blocked on m.mu. (If further loads of the same key will not miss, it's// not worth copying the dirty map for this key.)read = m.loadReadOnly()e, ok = read.m[key]//如果read读取不到,才从dirty读数据if !ok && read.amended {e, ok = m.dirty[key]// Regardless of whether the entry was present, record a miss: this key// will take the slow path until the dirty map is promoted to the read// map.m.missLocked()}m.mu.Unlock()}if !ok {return nil, false}return e.load()
}func (m *Map) missLocked() {m.misses++//misses达到dirty长度时,将dirty数据推到readif m.misses < len(m.dirty) {return}m.read.Store(&readOnly{m: m.dirty})m.dirty = nilm.misses = 0
}
使用示例
package mainimport ("fmt""sync"
)func main() {var sm sync.Map// 存储值sm.Store("name", "Alice")sm.Store("age", 25)// 读取值if value, ok := sm.Load("name"); ok {fmt.Println("Name:", value)}// 删除键sm.Delete("age")// 再次读取(已删除的键)if _, ok := sm.Load("age"); !ok {fmt.Println("Key 'age' not found")}
}
sync.Cond
sync.Cond 是 Go 语言中的一个同步原语,用于在多线程环境中进行条件变量的通知和等待。它通常和 sync.Mutex 或 sync.RWMutex 一起使用,用来在某些条件下暂停和唤醒 goroutine。多数情况下首选channels而不是Cond,channel 更简单和高效。在复杂的条件等待和精细的控制,比如多个条件需要同时满足时,sync.Cond 提供了更多的灵活性。
结构
type Cond struct {noCopy noCopy// L is held while observing or changing the conditionL Lockernotify notifyListchecker copyChecker
}
方法
- NewCond(l Locker) *Cond 初始化一个条件变量,并传入一个 sync.Mutex(这个锁用于保护共享资源)
- Wait() 使当前 goroutine 进入等待状态,会自动释放关联的锁,并阻塞当前 goroutine,直到被其他 goroutine 调用 Signal() 或 Broadcast() 方法唤醒。
- Signal() 唤醒一个等待在该条件变量上的 goroutine。如果有多个 goroutine 在等待,只会唤醒其中一个。
- Broadcast() 唤醒所有等待在该条件变量上的 goroutine。
代码示例
package mainimport ("fmt""sync"
)var cond = sync.NewCond(&sync.Mutex{})
var counter = 0func worker(id int) {cond.L.Lock() // 锁定条件变量defer cond.L.Unlock()for counter < 5 {// 需要等待的条件cond.Wait()}fmt.Printf("Worker %d: counter reached 5\n", id)
}func main() {// 启动多个工作 goroutinefor i := 1; i <= 3; i++ {go worker(i)}// 主 goroutine 更新 counter 的值cond.L.Lock()counter = 5cond.L.Unlock()// 唤醒所有等待中的 goroutinecond.Broadcast()// 等待所有 goroutine 完成fmt.Scanln()
}
sync.Pool
sync.Pool 用于对象的缓存和复用,其核心目的是减少内存分配和垃圾回收的压力,提高程序的性能。
- 存储对象:sync.Pool 维护一组临时对象,供多个 goroutine 共享。
- 自动回收:GC 运行时,sync.Pool 可能会清空内部缓存的对象。
- 多 goroutine 安全:sync.Pool 是并发安全的,可用于高并发场景。
方法
- New 字段:这是一个函数类型的字段,用于创建新的对象。当池中没有可用的对象时,会调用这个函数来创建一个新的对象。
- Get() interface{}:从池中获取一个对象。如果池中存在可用的对象,则返回该对象;如果池中没有可用的对象,则调用 New 字段指定的函数创建一个新的对象并返回。
- Put(x interface{}):将一个对象放回池中,以便后续复用。
代码示例
package mainimport ("fmt""sync"
)var bufferPool = sync.Pool{New: func() any {fmt.Println("Allocating new buffer...")return make([]byte, 1024) // 1KB 缓冲区},
}func main() {// 获取缓冲区buf := bufferPool.Get().([]byte)fmt.Println("Buffer size:", len(buf))// 归还缓冲区bufferPool.Put(buf)// 再次获取buf2 := bufferPool.Get().([]byte) // 复用缓存的 buffmt.Println("Buffer reused:", len(buf2))
}