Golang High Concurrency from the Bottom of the Computer Up
1. Source code flow architecture diagram
2. Reading the source code
newproc() in runtime/proc.go
func newproc(fn *funcval) {
	// capture the current g and the caller's PC (used as the new goroutine's parent info)
	gp := getg()
	pc := getcallerpc()
	// s1: call newproc1 on the system stack
	systemstack(func() {
		newg := newproc1(fn, gp, pc)

		_p_ := getg().m.p.ptr()
		// s1: put the new g on the run queue
		runqput(_p_, newg, true)

		// s1: if main has already started, wake up an idle P/M to run it
		if mainStarted {
			wakep()
		}
	})
}
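Every `go` statement in user code is compiled into a call to runtime.newproc, so the function above runs once per spawned goroutine. The toy program below is my own illustration, not runtime code; building it with `go build -gcflags=-S` and looking at the assembly shows a CALL runtime.newproc(SB) for the `go` statement.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		// Each `go` statement is lowered to a runtime.newproc call,
		// which creates the new g and runqput's it on the current P.
		go func(n int) {
			defer wg.Done()
			fmt.Println("goroutine", n)
		}(i)
	}
	wg.Wait()
}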
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
	// getg() returns the current g; it compiles down to a read of
	// thread-local storage (the FS register on amd64)
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	// acquirem does _g_.m.locks++, disabling preemption, because p may be held in a local var
	acquirem() // disable preemption because it can be holding p in a local var

	// get the p owned by the current m
	_p_ := _g_.m.p.ptr()
	// get a new g: reuse one from the p's gfree list if possible, otherwise allocate one with malg
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
	totalSize = alignUp(totalSize, sys.StackAlign)
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}

	// set up the g's scheduling context (newg.sched)
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	} else {
		// Only user goroutines inherit pprof labels.
		if _g_.m.curg != nil {
			newg.labels = _g_.m.curg.labels
		}
	}
	// Track initial transition?
	newg.trackingSeq = uint8(fastrand())
	if newg.trackingSeq%gTrackingPeriod == 0 {
		newg.tracking = true
	}
	// mark the new g as runnable
	casgstatus(newg, _Gdead, _Grunnable)
	gcController.addScannableStack(_p_, int64(newg.stack.hi-newg.stack.lo))

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	releasem(_g_.m)

	return newg
}
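To make the runqput(_p_, newg, true) call in newproc easier to picture, here is a deliberately simplified, hypothetical model of a P's local run queue with a runnext slot. The real queue in runtime/proc.go is a lock-free, fixed-size ring of 256 entries, and its runqputslow path spills half the local queue to the global run queue; this sketch only mirrors the overall shape, and every name in it is made up for illustration.

// Package sched: a toy model of a P's local run queue, NOT the runtime's code.
package sched

type G struct{ id int64 }

type localRunq struct {
	runnext  *G   // freshly created g's land here first
	q        []*G // bounded FIFO of runnable g's (a fixed ring of 256 in the runtime)
	capacity int
	globalQ  []*G // stand-in for the global run queue (sched.runq)
}

// put mimics runqput with next=true: the new g displaces the previous
// runnext occupant into the FIFO; on overflow the displaced g spills to
// the global queue (the real code moves half the local queue at once).
func (l *localRunq) put(g *G) {
	old := l.runnext
	l.runnext = g
	if old == nil {
		return
	}
	if len(l.q) < l.capacity {
		l.q = append(l.q, old)
		return
	}
	l.globalQ = append(l.globalQ, old) // simplified spill path
}

The runnext slot is what gives a freshly spawned goroutine a good chance to run next on the same P; the real scheduler uses it to keep communicate-and-wait goroutine pairs hot in cache.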
func main() {
	g := getg() // get the current g

	// Racectx of m0->g0 is used only as the parent of the main goroutine.
	// It must not be used for anything else.
	g.m.g0.racectx = 0

	// Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
	// Using decimal instead of binary GB and MB because
	// they look nicer in the stack overflow failure message.
	if goarch.PtrSize == 8 {
		maxstacksize = 1000000000
	} else {
		maxstacksize = 250000000
	}

	// An upper limit for max stack size. Used to avoid random crashes
	// after calling SetMaxStack and trying to allocate a stack that is too big,
	// since stackalloc works with 32-bit sizes.
	maxstackceiling = 2 * maxstacksize

	// mark that the main M has started, so newproc may now call wakep()
	mainStarted = true

	// start the sysmon monitoring thread on its own M
	if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
		systemstack(func() {
			newm(sysmon, nil, -1)
		})
	}

	// Lock the main goroutine onto this, the main OS thread,
	// during initialization. Most programs won't care, but a few
	// do require certain calls to be made by the main thread.
	// Those can arrange for main.main to run in the main thread
	// by calling runtime.LockOSThread during initialization
	// to preserve the lock.
	lockOSThread()

	if g.m != &m0 {
		throw("runtime.main not on m0")
	}

	// Record when the world started.
	// Must be before doInit for tracing init.
	runtimeInitTime = nanotime()
	if runtimeInitTime == 0 {
		throw("nanotime returning zero")
	}

	if debug.inittrace != 0 {
		inittrace.id = getg().goid
		inittrace.active = true
	}

	// run the runtime package's init tasks
	doInit(&runtime_inittask) // Must be before defer.

	// Defer unlock so that runtime.Goexit during init does the unlock too.
	needUnlock := true
	defer func() {
		if needUnlock {
			unlockOSThread()
		}
	}()

	gcenable()

	main_init_done = make(chan bool)
	if iscgo {
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		if GOOS != "windows" {
			if _cgo_setenv == nil {
				throw("_cgo_setenv missing")
			}
			if _cgo_unsetenv == nil {
				throw("_cgo_unsetenv missing")
			}
		}
		if _cgo_notify_runtime_init_done == nil {
			throw("_cgo_notify_runtime_init_done missing")
		}
		// Start the template thread in case we enter Go from
		// a C-created thread and need to create a new thread.
		startTemplateThread()
		cgocall(_cgo_notify_runtime_init_done, nil)
	}

	doInit(&main_inittask)

	// Disable init tracing after main init done to avoid overhead
	// of collecting statistics in malloc and newproc
	inittrace.active = false

	close(main_init_done)

	needUnlock = false
	unlockOSThread()

	if isarchive || islibrary {
		// A program compiled with -buildmode=c-archive or c-shared
		// has a main, but it is not executed.
		return
	}
	// call main.main through an indirect call
	fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
	fn()
	if raceenabled {
		racefini()
	}

	// Make racy client program work: if panicking on
	// another goroutine at the same time as main returns,
	// let the other goroutine finish printing the panic trace.
	// Once it does, it will exit. See issues 3934 and 20018.
	if atomic.Load(&runningPanicDefers) != 0 {
		// Running deferred functions should not take long.
		for c := 0; c < 1000; c++ {
			if atomic.Load(&runningPanicDefers) == 0 {
				break
			}
			Gosched()
		}
	}
	if atomic.Load(&panicking) != 0 {
		gopark(nil, nil, waitReasonPanicWait, traceEvGoStop, 1)
	}

	exit(0)
	for {
		var x *int32
		*x = 0
	}
}
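The lockOSThread() call and the comment above it describe a hook that is visible to user code: a program that needs main.main to keep running on the main OS thread (common with GUI toolkits and some C libraries) can call runtime.LockOSThread from an init function, before main runs. A minimal sketch of that pattern:

package main

import (
	"fmt"
	"runtime"
)

func init() {
	// Called during initialization, so the lock runtime.main took on the
	// main OS thread is preserved and main.main stays on that thread.
	runtime.LockOSThread()
}

func main() {
	fmt.Println("main.main is pinned to the main OS thread")
}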
func sysmon() {
	lock(&sched.lock)
	sched.nmsys++
	checkdead()
	unlock(&sched.lock)

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)

	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)

		// sysmon should not enter deep sleep if schedtrace is enabled so that
		// it can print that information at the right time.
		//
		// It should also not enter deep sleep if there are any active P's so
		// that it can retake P's from syscalls, preempt long running G's, and
		// poll the network if all P's are busy for long stretches.
		//
		// It should wakeup from deep sleep if any P's become active either due
		// to exiting a syscall or waking up due to a timer expiring so that it
		// can resume performing those duties. If it wakes from a syscall it
		// resets idle and delay as a bet that since it had retaken a P from a
		// syscall before, it may need to do it again shortly after the
		// application starts work again. It does not reset idle when waking
		// from a timer to avoid adding system load to applications that spend
		// most of their time sleeping.
		now := nanotime()
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				syscallWake := false
				next, _ := timeSleepUntil()
				if next > now {
					atomic.Store(&sched.sysmonwait, 1)
					unlock(&sched.lock)
					// Make wake-up period small enough
					// for the sampling to be correct.
					sleep := forcegcperiod / 2
					if next-now < sleep {
						sleep = next - now
					}
					shouldRelax := sleep >= osRelaxMinNS
					if shouldRelax {
						osRelax(true)
					}
					syscallWake = notetsleep(&sched.sysmonnote, sleep)
					if shouldRelax {
						osRelax(false)
					}
					lock(&sched.lock)
					atomic.Store(&sched.sysmonwait, 0)
					noteclear(&sched.sysmonnote)
				}
				if syscallWake {
					idle = 0
					delay = 20
				}
			}
			unlock(&sched.lock)
		}

		lock(&sched.sysmonlock)
		// Update now in case we blocked on sysmonnote or spent a long time
		// blocked on schedlock or sysmonlock above.
		now = nanotime()

		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0) // non-blocking - returns list of goroutines
			if !list.empty() {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				injectglist(&list)
				incidlelocked(1)
			}
		}
		if GOOS == "netbsd" && needSysmonWorkaround {
			// netpoll is responsible for waiting for timer
			// expiration, so we typically don't have to worry
			// about starting an M to service timers. (Note that
			// sleep for timeSleepUntil above simply ensures sysmon
			// starts running again when that timer expiration may
			// cause Go code to run again).
			//
			// However, netbsd has a kernel bug that sometimes
			// misses netpollBreak wake-ups, which can lead to
			// unbounded delays servicing timers. If we detect this
			// overrun, then startm to get something to handle the
			// timer.
			//
			// See issue 42515 and
			// https://gnats.netbsd.org/cgi-bin/query-pr-single.pl?number=50094.
			if next, _ := timeSleepUntil(); next < now {
				startm(nil, false)
			}
		}
		if atomic.Load(&scavenge.sysmonWake) != 0 {
			// Kick the scavenger awake if someone requested it.
			wakeScavenger()
		}
		// s1: retake P's blocked in syscalls and preempt long-running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			var list gList
			list.push(forcegc.g)
			injectglist(&list)
			unlock(&forcegc.lock)
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
		unlock(&sched.sysmonlock)
	}
}
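The schedtrace(debug.scheddetail > 0) call at the bottom of the loop is what prints the scheduler traces enabled through GODEBUG, so an easy way to watch sysmon at work is to run a small busy program with that variable set. The program below is only an observation aid of my own; the SCHED output line comes from schedtrace itself.

package main

import "time"

func main() {
	// Run with: GODEBUG=schedtrace=1000,scheddetail=1 go run main.go
	// Every 1000ms sysmon's schedtrace prints a line like:
	//   SCHED 1009ms: gomaxprocs=8 idleprocs=4 threads=7 ... runqueue=0 [0 0 0 0 1 0 0 0]
	for i := 0; i < 4; i++ {
		go func() {
			for {
				// burn CPU so some P's stay busy
			}
		}()
	}
	time.Sleep(5 * time.Second)
}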
func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		// if the P is in a syscall (_Psyscall) and has stayed there for at
		// least one full sysmon cycle (20us-10ms), retake it
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			// if this P's local run queue is empty, there are spinning M's or
			// idle P's elsewhere, and the syscall has lasted less than 10ms,
			// there is no need to detach this P
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
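retake calls preemptone once a G has kept its P running for more than forcePreemptNS (10ms). Combined with asynchronous preemption (Go 1.14+), this is why a tight CPU loop can no longer starve other goroutines even on a single P. The demo below only illustrates that observable behavior under those assumptions; exact print timings will vary.

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1) // single P: only one goroutine can run at a time

	go func() {
		for {
			// tight loop with no function calls; sysmon/retake still
			// preempts it roughly every 10ms (forcePreemptNS)
		}
	}()

	start := time.Now()
	for i := 0; i < 5; i++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Printf("main still gets scheduled after %v\n", time.Since(start).Round(time.Millisecond))
	}
}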
func startm(_p_ *p, spinning bool) {
	// Disable preemption.
	//
	// Every owned P must have an owner that will eventually stop it in the
	// event of a GC stop request. startm takes transient ownership of a P
	// (either from argument or pidleget below) and transfers ownership to
	// a started M, which will be responsible for performing the stop.
	//
	// Preemption must be disabled during this transient ownership,
	// otherwise the P this is running on may enter GC stop while still
	// holding the transient P, leaving that P in limbo and deadlocking the
	// STW.
	//
	// Callers passing a non-nil P must already be in non-preemptible
	// context, otherwise such preemption could occur on function entry to
	// startm. Callers passing a nil P may be preemptible, so we must
	// disable preemption before acquiring a P from pidleget below.
	mp := acquirem()
	lock(&sched.lock)
	if _p_ == nil { // get an idle P from the idle P list
		_p_ = pidleget()
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			releasem(mp)
			return
		}
	}
	nmp := mget() // get an idle M from the idle M list
	// if there is no idle M, create one
	if nmp == nil {
		// No M is available, we must drop sched.lock and call newm.
		// However, we already own a P to assign to the M.
		//
		// Once sched.lock is released, another G (e.g., in a syscall),
		// could find no idle P while checkdead finds a runnable G but
		// no running M's because this new M hasn't started yet, thus
		// throwing in an apparent deadlock.
		//
		// Avoid this situation by pre-allocating the ID for the new M,
		// thus marking it as 'running' before we drop sched.lock. This
		// new M will eventually run the scheduler to execute any
		// queued G's.
		id := mReserveID()
		unlock(&sched.lock)

		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		// create a new M instance (which owns a g0), then call newosproc to start an OS thread for it
		newm(fn, _p_, id)
		// Ownership transfer of _p_ committed by start in newm.
		// Preemption is now safe.
		releasem(mp)
		return
	}
	unlock(&sched.lock)
	if nmp.spinning {
		throw("startm: m is spinning")
	}
	if nmp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	nmp.spinning = spinning
	nmp.nextp.set(_p_)
	notewakeup(&nmp.park)
	// Ownership transfer of _p_ committed by wakeup. Preemption is now
	// safe.
	releasem(mp)
}
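startm either wakes an idle M or, via newm, creates a fresh one backed by a new OS thread. One way to observe extra M's being created is the threadcreate profile: when many goroutines block in real syscalls, retake hands their P's off and startm/newm bring up more threads, so the count typically grows past GOMAXPROCS. The sketch below is Linux-only and uses syscall.Nanosleep merely as a convenient blocking syscall; it is an illustration under those assumptions, not runtime code.

//go:build linux

package main

import (
	"fmt"
	"runtime/pprof"
	"sync"
	"syscall"
	"time"
)

func main() {
	threads := pprof.Lookup("threadcreate")
	fmt.Println("threads before:", threads.Count())

	var wg sync.WaitGroup
	for i := 0; i < 32; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// A real blocking syscall: the M is stuck here, so sysmon's
			// retake() hands the P off and startm/newm bring up more threads.
			ts := syscall.NsecToTimespec(int64(200 * time.Millisecond))
			syscall.Nanosleep(&ts, nil)
		}()
	}
	wg.Wait()

	fmt.Println("threads after: ", threads.Count())
}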