程序问答   发布时间:2022-06-01  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了带有 kafka 源、kafka 接收器和内存通道的 Apache Flume - 抛出 UNKNOWN_TOPIC_OR_PARTITION大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决带有 kafka 源、kafka 接收器和内存通道的 Apache Flume - 抛出 UNKNOWN_TOPIC_OR_PARTITION?

开发过程中遇到带有 kafka 源、kafka 接收器和内存通道的 Apache Flume - 抛出 UNKNOWN_TOPIC_OR_PARTITION的问题如何解决?下面主要结合日常开发的经验,给出你关于带有 kafka 源、kafka 接收器和内存通道的 Apache Flume - 抛出 UNKNOWN_TOPIC_OR_PARTITION的解决方法建议,希望对你解决带有 kafka 源、kafka 接收器和内存通道的 Apache Flume - 抛出 UNKNOWN_TOPIC_OR_PARTITION有所启发或帮助;

我是 Apache 水槽 https://flume.apache.org/ 的新手。对于用例之一,我需要将数据从一个集群上的 Kafka 主题(引导程序:bootstrap1,主题:topic1)移动到不同集群中具有不同名称的主题(引导程序:bootstrap2,主题:topic2)。同一个项目中还有另一个最适合水槽的用例,我需要为这个用例使用相同的水槽管道,尽管还有其他选项可以从 Kafka 复制到 Kafka。

我尝试了以下配置,结果如每个选项中所述。

#1: telnet 到 kafka sink (bootstrap2,topic2) --> 完美。 配置:

a1.sources = r1
a1.sinks = k1
a1.chAnnels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic2
a1.sinks.k1.kafka.bootstrap.servers = bootstrap2
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a chAnnel which buffers events in memory
a1.chAnnels.c1.type = memory
a1.chAnnels.c1.capacity = 1000
a1.chAnnels.c1.transactionCapacity = 100

# Bind the source and sink to the chAnnel
a1.sources.r1.chAnnels = c1
a1.sinks.k1.chAnnel = c1

#2:kafka 作为源(bootstrap1,topic1)和记录器作为接收器 --> 完美。

a1.sources = r1
a1.sinks = k1
a1.chAnnels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.Kafkasource
a1.sources.r1.batchSize = 10
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bootstrap1
a1.sources.r1.kafka.topics = topic1
a1.sources.r1.kafka.consumer.group.ID = flume-gis-consumer
a1.sources.r1.BACkoffSleepIncrement = 1000

# Describe the sink
a1.sinks.k1.type = logger

# Use a chAnnel which buffers events in memory
a1.chAnnels.c1.type = memory
a1.chAnnels.c1.capacity = 1000
a1.chAnnels.c1.transactionCapacity = 100

# Bind the source and sink to the chAnnel
a1.sources.r1.chAnnels = c1
a1.sinks.k1.chAnnel = c1

#3: kafka 作为源 (bootstrap1,topic1) 和 kafka 作为 sink(bootstrap2,topic2) --> 给出如下配置中提到的错误。

a1.sources = r1
a1.sinks = k1
a1.chAnnels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.Kafkasource
a1.sources.r1.batchSize = 10
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bootstrap1
a1.sources.r1.kafka.topics = topic1
a1.sources.r1.kafka.consumer.group.ID = flume-gis-consumer1
a1.sources.r1.BACkoffSleepIncrement = 1000


# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic2
a1.sinks.k1.kafka.bootstrap.servers = bootstrap2
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1


# Use a chAnnel which buffers events in memory
a1.chAnnels.c1.type = memory
a1.chAnnels.c1.capacity = 100
a1.chAnnels.c1.transactionCapacity = 100


# Bind the source and sink to the chAnnel
a1.sources.r1.chAnnels = c1
a1.sinks.k1.chAnnel = c1

错误:

(kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clIEnts.NetworkClIEnt$DefaultMetadataupdater.handleCompletedMetadataResponse(NetworkClIEnt.java:968)] [Producer clIEntID=producer-1] Error while fetching Metadata with correlation ID 85 : {topic1=UNKNowN_topIC_OR_PARTITION}

持续显示以上错误。

终止flume-ng命令时出错

(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:268)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.EventDeliveryException: Could not send event
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:234)
    ... 3 more

向 stackoverflow 社区寻求帮助:

  1. 这里的配置出了什么问题。 Kafka 主题存在于各自的集群中。 (选项 1 和选项 2 工作正常,我可以看到消息从源流向接收器)
  2. 为什么生产者线程试图在源 kafka 主题中生产?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

大佬总结

以上是大佬教程为你收集整理的带有 kafka 源、kafka 接收器和内存通道的 Apache Flume - 抛出 UNKNOWN_TOPIC_OR_PARTITION全部内容,希望文章能够帮你解决带有 kafka 源、kafka 接收器和内存通道的 Apache Flume - 抛出 UNKNOWN_TOPIC_OR_PARTITION所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。