go中操作kafka主流使用的有:
1、github.com/Shopify/sarama
go get github.com/Shopify/sarama
2、生产者示例:
package main
import (
"bufio"
"fmt"
"github.com/Shopify/sarama"
"os"
"strings"
)
func main(){
// 生产者测试
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"172.16.0.252:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 获取控制台输入
inputReader := bufio.NewReader(os.Stdin)
for {
input, _ := inputReader.ReadString('\n')
// 读取用户输入
inputInfo := strings.Trim(input, "\r\n")
if strings.ToUpper(inputInfo) == "exit" {
// 如果输入exit就退出
break
}
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "my_topic"
msg.Value = sarama.StringEncoder(inputInfo)
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
continue
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
}
3、消费者示例:
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
)
func main(){
// 消费者测试
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
client,err := sarama.NewClient([]string{"172.16.0.252:9092"}, config)
defer client.Close()
if err != nil {
panic(err)
}
consumer, err := sarama.NewConsumerFromClient(client)
defer consumer.Close()
if err != nil {
panic(err)
}
// get partitionId list
partitions,err := consumer.Partitions("my_topic")
if err != nil {
panic(err)
}
for _, partitionId := range partitions{
// create partitionConsumer for every partitionId
partitionConsumer, err := consumer.ConsumePartition("my_topic", partitionId, sarama.OffsetNewest)
if err != nil {
panic(err)
}
go func(pc *sarama.PartitionConsumer) {
defer (*pc).Close()
// block
for message := range (*pc).Messages(){
value := string(message.Value)
log.Printf("Partitionid: %d; offset:%d, value: %s\n", message.Partition,message.Offset, value)
}
}(&partitionConsumer)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
}
}
作者:admin 创建时间:2023-02-23 17:50
最后编辑:admin 更新时间:2023-03-05 18:21
最后编辑:admin 更新时间:2023-03-05 18:21