1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
func MakeGrpcServerInterceptor() func(context.Context, interface{}, *grpc.UnaryServerInfo, grpc.UnaryHandler) (interface{}, error) {
sentinel.Logger.Info("sentinel breaker server is open")
ruleLoadMap := sync.Map{}
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
_scope := "server.grpc." + info.FullMethod
_, exists := ruleLoadMap.Load(_scope)
//如果该规则不存在,则说明是第一次请求,则需要加载规则,加载规则在并发时可以多次,反正是覆盖,不影响。
if !exists {
rules := getRuleByScope(strings.Split(_scope, "."))
circuitbreaker.LoadRulesOfResource(_scope, rules)
ruleLoadMap.Store(_scope, "1")
}
token, e := api.Entry(_scope, api.WithTrafficType(base.Inbound))
if e != nil {
sentinel.Logger.ContextErrorWithFields(ctx, logrus.Fields{
"err": "grpc接口熔断",
"rule": e.TriggeredRule(),
"_scope": _scope,
})
return nil, error2.SysErrSentinelBreaker
}
defer token.Exit()
res, err := handler(ctx, req)
if err != nil {
e := error2.ParseErrToNeoErr(err)
//只有系统错误才算 而且如果是外部资源熔断或者上游熔断返回的错误,是不触发下游节点的熔断的
if e.Code < error2.MaxSystemErr && e.Code != error2.ResourceErrSentinelBreaker.Code && e.Code != error2.SysErrSentinelBreaker.Code {
api.TraceError(token, e)
}
}
return res, err
}
}
|