go的goroutine

go中最重要的概念之一就是协程,协程的创建方式也很简单,go func()就简单的创建了一个协程,虽然go语言中并没有规定协程的 创建数据限制,每个协程相对于线程的成本也很低,协程最小占用只有2Kb1000个协程才2M,所以成本极低。

但创建容易的同时也应该注意以下几点:

对于panic捕获的安全性

我们一般使用go func()直接创建协程了,但一般在代码中,很少直接这么使用,一般会在外面再封装一层。具体的封装方式类似这样:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package thread

import (
	"fmt"
	"runtime/debug"
)

func GoSafe(fn func()) {
	go RunSafe(fn)
}

func RunSafe(fn func()) {
	// 可以將panic上报给sentry等报警平台,或者钉钉,飞书机器人的钩子等
	defer func() {
		if p := recover(); p != nil {
			fmt.Printf("RunSafe capture crash,msg: %s \n %s ", fmt.Sprint(p), string(debug.Stack()))
		}
	}()
	fn()
}

原因很简单,go语言外层的defer是捕获不了内部goroutine的异常的,所以如果内部的goroutine发生panic,那会导致整个程序直接crash。 因此,使用goroutine的话最好再加一层安全封装,以防止潜在的panic导致的crash

goroutine的编排

go语言内部,有goroutine的编排库,比如用的比较多的waitgroup。 waitgroup相关的编排代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package waitgroup

import (
	"fmt"
	"sync"
)

func printA(wg *sync.WaitGroup)  {
	defer wg.Done()
	fmt.Println("A func")
}

func printB(wg *sync.WaitGroup)  {
	defer wg.Done()
	fmt.Println("B func")
}

func Wg()  {
	wg := &sync.WaitGroup{}
	wg.Add(2)
	go printA(wg)
	go printB(wg)
	wg.Wait()
}

但实际项目开发中,会更多的使用其一些变种库。比如errGroup. errGroup相关的编排代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package errgroup

import (
	"fmt"
	"golang.org/x/sync/errgroup"
	"time"
)

func PrintA()  {
	fmt.Println("A func")
}

func PrintB()  {
	fmt.Println("B func")
}

func PrintC(i int)  {
	fmt.Println(i)
	time.Sleep(5 * time.Second)
}

func Eg()  error{
	eg := errgroup.Group{}
	eg.SetLimit(5)
	eg.Go(func() error {
		PrintA()
		return nil
	})
	eg.Go(func() error {
		PrintB()
		return nil
	})
	for i:= 0; i < 50; i ++ {
		temp := i
		eg.Go(func() error {
			PrintC(temp)
			return nil
		})
	}
	//eg.SetLimit(1)
	if err := eg.Wait(); err != nil{
		fmt.Println(err)
		return err
	}
	return nil
}

errgroup在waitgroup上增加了以下几个功能。

  1. error 捕获,当遇到第1个出错的goroutine时,就直接暂停整组的执行。
  2. 增加流控限制,不再无限制的开启goroutine,主要还是集中在如果某个for循环,我们需要对某组数据并行执行,那瞬时就会生成很多的goroutine,因此需要流控,防止过多的goroutine造成的性能压力
  3. 可以不用自己控制add与done,add与done其实自己控制很容易出问题,数量不匹配都会导致panic或deadlock

协程池

一般我们在处理goroutine采用errgroup编排就行,但如果并发更大的场景,也会用到协程池来控制, 协程池可以尽量复用协程,据压测数据显示,1000w的协程,只创建了70W的协程 具体了解:https://github.com/panjf2000/ants

singleflight

SingleFlight的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。

常见的场景就是请求缓存,用于解决缓存穿透问题。redis做缓存时,我们可以加这么一层, 防止redis缓存失效时,直接穿透到后端mysql. https://pkg.go.dev/golang.org/x/sync@v0.2.0/singleflight

一个小例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// RedisCache 内置防穿透策略,将fn结果缓存在redis中
func RedisCache(redisCli *redis.Client, ctx context.Context, key string, expire time.Duration, v interface{}, fn func() (interface{}, error)) error {
	val, err := singleflight.Do(key, func() (interface{}, error) {
		val, err := redisCli.Get(ctx, key)
		if err != nil && err != redisPkg.ErrNil {
			return nil, err
		}
		if len(val) != 0 {
			return []byte(val), nil
		}
		resp, err := fn()
		if err != nil {
			return nil, err
		}
		//save cache
		b, err := json.Marshal(resp)
		if err != nil {
			return nil, err
		}
		_, err = redisCli.Set(ctx, key, string(b), expire)
		return b, err
	})
	if err != nil {
		return err
	}
	return json.Unmarshal(val.([]byte), v)
}

go-zero以redis作缓存中加的一层: https://github.com/zeromicro/go-zero/blob/master/core/stores/cache/cache.go

潜在坑

  • 一个阻塞,全员等待,解决方案,超时控制
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
ch := g.DoChan(key, func() (interface{}, error) {
    ret, err := find(context.Background(), key)
    return ret, err
})
// Create our timeout
timeout := time.After(500 * time.Millisecond)

var ret singleflight.Result
select {
case <-timeout: // Timeout elapsed
        fmt.Println("Timeout")
    return
case ret = <-ch: // Received result from channel
    fmt.Printf("index: %d, val: %v, shared: %v\n", j, ret.Val, ret.Shared)
}
  • 一个出错,全部出错,解决方案,降低请求数量
1
2
3
4
5
6
7
8
9
 v, _, shared := g.Do(key, func() (interface{}, error) {
       go func() {
           time.Sleep(10 * time.Millisecond)
           fmt.Printf("Deleting key: %v\n", key)
           g.Forget(key)
       }()
       ret, err := find(context.Background(), key)
       return ret, err
   })

context

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 直接使用parent cancelCtx
func f1(ctx context.Context) {
	go func() {
		select {
		case <-ctx.Done():
			fmt.Println("goroutine created by f1 exit")
		}
	}()
}

// 基于parent cancelCtx创建新的cancelCtx
func f2(ctx context.Context) {
	ctx1, _ := context.WithCancel(ctx)
	go func() {
		select {
		case <-ctx1.Done():
			fmt.Println("goroutine created by f2 exit")
		}
	}()
}

// 使用基于parent cancelCtx创建的valueCtx
func f3(ctx context.Context) {
	ctx1 := context.WithValue(ctx, "key3", "value3")
	go func() {
		select {
		case <-ctx1.Done():
			fmt.Println("goroutine created by f3 exit")
		}
	}()
}

// 基于parent cancelCtx创建的valueCtx之上创建cancelCtx
func f4(ctx context.Context) {
	ctx1 := context.WithValue(ctx, "key4", "value4")
	ctx2, _ := context.WithCancel(ctx1)
	go func() {
		select {
		case <-ctx2.Done():
			fmt.Println("goroutine created by f4 exit")
		}
	}()
}

func CtxCancel() {
	valueCtx := context.WithValue(context.Background(), "key0", "value0")
	cancelCtx, cancel := context.WithCancel(valueCtx)
	f1(cancelCtx)
	f2(cancelCtx)
	f3(cancelCtx)
	f4(cancelCtx)

	time.Sleep(3 * time.Second)
	fmt.Println("cancel all by main")
	cancel()
	time.Sleep(10 * time.Second) // wait for log output
}
  • 超时控制的方法调用
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func slowServer(w http.ResponseWriter, r *http.Request) {
	time.Sleep(10 * time.Second)
	w.Write([]byte("Hello world!"))
}

func call() error {
	client := &http.Client{}
	req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", nil)
	if err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(req.Context(), 1*time.Second)
	defer cancel()
	req = req.WithContext(ctx)
	_, err = client.Do(req)
	return err
}

func CtxTimeout() {
	// run slow server
	go func() {
		http.HandleFunc("/", slowServer)

		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	time.Sleep(1 * time.Second) // wait for server to run

	// call server
	err := call()
	if errors.Is(err, context.DeadlineExceeded) {
		log.Println("ContextDeadlineExceeded: true")
	}
	if os.IsTimeout(err) {
		log.Println("IsTimeoutError: true")
	}
	if err != nil {
		log.Println(err)
	}
}

代码风格

uber的一个代码风格文档,我觉得是目前看到的比较好的一个规范总结了,可以借鉴 https://github.com/xxjwxc/uber_go_guide_cn

微服务框架推荐

go-zero :https://github.com/zeromicro/go-zero kratos: https://github.com/go-kratos/kratos