go语言 grpc 拦截器

文章目录

    • 拦截器
      • 服务端拦截器
        • 一元拦截器
        • 流拦截器
      • 客户端拦截器
        • 一元拦截器
        • 流拦截`
      • 多个拦截器
  • 代码仓库

拦截器

gRPC拦截器(interceptor)是一种函数,它可以在gRPC调用之前和之后执行一些逻辑,例如认证、授权、日志记录、监控和统计等。拦截器函数是gRPC中非常重要的概念,它允许我们在服务端和客户端添加自定义逻辑,以满足业务需求和运维需求。

在gRPC中,拦截器函数通常通过实现grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口来定义。UnaryServerInterceptor用于拦截一元RPC请求,而StreamServerInterceptor用于拦截流式RPC请求。在客户端中,我们可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor来拦截gRPC调用。

在gRPC中,拦截器函数可以被链接起来,形成一个拦截器链。在这个拦截器链中,每个拦截器函数都可以处理请求并将其转发给下一个拦截器函数,或者直接返回响应。因此,我们可以在拦截器函数中编写不同的逻辑,例如实现认证、授权、监控和统计等。
以下是一些常见的gRPC拦截器:

  • 认证和授权拦截器:用于对gRPC调用进行身份验证和权限控制,例如检查token、验证用户名和密码、检查访问控制列表等;
  • 日志记录拦截器:用于记录gRPC调用的日志,例如记录请求的方法、参数、响应状态等;
  • 监控和统计拦截器:用于监控gRPC调用的性能和吞吐量,例如记录调用次数、响应时间、错误率等;
  • 缓存拦截器:用于在服务端或客户端缓存一些数据,例如缓存计算结果、缓存数据库查询结果等。

服务端拦截器

在这里插入图片描述

一元拦截器
package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 14:52:55 ======= [Server Interceptor]  /hello.Greeter/SayHello
2023/12/07 14:52:55  Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55  Post Proc Message : message:"Hello world"
流拦截器

流式拦截器需要对grpc.ServerStream进行包装,重新实现RecvMsg和SendMsg方法。

func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {log.Printf("Recved %v", req.GetName())// 具体返回多少个response根据业务逻辑调整for i := 0; i < 10; i++ {// 通过 send 方法不断推送数据err := stream.Send(&pb.HelloReply{})if err != nil {log.Fatalf("Send error:%v", err)return err}}return nil
}type wrappedStream struct {// 包装器流grpc.ServerStream
}
// 接受信息拦截器
func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))return w.ServerStream.RecvMsg(m)
}
// 发送消息拦截器
func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ServerStream.SendMsg(m)
}func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {return &wrappedStream{s}
}func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {// 前置处理log.Println("====== [Server Stream Interceptor] ", info.FullMethod)// 包装器流调用 流RPCerr := handler(srv, newWrappedStream(ss))if err != nil {log.Printf("RPC failed with error %v", err)}return err
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

结果

GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor]  /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 开始服务端rpc流测试
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

客户端拦截器

在这里插入图片描述

一元拦截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {// 前置处理逻辑log.Println("Method : " + method)// 调用invoker 执行远程方法err := invoker(ctx, method, req, reply, cc, opts...)// 后置处理逻辑log.Println(reply)return err
}func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加拦截器if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 设置超时时间为一秒defer cancel()// 调用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流拦截`
type wrappedStream struct {grpc.ClientStream
}func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.RecvMsg(m)
}func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.SendMsg(m)
}func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {return &wrappedStream{s}
}func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {// 前置处理逻辑log.Println("======= [Client Interceptor] ", method)// 调用streamer 来获取客户端流s, err := streamer(ctx, desc, cc, method, opts...)if err != nil {return nil, err}return newWrappedStream(s), nil
}func main(){// 注册拦截器到客户端流conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)defer cancel()// 调用客户端流RPC方法searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})for {searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}
}

结果

2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客户端流传输结束

多个拦截器

在grpc中默认的拦截器不可以传多个,因为在源码中,存在一些问题

func chainUnaryClientInterceptors(cc *ClientConn) {interceptors := cc.dopts.chainUnaryIntsif cc.dopts.unaryInt != nil {interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)}var chainedInt UnaryClientInterceptorif len(interceptors) == 0 {chainedInt = nil} else if len(interceptors) == 1 {chainedInt = interceptors[0]} else {chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)}}cc.dopts.unaryInt = chainedInt
}

当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。

如果真的需要多个拦截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 链式方法。核心方法如下

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {n := len(interceptors)if n > 1 {lastI := n - 1return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {var (chainHandler grpc.UnaryInvokercurI         int)chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {if curI == lastI {return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)}curI++err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)curI--return err}return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)}}...
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/213136.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

物联网+AI智慧工地云平台源码(SaaS模式)

智慧工地云平台充分运用数字化技术&#xff0c;聚焦施工现场岗位一线&#xff0c;依托物联网、互联网、AI等技术&#xff0c;围绕施工现场管理的人、机、料、法、环五大维度&#xff0c;以及施工过程管理的进度、质量、安全三大体系为基础应用&#xff0c;实现全面高效的工程管…

【hugging face】bitsandbytes中8 bit量化的理解

8 位量化使数十亿参数规模的模型能够适应更小的硬件&#xff0c;而不会降低性能。 8 位量化的工作原理如下&#xff1a; 1.从输入隐藏状态中按列提取较大值&#xff08;离群值&#xff09;。 2.对 FP16 中的离群值和 int8 中的非离群值执行矩阵乘法。 3.改变非异常值结果以将值…

代理IP怎么使用?Mac苹果系统设置http代理IP教程

代理IP是一种通过将请求转发到另一个服务器&#xff0c;以隐藏自己的真实IP地址的服务器。使用代理IP可以保护您的隐私和安全&#xff0c;防止被跟踪或被攻击。在本文中&#xff0c;我们将介绍如何在Mac苹果系统上设置http代理IP教程。 一、了解代理IP 代理IP地址是一种可以用来…

网络编程值UDP

1. 知识点 1.1 TCP和UDP优缺点 1.2 UDP通信流程 1.2.1 服务端 1. 创建udp套接字 2. 初始化服务端网络地址结构 3. 绑定服务端网络地址 4.创建结构体用来存储客户端网络地址结构 5. 接收客户数据 1.2.2 客户端 1. 创建udp套接字 2. 初始化服务器网络地址结构 3. 客户端先发送数…

STM32 map文件详解

文章目录 1. 前言2. 生成 .map 文件3 .map 文件的组成3.1 Section Cross References - 各个源文件之间函数的调用关系3.2 Removing Unused input sections from the image - 移除未使用的模块3.3 Image Symbol Table - 映射符号表&#xff1a;描述各&#xff08;程序段 / 数据&…

Python---类的综合案例

1、需求分析 设计一个Game类 属性&#xff1a; 定义一个类属性top_score记录游戏的历史最高分 定义一个实例属性player_name记录当前游戏的玩家姓名 方法&#xff1a; 静态方法show_help显示游戏帮助信息 类方法show_top_score显示历史最高分 实例方法start_game开始当前…

图的邻接链表储存

喷了一节课 。。。。。。。、。 #include<stdio.h> #include<stdlib.h> #define MAXNUM 20 //每一个顶点的节点结构&#xff08;单链表&#xff09; typedef struct ANode{ int adjvex;//顶点指向的位置 struct ArcNode *next;//指向下一个顶点 …

macOS Big Sur/Mac电脑安装vscode显示您没有权限来打开应用程序‘Visual Studio Code‘ 请联系您的电脑或网络管理员问题修复

错误方法 首先我以为我的权限不足。&#xff0c;需要去用户群组里设置。结果根本不是这个的问题。 1.在系统偏好设置->用户与群组检查了一下我的用户是不是管理员 结果发现是管理员 2.根据苹果提示&#xff0c;右键我的文件夹->显示简介->最下面的共享与权限 解锁&…

maven学习笔记总结

目录 一、maven简介 二、GAVP属性 三、基于 IDLE 的 Maven 工程创建 1&#xff09;java标准工程&#xff08;Javase&#xff09;的创建 2&#xff09;java企业工程&#xff08;Javaee&#xff09;的创建 a&#xff09;手动创建 b&#xff09;插件方式创建&#xff08;fil…

Linux:dockerfile编写搭建tomcat练习(9)

我使用的httpyum仓库 本地使用了5个文件&#xff0c;tomcat使用的官网解压直接用的包】 Dockerfile 主配置文件 基于centos基础镜像 jdk1.8.0_91 java环境 run.sh 启动脚本 centos.repo 仓库文件 tomcat 源码包 vim Dockerfile写入FROM centos MAINTAINER ta…

SpringAOP专栏二《原理篇》

上一篇SpringAOP专栏一《使用教程篇》-CSDN博客介绍了SpringAop如何使用&#xff0c;这一篇文章就会介绍Spring AOP 的底层实现原理&#xff0c;并通过源代码解析来详细阐述其实现过程。 前言 Spring AOP 的实现原理是基于动态代理和字节码操作的。不了解动态代理和字节码操作…

智能优化算法应用:基于食肉植物算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于食肉植物算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于食肉植物算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.食肉植物算法4.实验参数设定5.算法结果6.参考…

免费网页抓取工具大全【附下载和工具使用教程】

在当今信息爆炸的时代&#xff0c;获取准确而丰富的数据对于企业决策和个人研究至关重要。而网页抓取工具作为一种高效获取互联网数据的方式&#xff0c;正逐渐成为大家解决数据需求的得力助手。本文将深入探讨网页抓取工具的种类&#xff0c;并为大家提供简单实用的页面采集教…

HarmonyOS创建JavaScript(类 Web开发模式)项目

上文 HarmonyOS带大家创建自己的第一个Page页面并实现路由跳转(ArkTS)带大家创建了我们项目中第一个自己创建的page 并完成了一个跳转逻辑的编写 上文的开发模式是 ArkTS 的 也被称为 声明式开发范式 还有一种 javaScript的 类Web开发模式 这种方式就类似于我们传统的前端开发模…

注意力机制的快速学习

注意力机制的快速学习 注意力机制 将焦点聚焦在比较重要的事物上 我&#xff08;查询对象Q&#xff09;&#xff0c;这张图&#xff08;被查询对象V&#xff09; 我看一张图&#xff0c;第一眼&#xff0c;就会判断那些东西对我而言比较重要&#xff0c;那些对于我不重要&…

POJ 3735 Training little cats 动态规划(矩阵的幂)

一、题目大意 我们有N只猫&#xff0c;每次循环进行K次操作&#xff08;N<100&#xff0c;K<100&#xff09;&#xff0c;每次操作可有以下三种选择&#xff1a; 1、g i 给第i只猫1个食物 2、e i 让第i只猫吃完它所有的食物 3、s i j 交换第i和j只猫的食物。 求出M次…

UDP通信

第二十一章 网络通信 本章节主要讲解的是TCP和UDP两种通信方式它们都有着自己的优点和缺点 这两种通讯方式不通的地方就是TCP是一对一通信 UDP是一对多的通信方式 接下来会一一讲解 UDP通信 主要的方向是一对多通信方式 UDP通信就是一下子可以通信多个对象&#xff0c;这就…

电子眼+无人机构建平安城市视频防控监控方案

电子眼&#xff08;也称为监控摄像机&#xff09;可以通过安装在城市的不同角落&#xff0c;实时监控城市的各个地方。它们可以用于监测交通违法行为、监控公共场所的安全以及实时监测特定区域的活动情况。通过电子眼的应用&#xff0c;可以帮助警方及时发现并响应各类安全事件…

9.关于Java的程序设计-基于Springboot的家政平台管理系统设计与实现

摘要 随着社会的进步和生活水平的提高&#xff0c;家政服务作为一种重要的生活服务方式逐渐受到人们的关注。本研究基于Spring Boot框架&#xff0c;设计并实现了一种家政平台管理系统&#xff0c;旨在提供一个便捷高效的家政服务管理解决方案。系统涵盖了用户注册登录、家政服…

LVS-DR+Keepalived+动静分离实验

架构图 解释一下架构&#xff0c;大概就是用Keepalived实现两台DR服务器的LVS负载均衡&#xff0c;然后后端服务器是两台Nginx服务器两台Tomcat服务器并且实现动静分离这个实验其实就是把 LVS-DRKeepalived 和 动静分离 给拼起来&#xff0c;真的是拼起来&#xff0c;两个部分…