协程
原创大约 11 分钟
goroutine
package main
import (
"fmt"
"time"
)
// go并发编程较为简单,可以比较容易地写出高并发应用
// go没有线程,只有协程,goroutine就是go中的协程
func asyncPrint() {
fmt.Println("这是协程打印的结果")
}
// main()函数本身就是一个主协程
func main() {
// 主死从随
// 非常简单地即可调用协程:只需要 go [函数名称] 就完成了多线程调用
//go asyncPrint()
// 使用匿名函数启动goroutine的方式
//go func() {
// fmt.Println("这是协程打印的结果")
//}()
// 启动多个匿名goroutine
// 1. 闭包
// 2. for循环的问题
for i := 0; i < 100; i++ {
// 这样直接打印结果,会出现很多重复的i值,这是因为for循环时,i变量会被重用,也因为协程运行速度太快,i值还来不及改变
//go func() {
// fmt.Printf("这是第 %d 个协程打印的结果\r\n", i)
//}()
// 第一种解决办法,重新声明一个变量temp,复制i的值
//temp := i
//go func() {
// fmt.Printf("这是第 %d 个协程打印的结果\r\n", temp)
//}()
// 第二种解决办法:利用值传递,更优雅
go func(i int) {
fmt.Printf("这是第 %d 个协程打印的结果\r\n", i)
}(i)
}
// 协程的启动和结束都是由系统自动管理,不需要我们手动去管理
defer fmt.Println("main goroutine end")
fmt.Println("main goroutine")
// 为了让main()主协程存活更久一些,让 go asyncPrint() 能够来得及打印出结果,需要等待一段时间
time.Sleep(1 * time.Second)
}
gmp调度原理

Java协程的M:N调度参考的应该就是gmp机制。
各个
goroutine
之间有排队机制:如果哪个goroutine
出现了死循环现象,则会被挂起,让后续goroutine
继续执行。如果某个
goroutine
出现耗时的底层线程操作,例如g2
,那么调度器会把g2
和某个操作系统线程单独绑定
,让后它自己再换绑到其他的操作系统线程上。每个处理器只能
绑定
一个操作系统线程,但可以调度多个goroutine
。处理器对于
goroutine
的调度,以及各个处理器之间的调度协调是十分复杂的。
waitgroup:你先走,我再走
package main
import (
"fmt"
"sync"
)
// waitgroup 让协程执行完成后再执行主函数
func main() {
var waitGroup sync.WaitGroup
// 监控多少个goroutine执行结束,如果事先知道有多少个协程,就可以放在开头
//waitGroup.Add(100)
for i := 0; i < 100; i++ {
// 如果事先不知道有多少个协程,可以放在循环里面,每循环一次增加一个waitGroup.Add(1)
waitGroup.Add(1)
go func(i int) {
// 这个方法和waitGroup.Add(x)成对出现,可以加上defer
defer waitGroup.Done()
fmt.Println(i)
}(i)
}
// 等待所有协程执行结束后才结束主协程
waitGroup.Wait()
fmt.Println("all done")
}
mutex锁和原子包atomic
package main
import (
"fmt"
"sync"
"sync/atomic"
)
/*
解决资源竞争
*/
var total int
var total2 int32
var waitGroup sync.WaitGroup
// 为了防止资源竞争,最简单的方式就是加锁
// 之所以声明为全局变量,是因为要用同一把锁,否则没效果
var lock sync.Mutex
func add() {
defer waitGroup.Done()
for i := 0; i < 100000; i++ {
lock.Lock()
total += 1
lock.Unlock()
}
}
func sub() {
defer waitGroup.Done()
for i := 0; i < 100000; i++ {
lock.Lock()
total -= 1
lock.Unlock()
}
}
// 使用原子包
func add2() {
defer waitGroup.Done()
for i := 0; i < 100000; i++ {
atomic.AddInt32(&total2, 1)
}
}
// 使用原子包
func sub2() {
defer waitGroup.Done()
for i := 0; i < 100000; i++ {
atomic.AddInt32(&total2, -1)
}
}
func main() {
waitGroup.Add(2)
//// 先执行加法
//go add()
//// 再执行减法
//go sub()
// 这一次换用原子包执行操作
go add2()
go sub2()
waitGroup.Wait()
fmt.Println("total =", total)
fmt.Println("all done")
}
RWMutex读写锁
package main
import (
"fmt"
"sync"
"time"
)
var rwlock sync.RWMutex
var number = 1
var waitGroup sync.WaitGroup
// 读写锁
func main() {
waitGroup.Add(2)
// 读数据
go func() {
defer waitGroup.Done()
for {
// 读锁是共享锁:不会阻塞其他的读写锁,直到释放
rwlock.RLock()
time.Sleep(300 * time.Millisecond)
fmt.Println("获得读锁")
rwlock.RUnlock()
}
}()
// 写数据
go func() {
time.Sleep(1 * time.Second)
defer waitGroup.Done()
rwlock.Lock()
// 写锁是独占锁:会阻塞其他的读写锁,直到释放
defer rwlock.Unlock()
number++
fmt.Println("获得写锁 --->")
time.Sleep(3 * time.Second)
fmt.Println("number:", number)
}()
waitGroup.Wait()
}
channel
go语言的设计理念
不要通过共享内存来通信,而要通过通信来实现内存共享
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
var waitGroup sync.WaitGroup
var waitGroup2 sync.WaitGroup
// 生产者,只能往里写数据
func producer(out chan<- string) {
defer waitGroup.Done()
for i := 0; i < 100; i++ {
out <- strconv.Itoa(i)
}
close(out)
}
// 消费者,只能从里读数据
func consumer(in <-chan string) {
defer waitGroup.Done()
//for data := range c {
// fmt.Println(data)
//}
for {
data, ok := <-in
if !ok {
break
}
fmt.Println(data)
}
}
// goroutine 之间通讯采用的是channel
func main() {
/*
go提供语法糖,让使用channel更简单
*/
// channel的缓存空间大小如果为0,那么放值进去会被阻塞
msg := make(chan string, 1)
// 下面两种方式都可以
// 1. 通过匿名函数
//go func() {
// msg <- "hello world"
//}()
// 2. 直接放值
msg <- "hello world"
// 将值取出并打印,不经过中间变量
//fmt.Println(<-msg)
// 下面的方法等同于上面的代码 fmt.Println(<-msg)
data := <-msg
fmt.Println(data)
/*
go有happens-before的机制,这一点和java线程异曲同工
"happens-before"指的是一种保证,确保某些内存操作在其他内存操作之前完成
通常,编译器和处理器可以自由地重新排序内存操作以提高性能,但这可能导致并发程序中的竞争条件
"happens-before"确保了在特定同步事件之间的内存操作顺序不会被打乱
go语言中"happens-before"关系包括:
1. 初始化:在 goroutine 的创建之前,对变量的所有初始化操作都保证在该 goroutine 内的其他操作之前完成
2. channel通道操作:对通道的发送操作在接收操作之前完成
换句话说,当一个值从通道接收时,发送该值的操作一定在接收操作之前完成
3. 互斥锁:获取互斥锁的操作在互斥锁被释放之前完成
这意味着,在持有互斥锁时对共享资源进行的操作,在互斥锁被释放之前都是可见的
4. sync.WaitGroup:对 sync.WaitGroup 的 Add 和 Done 操作在 Wait 操作之前完成
这保证了在 WaitGroup 的计数器达到零之前,所有相关的 goroutine 都已经完成各自的工作
*/
// 下面的代码运行情况符合“happens-before”的第1条
var a int = 1
var b int = 2
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
a = 2
b = 3
fmt.Println(a, b)
}()
fmt.Println(a, b)
fmt.Println("main goroutine end")
waitGroup.Wait()
// 有缓冲channel和无缓冲channel的应用场景
// 无缓冲channel适用于短期通知,例如,C要第一时间知道P是否已经完成
// 有缓冲channel适用于消费者和生产者之间的通信,例如,C要等待P生产数据,然后才能消费
// go中channel的应用场景:
// 1. 消息传递、过滤
// 2. 信号广播
// 3. 事件订阅、广播
// 4. 任务分发
// 5. 结果汇总
// 6. 并发控制
// 7. 同步和异步
// 对channel进行遍历
msg2 := make(chan int, 2)
go func(msg2 chan int) {
//data := <-msg2
//fmt.Println(data)
for data := range msg2 {
fmt.Println(data)
}
// 如果不主动关闭channel,这里永远不会被执行
fmt.Println("all done")
}(msg2)
msg2 <- 1
msg2 <- 2
// 这里主动关闭channel。已经关闭的channel是不可写的,但是可以读,只不过会被阻塞
close(msg2)
data2 := <-msg2
fmt.Println(data2)
time.Sleep(1 * time.Second)
// 单向channel
// 默认情况下,channel是双向的,既可以发也可以收
// 但是,将channel作为参数传递时,希望它只能发或只能收,可以给channel指定方向
write := make(chan<- int, 1) // 定义一个单向channel,只能写入,不能读取
//read1 := make(<-chan int, 1) // 定义一个单向channel,只能读取,不能写入
write <- 1
//<-read1
// 将双向channel转换为单向channel
channel := make(chan int, 2)
var send chan<- int = channel // 将双向channel变成只能写入
//var read2 <-chan int = channel // 将双向channel变成只能读取
send <- 1
//<-read2
// 调用生产者与消费者
channel2 := make(chan string, 1)
waitGroup.Add(2)
go consumer(channel2)
go producer(channel2)
waitGroup.Wait()
// 通过channel实现交叉打印
// 两个goroutine交替打印序列,一个打印数字,一个打印字母。先打印数字,最终效果为:
// 1A2B3C4D5E6F7G8H9I10J11K12L13M14N15O16P17Q18R19S20T21U22V23W24X25Y26Z
number, letter := make(chan bool, 1), make(chan bool, 1)
waitGroup2.Add(2)
// 先往number里放一个值,表示可以开始打印数字了
number <- true
go func() {
defer waitGroup2.Done()
for i := 1; i <= 26; i++ {
// 可以开始打印了
<-number
fmt.Printf("%d", i)
// 往letter里放值,表示可以开始打印字母了
letter <- true
}
}()
go func() {
defer waitGroup2.Done()
for i := 'A'; i <= 'Z'; i++ {
// 等待从letter中取值
<-letter
fmt.Printf("%c", i)
// 再往number里放一个值,表示可以接着打印数字了
number <- true
}
}()
waitGroup2.Wait()
}
select
select的作用
select
类似于switch
,但它的功能和Linux中的select
、poll
、epoll
类似,用于监控多个channel
。
package main
import (
"fmt"
"sync"
"time"
)
// 全局标识位
var flag bool = false
var lock sync.Mutex
// 监控goroutine的执行
// 使用全局变量来监控goroutine的执行
func g1() {
time.Sleep(1 * time.Second)
lock.Lock()
defer lock.Unlock()
flag = true
}
func g2() {
time.Sleep(2 * time.Second)
lock.Lock()
defer lock.Unlock()
flag = true
}
// 使用channel来监控goroutine的执行
func g3(channel chan struct{}) {
// 只需要往里放一个空的结构体就行了
channel <- struct{}{}
}
func g4(channel chan struct{}) {
channel <- struct{}{}
}
func main() {
// 现在有多个goroutine正在执行,但当某一个执行完成后,希望知道到底是哪一个执行完成
// 方式一:使用全局变量,但是无法确切地知道到底是哪一个goroutine已经完成
//go g1()
//go g2()
//for {
// if flag {
// fmt.Println("g1 or g2 is done")
// time.Sleep(10 * time.Millisecond)
// return
// }
//}
// 方式二:使用channel,但是需要自己写一个监控的goroutine
// channel是线程安全的
var channel1 = make(chan struct{}, 1)
var channel2 = make(chan struct{}, 1)
//channel1 <- struct{}{}
//channel2 <- struct{}{}
go g3(channel1)
go g4(channel2)
// 如果g3和g4都完成了,那么select就是随机的,这样做的目的是防止饥饿
for {
// 为了防止g3或g4阻塞,增加了default分支
// 但如果每次一进来就跳到default分支,那么g3和g4就没有执行的机会
// 所以,增加了for循环,并且在default分支执行前增加了 time.Sleep(1 * time.Second)
// 但这样一来,又会不停地打印 "g3 and g4 are done"
// 更优雅地方式是使用 time.NewTimer(1 * time.Second),它会返回一个time的channel
timeChannel := time.NewTimer(1 * time.Second)
select {
case <-channel1:
fmt.Println("g3 is done")
case <-channel2:
fmt.Println("g4 is done")
case <-timeChannel.C:
fmt.Println("timeout")
return
//default: // 防止g3或g4阻塞
// time.Sleep(1 * time.Second)
// fmt.Println("g3 and g4 are done")
}
}
}
context
一个特殊的接口
它可以实现跨API
和进程进行传值,比channel
更重量级
package main
import (
"context"
"fmt"
"sync"
"time"
)
var waitGroup sync.WaitGroup
// 有一个goroutine监控cpu的信息
// 方式1:现在需要能够主动退出监控,可以使用共享变量
var isExit bool
// 方式2:不使用变量,使用全局channel
var channel = make(chan struct{}, 1)
func cpu() {
defer waitGroup.Done()
for {
// 1. 第一种方式
//if isExit {
// fmt.Println("退出监控")
// break
//}
// 2. 第二种方式
//tc := time.NewTimer(3 * time.Second)
select {
case <-channel:
fmt.Println("退出监控")
return
default:
time.Sleep(1 * time.Second)
// 监控cpu信息
fmt.Println("获得cpu的信息")
}
}
}
// 方式2.5:使用channel参数,这种方式比 2 更优雅
func cpuWithChannel(channel chan struct{}) {
defer waitGroup.Done()
for {
select {
case <-channel:
fmt.Println("退出监控")
return
default:
time.Sleep(1 * time.Second)
// 监控cpu信息
fmt.Println("获得cpu的信息")
}
}
}
// 现在需要在main中向子goroutine传递一些消息,例如超时、退出,甚至一些数据
// 方式3:除了channel,go提供了一种比方式2.5更优雅的选择:context
// context包提供了几种函数:
// 1. context.WithCancel(context.Background()):创建一个context,并返回一个cancel函数
// 2. context.WithTimeout(context.Background(), timeout):创建一个context,并返回一个timeout函数
// 3. context.WithDeadline(context.Background(), deadline):创建一个context,并返回一个deadline函数
// 4. context.WithValue(context.Background(), key, value):创建一个context,并返回键值对
func cpuWithContext(ctx context.Context) {
// 在这里拿到WithValue传过来的数据
fmt.Println("orderid =", ctx.Value("orderid"))
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println("退出监控")
return
default:
time.Sleep(1 * time.Second)
// 监控cpu信息
fmt.Println("获得cpu的信息")
}
}
}
func main() {
// 使用goroutine + waitGroup实现监控cpu信息
// 方式1、方式2和方式2.5
waitGroup.Add(1)
// 方式1和方式2
// go cpu()
// 方式2.5:改进
//var channel2 = make(chan struct{}, 1)
//go cpuWithChannel(channel2)
// 方式1:使用全局变量退出监控
//time.Sleep(3 * time.Second)
//isExit = true
// 方式2:使用全局channel退出监控
//channel <- struct{}{}
// 方式2.5:使用channel参数退出监控
//channel2 <- struct{}{}
// 方式3 begin:使用context退出监控
// 如果希望函数被控制,但不希望影响原接口信息,那么在函数参数列表中尽量中要加上context,而且要放在第一个
// 即使不用它,对函数也没有任何影响,这是一个习惯
// 因为context是一个树型结构,因此可以继续创建新的context
//ctx1, cancel1 := context.WithCancel(context.Background())
//ctx2, _ := context.WithCancel(ctx1)
//// 这里调用ctx2(也就是子context)同样可以实现退出监控,说明它的行为具有传递性
//go cpuWithContext(ctx2)
//time.Sleep(3 * time.Second)
//cancel1()
//// 方式3 end
///////////////////////////// WithValue 和 WithTimeout 的应用场景 /////////////////////////////
// WithTimeout 主动超时
//ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
// WithDeadline是在具体时间点超时
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second))
// WithValue 主动传值
valueCtx := context.WithValue(ctx, "orderid", "2088io878jsadasd878adas")
go cpuWithContext(valueCtx)
waitGroup.Wait()
fmt.Println("监控完成")
}
感谢支持
更多内容,请移步《超级个体》。