go的goroutine#
go中最重要的概念之一就是协程,协程的创建方式也很简单,go func()
就简单的创建了一个协程,虽然go
语言中并没有规定协程的
创建数据限制,每个协程相对于线程的成本也很低,协程最小占用只有2Kb
, 1000
个协程才2M
,所以成本极低。
但创建容易的同时也应该注意以下几点:
对于panic捕获的安全性#
我们一般使用go func()
直接创建协程了,但一般在代码中,很少直接这么使用,一般会在外面再封装一层。具体的封装方式类似这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package thread
import (
"fmt"
"runtime/debug"
)
func GoSafe(fn func()) {
go RunSafe(fn)
}
func RunSafe(fn func()) {
// 可以將panic上报给sentry等报警平台,或者钉钉,飞书机器人的钩子等
defer func() {
if p := recover(); p != nil {
fmt.Printf("RunSafe capture crash,msg: %s \n %s ", fmt.Sprint(p), string(debug.Stack()))
}
}()
fn()
}
|
原因很简单,go
语言外层的defer
是捕获不了内部goroutine
的异常的,所以如果内部的goroutine
发生panic
,那会导致整个程序直接crash
。
因此,使用goroutine
的话最好再加一层安全封装,以防止潜在的panic
导致的crash
。
goroutine的编排#
go语言内部,有goroutine的编排库,比如用的比较多的waitgroup。
waitgroup相关的编排代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package waitgroup
import (
"fmt"
"sync"
)
func printA(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("A func")
}
func printB(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("B func")
}
func Wg() {
wg := &sync.WaitGroup{}
wg.Add(2)
go printA(wg)
go printB(wg)
wg.Wait()
}
|
但实际项目开发中,会更多的使用其一些变种库。比如errGroup.
errGroup相关的编排代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
package errgroup
import (
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
func PrintA() {
fmt.Println("A func")
}
func PrintB() {
fmt.Println("B func")
}
func PrintC(i int) {
fmt.Println(i)
time.Sleep(5 * time.Second)
}
func Eg() error{
eg := errgroup.Group{}
eg.SetLimit(5)
eg.Go(func() error {
PrintA()
return nil
})
eg.Go(func() error {
PrintB()
return nil
})
for i:= 0; i < 50; i ++ {
temp := i
eg.Go(func() error {
PrintC(temp)
return nil
})
}
//eg.SetLimit(1)
if err := eg.Wait(); err != nil{
fmt.Println(err)
return err
}
return nil
}
|
errgroup在waitgroup上增加了以下几个功能。
- error 捕获,当遇到第1个出错的goroutine时,就直接暂停整组的执行。
- 增加流控限制,不再无限制的开启goroutine,主要还是集中在如果某个for循环,我们需要对某组数据并行执行,那瞬时就会生成很多的goroutine,因此需要流控,防止过多的goroutine造成的性能压力
- 可以不用自己控制add与done,add与done其实自己控制很容易出问题,数量不匹配都会导致panic或deadlock
协程池#
一般我们在处理goroutine采用errgroup编排就行,但如果并发更大的场景,也会用到协程池来控制, 协程池可以尽量复用协程,据压测数据显示,1000w的协程,只创建了70W的协程
具体了解:https://github.com/panjf2000/ants
singleflight#
SingleFlight的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个
goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。
常见的场景就是请求缓存,用于解决缓存穿透问题。redis做缓存时,我们可以加这么一层, 防止redis缓存失效时,直接穿透到后端mysql.
https://pkg.go.dev/golang.org/x/sync@v0.2.0/singleflight
一个小例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// RedisCache 内置防穿透策略,将fn结果缓存在redis中
func RedisCache(redisCli *redis.Client, ctx context.Context, key string, expire time.Duration, v interface{}, fn func() (interface{}, error)) error {
val, err := singleflight.Do(key, func() (interface{}, error) {
val, err := redisCli.Get(ctx, key)
if err != nil && err != redisPkg.ErrNil {
return nil, err
}
if len(val) != 0 {
return []byte(val), nil
}
resp, err := fn()
if err != nil {
return nil, err
}
//save cache
b, err := json.Marshal(resp)
if err != nil {
return nil, err
}
_, err = redisCli.Set(ctx, key, string(b), expire)
return b, err
})
if err != nil {
return err
}
return json.Unmarshal(val.([]byte), v)
}
|
go-zero以redis作缓存中加的一层:
https://github.com/zeromicro/go-zero/blob/master/core/stores/cache/cache.go
潜在坑#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
ch := g.DoChan(key, func() (interface{}, error) {
ret, err := find(context.Background(), key)
return ret, err
})
// Create our timeout
timeout := time.After(500 * time.Millisecond)
var ret singleflight.Result
select {
case <-timeout: // Timeout elapsed
fmt.Println("Timeout")
return
case ret = <-ch: // Received result from channel
fmt.Printf("index: %d, val: %v, shared: %v\n", j, ret.Val, ret.Shared)
}
|
1
2
3
4
5
6
7
8
9
|
v, _, shared := g.Do(key, func() (interface{}, error) {
go func() {
time.Sleep(10 * time.Millisecond)
fmt.Printf("Deleting key: %v\n", key)
g.Forget(key)
}()
ret, err := find(context.Background(), key)
return ret, err
})
|
context#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
// 直接使用parent cancelCtx
func f1(ctx context.Context) {
go func() {
select {
case <-ctx.Done():
fmt.Println("goroutine created by f1 exit")
}
}()
}
// 基于parent cancelCtx创建新的cancelCtx
func f2(ctx context.Context) {
ctx1, _ := context.WithCancel(ctx)
go func() {
select {
case <-ctx1.Done():
fmt.Println("goroutine created by f2 exit")
}
}()
}
// 使用基于parent cancelCtx创建的valueCtx
func f3(ctx context.Context) {
ctx1 := context.WithValue(ctx, "key3", "value3")
go func() {
select {
case <-ctx1.Done():
fmt.Println("goroutine created by f3 exit")
}
}()
}
// 基于parent cancelCtx创建的valueCtx之上创建cancelCtx
func f4(ctx context.Context) {
ctx1 := context.WithValue(ctx, "key4", "value4")
ctx2, _ := context.WithCancel(ctx1)
go func() {
select {
case <-ctx2.Done():
fmt.Println("goroutine created by f4 exit")
}
}()
}
func CtxCancel() {
valueCtx := context.WithValue(context.Background(), "key0", "value0")
cancelCtx, cancel := context.WithCancel(valueCtx)
f1(cancelCtx)
f2(cancelCtx)
f3(cancelCtx)
f4(cancelCtx)
time.Sleep(3 * time.Second)
fmt.Println("cancel all by main")
cancel()
time.Sleep(10 * time.Second) // wait for log output
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
func slowServer(w http.ResponseWriter, r *http.Request) {
time.Sleep(10 * time.Second)
w.Write([]byte("Hello world!"))
}
func call() error {
client := &http.Client{}
req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", nil)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(req.Context(), 1*time.Second)
defer cancel()
req = req.WithContext(ctx)
_, err = client.Do(req)
return err
}
func CtxTimeout() {
// run slow server
go func() {
http.HandleFunc("/", slowServer)
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}()
time.Sleep(1 * time.Second) // wait for server to run
// call server
err := call()
if errors.Is(err, context.DeadlineExceeded) {
log.Println("ContextDeadlineExceeded: true")
}
if os.IsTimeout(err) {
log.Println("IsTimeoutError: true")
}
if err != nil {
log.Println(err)
}
}
|
代码风格#
uber的一个代码风格文档,我觉得是目前看到的比较好的一个规范总结了,可以借鉴
https://github.com/xxjwxc/uber_go_guide_cn
微服务框架推荐#
go-zero :https://github.com/zeromicro/go-zero
kratos: https://github.com/go-kratos/kratos