1、介绍
grpc服务端和客户端都提供了interceptor
功能,功能类似middleware
,很适合在这里处理 验证、日志 等流程。
在自定义Token认证的示例中,认证信息是由每个服务中的方法处理并认证的,如果有大量的接口方法,这种姿势就太不优雅了,每个接口实现都要先处理认证信息。
这个时候interceptor就可以用来解决了这个问题,在请求被转到具体接口之前处理认证信息,一处认证,到处无忧。
2、代码示例
2.1 服务端代码
package main
import (
"context"
"fmt"
"go_study/framework/grpc/pb" // 导入proto
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"net"
)
// 定义 UserInfoService 服务
type UserInfoService struct{
pb.UnimplementedUserInfoServiceServer
}
// GetUserInfo 实现方法
func (s *UserInfoService) GetUserInfo(ctx context.Context, req *pb.UserRequest) (resp *pb.UserResponse, err error) {
//err = tokenAuth(ctx)
//if err != nil {
// return nil, err
//}
// 通过用户名查询用户信息
name := req.Name
// 数据里查用户信息
if name == "zs" {
resp = &pb.UserResponse{
Id: 1,
Name: name,
Age: 22,
Hobby: []string{"Sing", "Run"},
}
}
return
}
type HelloService struct {
pb.UnimplementedHelloServiceServer
}
func (h *HelloService) SayHello(ctx context.Context, in *pb.HelloRequest) (resp *pb.HelloResponse, err error) {
//err = tokenAuth(ctx)
//if err != nil {
// return nil, err
//}
resp = new(pb.HelloResponse)
resp.Result = fmt.Sprintf("hello %s", in.Name)
return resp, nil
}
// 使用这种方式验证token 每个服务端每个服务中都验证一次,比较繁琐
func tokenAuth(ctx context.Context) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return status.Errorf(codes.Unauthenticated, "无Token认证信息")
}
var (
appid string
appkey string
)
if val, ok := md["appid"]; ok {
appid = val[0]
}
if val, ok := md["appkey"]; ok {
appkey = val[0]
}
if appid != "101010" || appkey != "i am key" {
return status.Errorf(codes.Unauthenticated, "Token认证信息无效: appid=%s, appkey=%s", appid, appkey)
}
return nil
}
// interceptor 拦截器
// grpc.UnaryServerInfo 包含有关服务器端一元 RPC 的各种信息。所有 per-rpc 信息都可能被拦截器改变
func interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
err := tokenAuth(ctx)
if err != nil {
return nil, err
}
// 继续处理请求
return handler(ctx, req)
}
func main(){
// 地址
addr := "127.0.0.1:8180"
// 1.监听
listener, err := net.Listen("tcp", addr)
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
fmt.Printf("监听端口:%s\n", addr)
// tls认证
creds,err := credentials.NewServerTLSFromFile("./framework/grpc/keys/server.pem", "./framework/grpc/keys/server.key")
if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}
var opts []grpc.ServerOption
// 添加 tls 认证
opts = append(opts, grpc.Creds(creds))
// 注册拦截器
opts = append(opts, grpc.UnaryInterceptor(interceptor))
// 2.实例化gRPC
s := grpc.NewServer(opts...)
// 3.在gRPC上注册微服务
var u = UserInfoService{}
pb.RegisterUserInfoServiceServer(s, &u)
var h = HelloService{}
pb.RegisterHelloServiceServer(s, &h)
grpclog.Infoln("Listen on " + addr + " with TLS")
// 4.启动服务端
s.Serve(listener)
}
2.2 客户端代码
在客户端,我们增加一个请求日志,记录请求相关的参数和耗时等等。
package main
import (
"context"
"fmt"
"go_study/framework/grpc/pb" // 导入proto
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"time"
)
// OpenTLS 是否开启TLS认证
var OpenTLS = true
type customCredential struct {}
func (c *customCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"appid": "101010",
"appkey": "i am key",
}, nil
}
func (c *customCredential) RequireTransportSecurity() bool {
return OpenTLS
}
// interceptor 客户端拦截器
func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
grpclog.Infof("method=%s req=%v rep=%v duration=%s error=%v\n", method, req, reply, time.Since(start), err)
return err
}
func main(){
addr := "127.0.0.1:8180"
var err error
var opts []grpc.DialOption
if OpenTLS {
// TLS连接 记得把server name改成你写的服务器地址
creds, err := credentials.NewClientTLSFromFile("./framework/grpc/keys/server.pem", "www.admincms.com")
if err != nil {
grpclog.Fatalf("Failed to create TLS credentials %v", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
}else{
opts = append(opts, grpc.WithInsecure())
}
// 使用自定义认证
opts = append(opts, grpc.WithPerRPCCredentials(new(customCredential)))
// 使用客户端拦截器
opts = append(opts, grpc.WithUnaryInterceptor(interceptor))
// 1.连接
conn, err := grpc.Dial(addr, opts...)
if err != nil {
grpclog.Fatalln(err)
}
defer conn.Close()
// 2. 实例化gRPC客户端
client := pb.NewUserInfoServiceClient(conn)
// 3.组装请求参数
req := new(pb.UserRequest)
req.Name = "zs"
// 4. 调用接口
response, err := client.GetUserInfo(context.Background(), req)
if err != nil {
grpclog.Fatalln(err)
}
fmt.Printf("响应结果: %v\n", response)
// 客户端调用HelloService
hClient := pb.NewHelloServiceClient(conn)
res1,err := hClient.SayHello(context.Background(), &pb.HelloRequest{
Name: "grpc",
})
if err != nil {
grpclog.Fatalln(err)
}
fmt.Println("响应结果:", res1)
}
2.3 运行结果
服务端:
API server listening at: 127.0.0.1:8216
监听端口:127.0.0.1:8180
INFO: 2023/05/23 14:39:01 Listen on 127.0.0.1:8180 with TLS
WARNING: 2023/05/23 14:43:47 [transport] transport: http2Server.HandleStreams failed to read frame: read tcp 127.0.0.1:8180->127.0.0.1:5081: wsarecv: An existing connection was forcibly closed by the remote host.
INFO: 2023/05/23 14:43:47 [transport] transport: loopyWriter.run returning. connection error: desc = "transport is closing"
INFO: 2023/05/23 14:43:47 [transport] transport: error closing conn during Close: tls: failed to send closeNotify alert (but connection was closed anyway): write tcp 127.0.0.1:8180->127.0.0.1:5081: wsasend: An existing connec
tion was forcibly closed by the remote host.
客户端:
API server listening at: 127.0.0.1:5057
INFO: 2023/05/23 14:43:47 [core] parsed scheme: ""
INFO: 2023/05/23 14:43:47 [core] scheme "" not registered, fallback to default scheme
INFO: 2023/05/23 14:43:47 [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:8180 <nil> 0 <nil>}] <nil> <nil>}
INFO: 2023/05/23 14:43:47 [core] ClientConn switching balancer to "pick_first"
INFO: 2023/05/23 14:43:47 [core] Channel switches to new LB policy "pick_first"
INFO: 2023/05/23 14:43:47 [core] Subchannel Connectivity change to CONNECTING
INFO: 2023/05/23 14:43:47 [core] Subchannel picks a new address "127.0.0.1:8180" to connect
INFO: 2023/05/23 14:43:47 [core] pickfirstBalancer: UpdateSubConnState: 0xc000065b70, {CONNECTING <nil>}
INFO: 2023/05/23 14:43:47 [core] blockingPicker: the picked transport is not ready, loop back to repick
INFO: 2023/05/23 14:43:47 [core] Channel Connectivity change to CONNECTING
INFO: 2023/05/23 14:43:47 [core] Subchannel Connectivity change to READY
INFO: 2023/05/23 14:43:47 [core] pickfirstBalancer: UpdateSubConnState: 0xc000065b70, {READY <nil>}
INFO: 2023/05/23 14:43:47 [core] Channel Connectivity change to READY
INFO: 2023/05/23 14:43:47 method=/pb.UserInfoService/GetUserInfo req=name:"zs" rep=id:1 name:"zs" age:22 hobby:"Sing" hobby:"Run" duration=8.1523ms error=<nil>
响应结果: id:1 name:"zs" age:22 hobby:"Sing" hobby:"Run"
INFO: 2023/05/23 14:43:47 method=/pb.HelloService/SayHello req=name:"grpc" rep=result:"hello grpc" duration=1.0434ms error=<nil>
响应结果: result:"hello grpc"
INFO: 2023/05/23 14:43:47 [core] Channel Connectivity change to SHUTDOWN
INFO: 2023/05/23 14:43:47 [core] Subchannel Connectivity change to SHUTDOWN
项目推荐: go-grpc-middleware
这个项目对interceptor进行了封装,支持多个拦截器的链式组装,对于需要多种处理的地方使用起来会更方便些。
作者:joker.liu 创建时间:2023-05-23 14:27
最后编辑:joker.liu 更新时间:2023-05-23 15:14
最后编辑:joker.liu 更新时间:2023-05-23 15:14