大佬教程收集整理的这篇文章主要介绍了kafka-python:我无法运行示例生产者代码,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
我在 Google Cloud 虚拟机上创建了 Kafka 集群 首先,我使用 cli 命令测试了我的经纪人以生成消息: 制作人:
$ kafka-console-producer.sh --broker-List localhost:9092 --producer.config /opt/bitnami/kafka/conf/producer.propertIEs --topic lus_topic
>abc
消费者成功接收:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lus_topic --consumer.config /opt/bitnami/kafka/conf/consumer.propertIEs --from-beginning
abc
然后我尝试使用 kafka-python 生产者和 cli 消费者来检索主题
Python 3.7.3 (default,Jan 22 2021,20:04:44)
[GCC 8.3.0] on linux
Type "Help","copyright","credits" or "license" for more information.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
>>> producer.send('lus_topic',b'Hello,World!').get(timeout=30)
TraceBACk (most recent call last):
file "<stdin>",line 1,in <module>
file "/home/lumo_gftdevgcp_com/.local/lib/python3.7/site-packages/kafka/producer/kafka.py",line 576,in send
self._wait_on_Metadata(topic,self.config['max_block_ms'] / 1000.0)
file "/home/lumo_gftdevgcp_com/.local/lib/python3.7/site-packages/kafka/producer/kafka.py",line 703,in _wait_on_Metadata
"Failed to update Metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update Metadata after 60.0 secs.
你能帮我解释为什么我会收到这个超时错误。如何调试这个问题。
非常感谢
我通过提供 sasl 用户名/密码解决了这个问题:
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092',security_protocol='SASL_PLAIntexT',sasl_mechanism='PLAIN',sasl_plain_username='user',sasl_plain_password='GGGGGG')
>>>producer.bootstrap_connected()
True
>>> producer.send('lus_topic',b'Hello,World!')
<kafka.producer.future.FutureRecordMetadata object at 0x7fe3eb8ebbe0>
以上是大佬教程为你收集整理的kafka-python:我无法运行示例生产者代码全部内容,希望文章能够帮你解决kafka-python:我无法运行示例生产者代码所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。