官方库:https://pkg.go.dev/github.com/apache/rocketmq-client-go/v2

1、下载和安装

go get github.com/apache/rocketmq-client-go/v2

2、连接RocketMQ发送消息

消息生产者示例:

package main

import (
    "bufio"
    "context"
    "fmt"
    "os"
    "strings"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/admin"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)

// main starts a RocketMQ producer and publishes every line typed on stdin to
// the "testTopic01" topic. It stops when the user enters "exit" (in any letter
// case) or when stdin can no longer be read, and then shuts the producer down.
func main() {
    // 1. Creating the topic up front is optional: per the original note, a
    // send to a missing topic creates it.
    // NOTE(review): this presumably depends on the broker's auto-create-topic
    // setting — confirm for your deployment.
    //CreateTopic("testTopic01")

    p, err := rocketmq.NewProducer(
        producer.WithNameServer([]string{"192.168.23.132:9876"}),
        //producer.WithRetry(2),
        //producer.WithGroupName("ProducerGroupName"),  // producer group (deprecated in 5.x)
    )
    if err != nil {
        fmt.Printf("创建生产者失败, error: %s\n", err.Error())
        return
    }

    // Start the producer before any message can be sent.
    err = p.Start()
    if err != nil {
        fmt.Printf("启动 生产者 出错, error: %s", err.Error())
        os.Exit(1)
    }

    // Read messages from the console, one per line.
    inputReader := bufio.NewReader(os.Stdin)
    for {
        input, rErr := inputReader.ReadString('\n')

        // Strip the line terminator (handles both "\n" and "\r\n").
        inputInfo := strings.Trim(input, "\r\n")

        // A read error with no data (e.g. EOF) ends the loop; without this
        // check the loop would spin forever sending empty messages.
        if rErr != nil && inputInfo == "" {
            fmt.Printf("读取输入结束: %s\n", rErr.Error())
            break
        }

        // Case-insensitive match, so "exit", "EXIT" and "Exit" all quit.
        if strings.EqualFold(inputInfo, "exit") {
            break
        }

        // Send the line synchronously and report the outcome.
        result, sErr := p.SendSync(context.Background(), &primitive.Message{
            Topic: "testTopic01",
            Body:  []byte(inputInfo),
        })

        if sErr != nil {
            fmt.Printf("发送消息失败,error: %s\n", sErr.Error())
        } else {
            fmt.Printf("发送消息成功,result=%s\n", result.String())
        }

        // The final line arrived without a trailing newline (EOF): it has
        // been sent above, and nothing more can be read.
        if rErr != nil {
            break
        }
    }

    // Shut the producer down.
    err = p.Shutdown()
    if err != nil {
        fmt.Printf("关闭 生产者 出错, error: %s", err.Error())
        os.Exit(1)
    }
}


// CreateTopic asks RocketMQ to create the topic named topicName, using an
// admin client that resolves the name server at a fixed address. Failures are
// printed to stdout; nothing is returned to the caller.
//
// NOTE(review): depending on the client version, CreateTopic may also need a
// broker address option (admin.WithBrokerAddrCreate) — confirm against the
// admin package documentation.
func CreateTopic(topicName string) {
    endPoint := []string{"192.168.23.132:9876"}

    testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(endPoint)))
    if err != nil {
        fmt.Printf("connection error: %s\n", err.Error())
        // Without a usable admin client there is nothing left to do;
        // continuing would call a method on an invalid client.
        return
    }
    // Release the admin client once the request has finished.
    defer func() {
        if cErr := testAdmin.Close(); cErr != nil {
            fmt.Printf("close admin error: %s\n", cErr.Error())
        }
    }()

    err = testAdmin.CreateTopic(context.Background(), admin.WithTopicCreate(topicName))
    if err != nil {
        fmt.Printf("createTopic error: %s\n", err.Error())
    }
}

3、连接RocketMQ消费消息

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "os"
    "os/signal"
    "syscall"
)

// main starts a RocketMQ push consumer in broadcasting mode, subscribes it to
// "testTopic01", prints every received message, and keeps running until the
// process receives SIGINT or SIGTERM, at which point the consumer is shut down.
func main() {
    // Create the consumer.
    c, err := rocketmq.NewPushConsumer(
        consumer.WithNameServer([]string{"192.168.23.132:9876"}),
        consumer.WithConsumerModel(consumer.BroadCasting),
        consumer.WithGroupName("ConsumerGroupName"),
    )
    if err != nil {
        fmt.Printf("创建消费者失败,error: %s\n", err.Error())
        return
    }

    // Subscribe to the topic with an empty selector; the callback prints each
    // message in the batch and reports success.
    err = c.Subscribe(
        "testTopic01",
        consumer.MessageSelector{},
        func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
            for _, msg := range msgs {
                fmt.Printf("subscribe callback : %v \n", msg)
            }
            return consumer.ConsumeSuccess, nil
        },
    )
    if err != nil {
        fmt.Printf("订阅消息失败,error: %s\n", err.Error())
        // A consumer without its subscription would run but do no useful
        // work, so stop here instead of starting it.
        os.Exit(-1)
    }

    // Start the consumer.
    err = c.Start()
    if err != nil {
        fmt.Printf("消费者启动失败,error: %s\n", err.Error())
        os.Exit(-1)
    }

    // Block until SIGINT or SIGTERM arrives. The channel is buffered so a
    // signal delivered before the receive below is not lost.
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    si := <-quit
    fmt.Println("Shutting down server...", si)

    err = c.Shutdown()
    if err != nil {
        fmt.Printf("shutdown Consumer error: %s\n", err.Error())
    }
}
作者:admin  创建时间:2023-02-24 17:59
最后编辑:admin  更新时间:2023-03-01 08:44