1、介绍

grpc服务端和客户端都提供了interceptor功能,功能类似middleware,很适合在这里处理验证、日志等流程。

在自定义Token认证的示例中,认证信息是由每个服务中的方法处理并认证的,如果有大量的接口方法,这种做法就太不优雅了,因为每个接口实现都要先处理认证信息。

这个时候就可以用interceptor来解决这个问题,在请求被转到具体接口之前处理认证信息,一处认证,到处无忧。

2、代码示例

2.1 服务端代码

package main

import (
    "context"
    "fmt"
    "go_study/framework/grpc/pb" // 导入proto
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
    "net"
)

// UserInfoService is the server type registered in main for the UserInfo
// gRPC service. It embeds pb.UnimplementedUserInfoServiceServer.
type UserInfoService struct {
    pb.UnimplementedUserInfoServiceServer
}

// GetUserInfo looks up a user by the name carried in req.
//
// Only the demo user "zs" is known. For any other name the method returns a
// codes.NotFound status error rather than a nil response paired with a nil
// error, so callers can tell "no such user" apart from a successful lookup.
//
// No per-handler tokenAuth(ctx) call is made here: the unary server
// interceptor registered in main authenticates the request before this
// handler runs.
func (s *UserInfoService) GetUserInfo(ctx context.Context, req *pb.UserRequest) (resp *pb.UserResponse, err error) {
    name := req.Name

    // Stand-in for a real database query: a single hard-coded record.
    if name != "zs" {
        return nil, status.Errorf(codes.NotFound, "user %q not found", name)
    }

    return &pb.UserResponse{
        Id:    1,
        Name:  name,
        Age:   22,
        Hobby: []string{"Sing", "Run"},
    }, nil
}

// HelloService is the server type registered in main for the Hello gRPC
// service. It embeds pb.UnimplementedHelloServiceServer.
type HelloService struct {
    pb.UnimplementedHelloServiceServer
}

// SayHello replies with the greeting "hello <name>" built from in.Name.
// The error result is always nil.
//
// No per-handler tokenAuth(ctx) call is made here: the unary server
// interceptor registered in main authenticates the request first.
func (h *HelloService) SayHello(ctx context.Context, in *pb.HelloRequest) (resp *pb.HelloResponse, err error) {
    greeting := fmt.Sprintf("hello %s", in.Name)
    return &pb.HelloResponse{Result: greeting}, nil
}

// tokenAuth validates the appid/appkey pair carried in the incoming gRPC
// metadata of ctx.
//
// It returns nil when both values match the expected demo credentials, and a
// codes.Unauthenticated status error when the context has no metadata or the
// values are missing or wrong. Calling it from every handler of every service
// is repetitive, which is why the server interceptor calls it instead.
//
// NOTE(review): the credentials are hard-coded demo values compared with !=;
// a real service should load them from configuration and compare secrets in
// constant time.
func tokenAuth(ctx context.Context) error {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return status.Errorf(codes.Unauthenticated, "无Token认证信息")
    }

    appid := firstMetadataValue(md, "appid")
    appkey := firstMetadataValue(md, "appkey")

    if appid != "101010" || appkey != "i am key" {
        // The submitted appkey is deliberately left out of the message so the
        // secret does not end up in status errors and logs.
        return status.Errorf(codes.Unauthenticated, "Token认证信息无效: appid=%s", appid)
    }

    return nil
}

// firstMetadataValue returns the first value stored under key in md, or ""
// when the key is absent or present with an empty value list (indexing [0]
// directly would panic in the latter case).
func firstMetadataValue(md metadata.MD, key string) string {
    if vals := md.Get(key); len(vals) > 0 {
        return vals[0]
    }
    return ""
}

// interceptor is the unary server interceptor that authenticates every call.
//
// It runs tokenAuth on the request context first. If that fails, the error is
// returned and handler is never invoked; otherwise the request is handed on
// to handler unchanged. info, the *grpc.UnaryServerInfo passed for the RPC,
// is not used here.
func interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    if authErr := tokenAuth(ctx); authErr != nil {
        return nil, authErr
    }
    // Authenticated: continue with the actual handler.
    return handler(ctx, req)
}

// main starts the TLS-protected gRPC server on 127.0.0.1:8180 with the
// token-checking unary interceptor installed, registers UserInfoService and
// HelloService, and serves until Serve returns. Every failure, including an
// error returned by Serve, is reported through grpclog.Fatalf.
func main() {
    addr := "127.0.0.1:8180"

    // 1. Listen on the TCP address.
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        grpclog.Fatalf("Failed to listen: %v", err)
    }
    fmt.Printf("监听端口:%s\n", addr)

    // Load the server certificate and key for TLS.
    creds, err := credentials.NewServerTLSFromFile("./framework/grpc/keys/server.pem", "./framework/grpc/keys/server.key")
    if err != nil {
        grpclog.Fatalf("Failed to generate credentials %v", err)
    }

    var opts []grpc.ServerOption

    // Enable TLS.
    opts = append(opts, grpc.Creds(creds))

    // Install the authentication interceptor.
    opts = append(opts, grpc.UnaryInterceptor(interceptor))

    // 2. Create the gRPC server.
    s := grpc.NewServer(opts...)

    // 3. Register the services on it.
    pb.RegisterUserInfoServiceServer(s, &UserInfoService{})
    pb.RegisterHelloServiceServer(s, &HelloService{})

    grpclog.Infoln("Listen on " + addr + " with TLS")

    // 4. Serve. The returned error was previously discarded, which hid the
    // reason the server stopped.
    if err := s.Serve(listener); err != nil {
        grpclog.Fatalf("Failed to serve: %v", err)
    }
}

2.2 客户端代码

在客户端,我们增加一个请求日志,记录请求相关的参数和耗时等等。

package main

import (
    "context"
    "fmt"
    "go_study/framework/grpc/pb" // 导入proto
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/grpclog"
    "time"
)


// OpenTLS toggles TLS for the client connection. customCredential reports
// the same value from RequireTransportSecurity.
var OpenTLS = true

// customCredential supplies the fixed appid/appkey pair that main passes to
// grpc.WithPerRPCCredentials.
type customCredential struct{}

// GetRequestMetadata returns a fresh map holding the "appid" and "appkey"
// entries. ctx and uri are ignored and the error is always nil.
func (*customCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
    md := make(map[string]string, 2)
    md["appid"] = "101010"
    md["appkey"] = "i am key"
    return md, nil
}

// RequireTransportSecurity reports the current value of OpenTLS.
func (*customCredential) RequireTransportSecurity() bool {
    return OpenTLS
}

// interceptor is the unary client interceptor used for request logging.
//
// It forwards the call to invoker unchanged, then logs the method, request,
// reply, elapsed time and resulting error through grpclog.Infof. The error
// from invoker is returned to the caller as is.
func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    began := time.Now()
    callErr := invoker(ctx, method, req, reply, cc, opts...)
    elapsed := time.Since(began)

    grpclog.Infof("method=%s req=%v rep=%v duration=%s error=%v\n", method, req, reply, elapsed, callErr)
    return callErr
}

func main(){
    addr := "127.0.0.1:8180"
    var err error
    var opts []grpc.DialOption

    if OpenTLS {
        // TLS连接  记得把server name改成你写的服务器地址
        creds, err := credentials.NewClientTLSFromFile("./framework/grpc/keys/server.pem", "www.admincms.com")
        if err != nil {
            grpclog.Fatalf("Failed to create TLS credentials %v", err)
        }
        opts = append(opts, grpc.WithTransportCredentials(creds))
    }else{
        opts = append(opts, grpc.WithInsecure())
    }

    // 使用自定义认证
    opts = append(opts, grpc.WithPerRPCCredentials(new(customCredential)))

    // 使用客户端拦截器
    opts = append(opts, grpc.WithUnaryInterceptor(interceptor))

    // 1.连接
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        grpclog.Fatalln(err)
    }
    defer conn.Close()

    // 2. 实例化gRPC客户端
    client := pb.NewUserInfoServiceClient(conn)

    // 3.组装请求参数
    req := new(pb.UserRequest)
    req.Name = "zs"

    // 4. 调用接口
    response, err := client.GetUserInfo(context.Background(), req)
    if err != nil {
        grpclog.Fatalln(err)
    }
    fmt.Printf("响应结果: %v\n", response)

    // 客户端调用HelloService
    hClient := pb.NewHelloServiceClient(conn)
    res1,err := hClient.SayHello(context.Background(), &pb.HelloRequest{
        Name: "grpc",
    })
    if err != nil {
        grpclog.Fatalln(err)
    }
    fmt.Println("响应结果:", res1)
}

2.3 运行结果

服务端:

API server listening at: 127.0.0.1:8216
监听端口:127.0.0.1:8180
INFO: 2023/05/23 14:39:01 Listen on 127.0.0.1:8180 with TLS
WARNING: 2023/05/23 14:43:47 [transport] transport: http2Server.HandleStreams failed to read frame: read tcp 127.0.0.1:8180->127.0.0.1:5081: wsarecv: An existing connection was forcibly closed by the remote host.
INFO: 2023/05/23 14:43:47 [transport] transport: loopyWriter.run returning. connection error: desc = "transport is closing"
INFO: 2023/05/23 14:43:47 [transport] transport: error closing conn during Close: tls: failed to send closeNotify alert (but connection was closed anyway): write tcp 127.0.0.1:8180->127.0.0.1:5081: wsasend: An existing connection was forcibly closed by the remote host.

客户端:

API server listening at: 127.0.0.1:5057
INFO: 2023/05/23 14:43:47 [core] parsed scheme: ""
INFO: 2023/05/23 14:43:47 [core] scheme "" not registered, fallback to default scheme
INFO: 2023/05/23 14:43:47 [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:8180  <nil> 0 <nil>}] <nil> <nil>}
INFO: 2023/05/23 14:43:47 [core] ClientConn switching balancer to "pick_first"
INFO: 2023/05/23 14:43:47 [core] Channel switches to new LB policy "pick_first"
INFO: 2023/05/23 14:43:47 [core] Subchannel Connectivity change to CONNECTING
INFO: 2023/05/23 14:43:47 [core] Subchannel picks a new address "127.0.0.1:8180" to connect
INFO: 2023/05/23 14:43:47 [core] pickfirstBalancer: UpdateSubConnState: 0xc000065b70, {CONNECTING <nil>}
INFO: 2023/05/23 14:43:47 [core] blockingPicker: the picked transport is not ready, loop back to repick
INFO: 2023/05/23 14:43:47 [core] Channel Connectivity change to CONNECTING
INFO: 2023/05/23 14:43:47 [core] Subchannel Connectivity change to READY
INFO: 2023/05/23 14:43:47 [core] pickfirstBalancer: UpdateSubConnState: 0xc000065b70, {READY <nil>}
INFO: 2023/05/23 14:43:47 [core] Channel Connectivity change to READY
INFO: 2023/05/23 14:43:47 method=/pb.UserInfoService/GetUserInfo req=name:"zs" rep=id:1 name:"zs" age:22 hobby:"Sing" hobby:"Run" duration=8.1523ms error=<nil>
响应结果: id:1 name:"zs" age:22 hobby:"Sing" hobby:"Run"
INFO: 2023/05/23 14:43:47 method=/pb.HelloService/SayHello req=name:"grpc" rep=result:"hello grpc" duration=1.0434ms error=<nil>
响应结果: result:"hello grpc"
INFO: 2023/05/23 14:43:47 [core] Channel Connectivity change to SHUTDOWN
INFO: 2023/05/23 14:43:47 [core] Subchannel Connectivity change to SHUTDOWN

项目推荐: go-grpc-middleware
这个项目对interceptor进行了封装,支持多个拦截器的链式组装,对于需要多种处理的地方使用起来会更方便些。

作者:joker.liu  创建时间:2023-05-23 14:27
最后编辑:joker.liu  更新时间:2023-05-23 15:14