上海网站建设公司网站建设,邢台信息港人力资源,网站建设图片手机,软件开发工具的作用GMP模型 channel1. 前言2. GMP模型2.1. 基本概念2.2. 调度器策略2.3. go指令的调度流程2.4. go启动周期的M0和G02.5. GMP可视化2.6. GMP的几种调度场景3. channel3.1. channel的基本使用3.2. 同步器1. 前言
Go中的并发是函数相互独立运行的体现#xff0c;Gorouti…
GMP模型 channel1. 前言2. GMP模型2.1. 基本概念2.2. 调度器策略2.3. go指令的调度流程2.4. go启动周期的M0和G02.5. GMP可视化2.6. GMP的几种调度场景3. channel3.1. channel的基本使用3.2. 同步器1. 前言
Go中的并发是函数相互独立运行的体现Goroutines是并发运行的函数。
并发多线程程序在单核CPU上执行每个线程使用时间片轮转执行间隔是ms级别的并行 多线程程序在多核CPU上执行每个cpu上都执行一个线程同一时刻有多个线程执行go主线程是一个物理线程(内核态)可以发起多个协程goroutine协程是一个轻量级线程(逻辑态)goroutine的特点有独立的栈空间共享程序堆空间调度由用户控制创建一个协程go 任务函数
2. GMP模型
2.1. 基本概念
M代表内核线程记录内核线程栈信息当goroutine调度到线程时使用该goroutine自己的栈信息
P调度器processor负责调度goroutine维护一个本地goroutine队列主线程从调度器上获得goroutine并执行同时还负责部分内存的管理。
G表示goroutine每个goroutine都有自己的栈空间定时器初始化的栈空间在2k左右空间会随着需求增长
M代表一个工作线程在M上有一个P和GP是绑定到M上的G是通过P的调度获取的在某一时刻一个M上只有一个Gg0除外在P上拥有一个G队列里面是已经就绪的G是可以被调度到线程栈上执行的协程称为运行队列
全局队列存放等待运行的G
P的本地队列优先将新创建的G存入到P的本地队列如果本地队列已满则存入到全局队列
P列表程序启动时创建P的最大个数GOMAXPROCS
M列表当前OS分配到go程序的内核线程数
2.2. 调度器策略
复用线程 work stealing机制当本线程无可运行的G时尝试从其他线程绑定的调度器中偷取协程而不是销毁线程hand off机制当本地线程由于G发生阻塞时线程释放绑定的P将P转给其他空闲的M线程来执行 并行 GOMAXPROCS设置P的数量最多有GOMAXPROCS个线程分布在多个CPU上同时运行抢占 go中的协程最多只能占用CPU 10ms防止其他协程处于饥饿状态全局G队列 当M线程执行work stealing机制从其他调度器P中获取不到G时可以从全局协程队列中获取
2.3. go指令的调度流程
通过 go func() 创建一个goroutine有两个存储G的队列一个是局部调度器P的本地协程队列一个是全局协程队列。新创建的G保存在P的本地队列中如果P的本地队列已满则保存在全局队列中G只能运行在M中一个M必须持有一个P。M从本地协程队列中选取一个可执行的G来执行如果本地队列为空则采用stealing机制从其他协程队列中获取一个G来执行一个M调度G的执行过程其实是一个循环的过程 当M执行某一个G时发生syscall系统调用或者其他阻塞操作M发生阻塞。如果当前一些G正在执行runtime会将这个线程M从P中移除然后创建一个新的内核线程M来执行G如果有空闲的内核线程可进行复用当M系统调用结束时此时G会尝试获取一个空闲的P执行放入到P的本地队列中。如果获取不到调度器P那么线程M则进行休眠加入到空闲线程队列中协程G则放入到全局协程队列中
2.4. go启动周期的M0和G0
M0
启动程序后编号为0的主线程在全局变量runtime.m0中不需要在heap上分配负责执行初始化操作和启动第一个协程G启动第一个协程之后M0与其他主线程相同
G0
每次启动一个M第一个创建的goroutine即为G0存放在全局空间G0仅仅用于调度不指向任何可执行的函数每个M都会绑定一个属于自己的G0在syscall时会将M切换到G0再调度
2.5. GMP可视化
创建trace文件启动trace执行业务代码停止tracego run运行后得到trace.out文件通过go tool trace trace文件名 可视化查看GMP
func main() {f, err : os.Create(trace.out)if err ! nil {panic(err)}defer f.Close()err trace.Start(f)if err ! nil {panic(err)}fmt.Println(业务逻辑...)fmt.Printf(GMP Model Trace test)trace.Stop()
}2.6. GMP的几种调度场景 G1创建G2P拥有G1M获取P开始运行G1G1使用go func() 创建了G2为了局部性G2优先加入到G1所在的本地协程队列 G1执行完毕当G1调用goexit()退出时M切换到所绑定的协程为G0由G0负责调度本地协程队列中的G2交给M执行 G2执行过程中开辟过多的协程如果G2运行时需要创建6个协程本地队列只能存放四个G3-G6在创建G7时需要将本地协程队列的前两个协程与G7协程同时放入到全局协程队列中此时本地协程队列还有一半空间可以直接创建G8协程 唤醒休眠队列的线程在创建G时运行的G会尝试唤醒其他空闲的调度器与内核线程组合进行绑定。假定G2唤醒了线程M2M2绑定了P2并且运行了G0但P2的本地协程队列没有协程空队列此时M2为自旋线程 自旋线程M2从全局协程队列中批量获取n个G。其中GQ为全局协程队列的sizeGOMAXPROCS为当前调度器个数。可以看作是从全局协程队列到本地协程队列的一种负载均衡策略 n min(len(GQ)/GOMAXPROCS1, len(GQ/2)) 如果全局协程队列为空自旋线程M2会执行work stealing机制从其他调度器P的本地队列中获取一半协程G到M2的本地队列 自旋线程个数 执行线程个数 ≤ GOMAXPROCS
3. channel
3.1. channel的基本使用
为什么需要channel?
1.主线程在等待所有goroutine全部完成的时间很难确定
2.通过全局变量加锁同步实现通讯不利于多个协程对全局变量的读写操作
channel特点
通道用于在goroutines之间共享数据保证同步交换。channel需要指定数据类型数据在channel上传递任何时刻只有一个goroutine可以访问数据项保证线程同步。channel底层是一个队列线程安全的多个协程并发访问时不需要加锁channel是有类型的一个string的channel只能存放string类型
channel声明和使用
var intChan chan int
chanMap : make(chan map[string]string, 10)
var chan1 chan Person
var chan2 chan *PersonintChan - 10 //写入数据到channel
num : - intChan // 读取channel的数据channel是引用类型必须**初始化(make)**才能使用。channel不能进行扩容在没有使用协程的情况下如果channel数据已取完再取则直接报错 dead lock error
channel的接收特性
读和写操作对元素值的处理必须是原子性的对于同一个channel同时写和读即使写入速度快于读取速度依旧不会造成阻塞dead lock
channel关闭和遍历
使用内置函数close关闭channel当channel关闭后不能再写数据只能读取for-range遍历时如果出现channel没有关闭则出现dead lock
只写channel和只读channel
channel可以声明为只读或者只写默认是可读可写的
var intchan chan - int // 只写channel
var intchan -chan int // 只读channel3.2. 同步器
WaitGroup实现同步
由于主线程一旦执行完毕无论goroutines是否执行完整个程序都会结束。因此需要一种同步机制来协调主线程和协程之间的执行顺序
WaitGroup类似于JUC中的CountDownLatch
WaitGroup.Done() 表示已经完成了一个任务等价于WaitGroup.Add(-1)WaitGroup.Add(1) 表示增加一个任务到协程队列计数器1主线程中使用WaitGroup.Wait()运行到这步会发生阻塞直到WaitGroup中的计数器为0时才能继续向下执行
var wg sync.WaitGroupfunc main(){for i : 0; i 10; i {go show(i)wg.Add(1)}wg.Wait() // 等价于countDownLatch.await();fmt.Println([main] continue...)
}func task(i int) {// defer wg.Add(-1)defer wg.Done() // 等价于countDownLatch.countDown();fmt.Printf([goroutine] 当前i%d\n, i)
}runtime包
runtime.Gosched() 让出CPU时间片重新等待安排任务
func printMsg(msg string) {for i : 0; i 5; i {fmt.Printf([goroutine] msg: %v\n, msg)}
}func main(){go printMsg(java is the best!)// go printMsg(spring cloud is all you need!)for i : 0; i 5; i {runtime.Gosched()fmt.Println([main] golang concurrent...)}fmt.Println([main] continue...)
}每次主线程运行到runtime.Gosched()时将CPU时间片交出去因此go printMsg任务会先执行打印5次之后主线程再打印5次。
runtime.Goexit() 退出当前协程runtime.GOMAXPROCS 默认使用本机的最大CPU核数sync.Mutex 互斥锁
var (FactorialMap make(map[int]uint64, 16)// 声明一个全局互斥锁lock sync.Mutex
)func main(){// 向map中写入数据for i : 1; i 20; i {go factorial(i)}// 防止主线程执行完毕goroutine还没完成任务time.Sleep(time.Second * 3)// 防止主线程和协程对临界资源的读写并发lock.Lock()for i, v : range FactorialMap {fmt.Printf(map[%d]%d\n, i, v)}lock.Unlock()
}func factorial(n int) {var res uint64res 1for i : 1; i n; i {res * uint64(i)}// 存在并发写问题 - concurrent map writes// 需要加入互斥锁lock.Lock()FactorialMap[n] reslock.Unlock()
}select和switch select用于处理异步IO操作select可以监听case语句中channel的读写操作当case中channel读写操作为非阻塞状态时可读可写触发相应的动作。解决从管道读取数据的阻塞问题在遍历channel时如果不关闭则会发生阻塞导致deadlock select中的case语句必须是一个channel操作default语句总是可执行的 如果有多个case都可运行select可随机选出一个执行 如果没有case可以执行那么执行default逻辑 如果没有case可以执行且没有default语句select将会阻塞直到某个case可以执行
var (intChan make(chan int)strChan make(chan string)
)
func main(){go func() {intChan - 100strChan - golangdefer close(intChan)defer close(strChan)}()for {select {case r : -intChan:fmt.Printf([int chan] r: %v\n, r)case r : -strChan:fmt.Printf([string chan] r: %v\n, r)default:fmt.Println(no channel can be read!)}}fmt.Println([main] continue...)
}Timer
定时器用于实现一些定时操作内部也是通过channel实现的
func main(){timer1 : time.NewTimer(time.Second * 2)t1 : time.Now()fmt.Printf(time1: %v\n, t1)// timer1.C阻塞直至2s结束继续执行t2 : -timer1.Cfmt.Printf(time2: %v\n, t2)timer2 : time.NewTimer(time.Second * 2)-timer2.Cfmt.Println(2s 后...)fmt.Printf(time3: %v\n, time.Now())timer3 : time.NewTimer(time.Second)go func() {-timer3.Cfmt.Println(timer3 blocked!)}()// 定时器停止上面匿名函数中的-timer3.C就不会阻塞了stop : timer3.Stop()if stop {fmt.Println(timer3 stopped!)}
}Ticker
Timer只会执行一次Ticker可以周期性的执行
func main(){ticker : time.NewTicker(time.Second)var sum intintChan : make(chan int)// 每隔1s向intChan中写入一个数select语句从三个case分支随机选择一个执行// 主线程一直在读取直到sum10读取结束go func() {for _ range ticker.C {select {case intChan - 1:fmt.Println(int channel写入1)case intChan - 2:fmt.Println(int channel写入2)case intChan - 3:fmt.Println(int channel写入3)}}}()for v : range intChan {fmt.Println(从int channel中读取到: , v)sum vif sum 10 {break}}
}原子变量
类似于JUC中的AtomicInteger原子整型等使用CAS机制进行同步。常见原子操作有
加减 atomic.AddInt32(num, 1)read atomic.LoadInt32(num)CAS atomic.CompareAndSwapInt32(num, 100, 200) 如果num100修改为200否则此次CAS失败交换 atomic.SwapInt32(num, 200)write atomic.StoreInt32(num, 200)
var num int32func AtomicTest() {for i : 0; i 100; i {go add()go sub()}fmt.Printf(num: %v\n, num)
}func add() {atomic.AddInt32(num, 1)fmt.Printf([add method] num: %v\n, num)
}func sub() {atomic.AddInt32(num, -1)fmt.Printf([sub method] num: %v\n, num)
}