Go 中操作 Kafka 的主流客户端库有:

1、github.com/Shopify/sarama

go get github.com/Shopify/sarama

2、生产者示例:

package main

import (
    "bufio"
    "fmt"
    "github.com/Shopify/sarama"
    "os"
    "strings"
)

// main is an interactive Kafka producer demo: it reads lines from standard
// input and publishes each one to the "my_topic" topic through a sarama
// SyncProducer. It stops when the user types "exit" (case-insensitive) or
// when standard input ends or fails.
func main() {
	config := sarama.NewConfig()

	// Wait until the leader and all in-sync replicas acknowledge the message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Choose a random partition for every message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Report successfully delivered messages; SyncProducer requires this.
	config.Producer.Return.Successes = true

	// Connect to Kafka.
	client, err := sarama.NewSyncProducer([]string{"172.16.0.252:9092"}, config)
	if err != nil {
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer client.Close()

	// Read user input from the console, one line per message.
	inputReader := bufio.NewReader(os.Stdin)
	for {
		input, readErr := inputReader.ReadString('\n')
		inputInfo := strings.Trim(input, "\r\n")

		// On EOF or a read error with nothing left to send, stop instead of
		// spinning forever and publishing empty messages.
		if readErr != nil && inputInfo == "" {
			break
		}

		// Typing "exit" in any letter case ends the session.
		if strings.EqualFold(inputInfo, "exit") {
			break
		}

		// Build the message.
		msg := &sarama.ProducerMessage{}
		msg.Topic = "my_topic"
		msg.Value = sarama.StringEncoder(inputInfo)

		// Send it and report where it landed.
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send msg failed, err:", err)
		} else {
			fmt.Printf("pid:%v offset:%v\n", pid, offset)
		}

		// The final unterminated line (if any) has been handled above; the
		// reader cannot yield more data after an error.
		if readErr != nil {
			break
		}
	}
}

3、消费者示例:

package main

import (
    "github.com/Shopify/sarama"
    "log"
    "os"
    "os/signal"
)

// main is a Kafka consumer demo: it opens one PartitionConsumer for every
// partition of "my_topic", starting at the newest offset, logs every message
// and consume error it receives, and runs until the process gets an interrupt
// signal (Ctrl+C).
func main() {
	config := sarama.NewConfig()
	// Deliver consume errors on each PartitionConsumer's Errors() channel.
	// Once enabled, that channel has to be read (done below).
	config.Consumer.Return.Errors = true

	client, err := sarama.NewClient([]string{"172.16.0.252:9092"}, config)
	if err != nil {
		panic(err)
	}
	// Register Close only after the error check: on failure client is nil and
	// calling Close on it would panic again while unwinding.
	defer client.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// Get the list of partition IDs for the topic.
	partitions, err := consumer.Partitions("my_topic")
	if err != nil {
		panic(err)
	}

	for _, partitionID := range partitions {
		// Create a PartitionConsumer for every partition.
		partitionConsumer, err := consumer.ConsumePartition("my_topic", partitionID, sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}
		// Intentionally deferred inside the loop: deferred calls run in LIFO
		// order when main returns, so every PartitionConsumer is closed
		// before consumer.Close() and client.Close().
		defer partitionConsumer.Close()

		// Log messages until the Messages channel is closed.
		go func(pc sarama.PartitionConsumer) {
			for message := range pc.Messages() {
				value := string(message.Value)
				log.Printf("Partitionid: %d; offset:%d, value: %s\n", message.Partition, message.Offset, value)
			}
		}(partitionConsumer)

		// Drain the Errors channel so errors are reported and never pile up.
		go func(pc sarama.PartitionConsumer) {
			for consumeErr := range pc.Errors() {
				log.Printf("consume error: %v\n", consumeErr)
			}
		}(partitionConsumer)
	}

	// Block until interrupted; the deferred Close calls then shut things down.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
}
作者:admin  创建时间:2023-02-23 17:50
最后编辑:admin  更新时间:2023-03-05 18:21