程序问答   发布时间:2022-06-02  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了kafka-go 库 - 从最新轮询中获取所有记录作为消息片段大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决kafka-go 库 - 从最新轮询中获取所有记录作为消息片段?

开发过程中遇到kafka-go 库 - 从最新轮询中获取所有记录作为消息片段的问题如何解决?下面主要结合日常开发的经验,给出你关于kafka-go 库 - 从最新轮询中获取所有记录作为消息片段的解决方法建议,希望对你解决kafka-go 库 - 从最新轮询中获取所有记录作为消息片段有所启发或帮助;

我正在使用 segmentio/kafka-go 库,特别是 kafka.Reader,但它通过 Fetchmessage 函数一次只能读取 1 条记录。我的设置目前如下所示:

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,DualStack: true,TLS:       buildTlsConfig(certPath,tlsEnabled),}

// make a new kafka reader that is part of the given consumer group
r := kafka.NewReader(kafka.ReaderConfig{
    brokers: []String{BootstrapbrokerAddress},GroupID: group,topic:   topic,Dialer:  dialer,})

// in a loop in a diff part of the code
m,err := kr.reader.Fetchmessage(ctX)

为了提高业务逻辑的效率,我想将每个轮询迭代中的所有消息作为一个批次读取。我希望有这样一个简单的功能:

func (r *Reader) Fetchmessages(ctx context.Context) ([]message,error)

或者至少有一种方法可以检查是否从上次民意调查中提取了更多记录,以便我可以自己收集它们,但是我在 API 中没有看到类似的内容。我知道 Conn 对象上有较低级别的函数,但我想利用读者的消费者组处理。

是否有一种简单的方法可以将每次轮询中的所有消息作为一个切片获取?

解决方法

在您提供的同一链接上,有调用 ReadBatch 的示例

// to consume messages
topic := "my-topic"
partition := 0

conn,err := kafka.DialLeader(context.BACkground(),"tcp","localhost:9092",topic,partition)
if err != nil {
    log.Fatal("failed to dial leader:",err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3,1e6) // fetch 10KB min,1MB max

b := make([]byte,10e3) // 10KB max per message
for {
    _,err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(String(b))
}

if err := batch.Close(); err != nil {
    log.Fatal("failed to close batch:",err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close connection:",err)
}

大佬总结

以上是大佬教程为你收集整理的kafka-go 库 - 从最新轮询中获取所有记录作为消息片段全部内容,希望文章能够帮你解决kafka-go 库 - 从最新轮询中获取所有记录作为消息片段所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。