官方库:https://pkg.go.dev/github.com/apache/rocketmq-client-go/v2
1、下载和安装
go get github.com/apache/rocketmq-client-go/v2
2、连接RocketMQ发送消息
消息生产者示例:
package main
import (
"bufio"
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strings"
)
func main(){
// 1. 创建主题,这一步可以省略,在send的时候如果没有topic,也会进行创建。
//CreateTopic("testTopic01")
p,err := rocketmq.NewProducer(
producer.WithNameServer([]string{"192.168.23.132:9876"}),
//producer.WithRetry(2),
//producer.WithGroupName("ProducerGroupName"), // 生产者分组(5.x已废弃)
)
if err != nil {
fmt.Println("创建生产者失败")
return
}
// 启动
err = p.Start()
if err != nil {
fmt.Printf("启动 生产者 出错, error: %s", err.Error())
os.Exit(1)
}
// 获取控制台输入
inputReader := bufio.NewReader(os.Stdin)
for {
input, _ := inputReader.ReadString('\n')
// 读取用户输入
inputInfo := strings.Trim(input, "\r\n")
if strings.ToUpper(inputInfo) == "exit" {
// 如果输入exit就退出
break
}
// 发送消息
result, sErr := p.SendSync(context.Background(), &primitive.Message{
Topic: "testTopic01",
Body: []byte(inputInfo),
})
if sErr != nil {
fmt.Printf("发送消息失败,error: %s\n", sErr.Error())
} else {
fmt.Printf("发送消息成功,result=%s\n", result.String())
}
}
// 关闭
err = p.Shutdown()
if err != nil {
fmt.Printf("关闭 生产者 出错, error: %s", err.Error())
os.Exit(1)
}
}
func CreateTopic(topicName string) {
endPoint := []string{"192.168.23.132:9876"}
// 创建主题
testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(endPoint)))
if err != nil {
fmt.Printf("connection error: %s\n", err.Error())
}
err = testAdmin.CreateTopic(context.Background(), admin.WithTopicCreate(topicName))
if err != nil {
fmt.Printf("createTopic error: %s\n", err.Error())
}
}
3、连接RocketMQ消费消息
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
"os/signal"
"syscall"
)
func main(){
// 3.消费者订阅主题并消费
c, err := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"192.168.23.132:9876"}),
consumer.WithConsumerModel(consumer.BroadCasting),
consumer.WithGroupName("ConsumerGroupName"),
)
if err != nil {
fmt.Printf("创建消费者失败,error: %s\n", err.Error())
return
}
// 订阅topic
err = c.Subscribe(
"testTopic01",
consumer.MessageSelector{},
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback : %v \n", msgs[i])
}
return consumer.ConsumeSuccess, nil
},
)
if err != nil {
fmt.Printf("订阅消息失败,error: %s\n", err.Error())
}
// 启动consumer
err = c.Start()
if err != nil {
fmt.Printf("消费者启动失败,error: %s\n", err.Error())
os.Exit(-1)
}
//创建一个信号监听通道
quit := make(chan os.Signal, 1)
//监听 syscall.SIGINT 跟 syscall.SIGTERM信号
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
si :=<-quit
fmt.Println("Shutting down server...",si)
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s\n", err.Error())
}
}
作者:admin 创建时间:2023-02-24 17:59
最后编辑:admin 更新时间:2023-03-01 08:44
最后编辑:admin 更新时间:2023-03-01 08:44