江西中企动力做的网站,网站开发和软件开发哪个难,网站建设与技术团队,中国最顶尖的服装设计公司一、集群抽象#xff1a;cluster
它是指我们在调用远程的时候#xff0c;尝试解决#xff1a;
1、failover:即引入重试功能#xff0c;但是重试的时候会换一个新节点
2、failfast: 立刻失败#xff0c;不需要重试
3、广播#xff1a;将请求发送到所有的节点上
4、组…一、集群抽象cluster
它是指我们在调用远程的时候尝试解决
1、failover:即引入重试功能但是重试的时候会换一个新节点
2、failfast: 立刻失败不需要重试
3、广播将请求发送到所有的节点上
4、组播组播和分组功能不太一样组播是指请求发送到一组节点上而不是只发送到一个单一节点上
注意failover
二、 gRPC 的 Interceptor 分成好几种
UnaryClientInterceptor: 用于拦截 gRPC unary 请求
StreamClientIntercepror 用于拦截 gRPC 的 stream 请求。
三、gRPC 广播注册中心获取所有节点
思路
利用拦截器捕获调用
利用注册中心来获取所有的服务实例
在拦截器内遍历所有的服务端实例
四、gRPC 限流
利用服务端拦截器调用进行限流逻辑
五、gRPC 广播实现
gRPC 广播利用客户端拦截器实现步骤也非常简单以下几步
1、利用注册中心获取所有节点
2、利用filter 过滤节点
3、grpc.Dial 循环调用节点发起tcp 请求
注意reflect.TypeOf(reply).Elem() 以及 reflect.New(typ).Interface() 生成一个新的Reply 避免覆盖, filter 是nil 就是广播非nil 就是组播
对节点进行过滤
type MyIntercaptor struct {register registry.Registermethod stringfilter Filter
}
type SetOptions func(optins *MyIntercaptor)
func NewMyInterceptor(register registry.Register, method string, options ...SetOptions) *MyIntercaptor {t : MyIntercaptor{register: register,method: method,filter: func(g1 string, ctx context.Context) bool {return true},}for _, opt : range options {opt(t)}return t
}
func WithMyInterSetFilter(filter Filter) SetOptions {return func(option *MyIntercaptor) {option.filter filter}
}
func (m *MyIntercaptor) Intercaptor() grpc.UnaryClientInterceptor {return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {list, er : m.register.ListServices(ctx, m.method)if er ! nil {return er}// 是否是广播ok, resp : IsBroadCast(ctx)defer func() {close(resp)}()if !ok {return invoker(ctx, method, req, reply, cc, opts...)}var err errgroup.Groupfor _, li : range list {if !m.filter(li.Group, ctx) {continue}if li.Addr {continue}typ : reflect.TypeOf(reply).Elem()addr : li.Addr// 并发调用err.Go(func() error {dial, er : grpc.Dial(addr, grpc.WithInsecure())if er ! nil {resp - Resp{Err: er,}return nil}rep : reflect.New(typ).Interface()// 发送方法请求er invoker(ctx, method, req, rep, dial, opts...)resp - Resp{Err: er,Reply: rep,}return nil})
}return err.Wait()}
}
type Filter func(g1 string, ctx context.Context) bool
func NewFilter() Filter {return func(g1 string, ctx context.Context) bool {group, ok : ctx.Value(group).(string)return ok group g1}
}
type broadcastKey struct {
}
func UseBroadcastKey(ctx context.Context) (context.Context, chan Resp) {ch : make(chan Resp)return context.WithValue(ctx, broadcastKey{}, ch), ch
}
func IsBroadCast(ctx context.Context) (bool, chan Resp) {resp, ok : ctx.Value(broadcastKey{}).(chan Resp)return ok, resp
}
type Resp struct {Err errorReply any
}