大佬教程收集整理的这篇文章主要介绍了无法启动多个 Kafka 消费者,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
我有一个控制台应用程序,它有多个 BACkgroundservices,每个应用程序都使用 Confluent.Kafka nuget 包 (v1.6.2) 从同一个 Kafka 主题中读取。该主题有 3 个分区。
当应用程序启动时,所有后台服务都会调用它们的构造函数,但是只有一个 ExecuteAsync 方法被调用。如果我添加一个 Task.Delay() - 毫秒数似乎并不重要 - 在每个 ExecuteAsync 开始时,一切正常,所有后台服务都运行。
据我所知,没有引发异常。
有没有人知道可能会发生什么,或者在哪里可以进一步查看?
代码如下:
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.HosTing;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaConsumer
{
class Program
{
static voID Main(String[] args)
{
CreateHostBuilder(args).build().Run();
}
private static IHostBuilder CreateHostBuilder(String[] args) =>
Host.CreateDefaultBuilder(args)
.Configureservices((hostContext,services) =>
{
services.AddHostedservice<ConsumerA>();
services.AddHostedservice<ConsumerB>();
services.AddHostedservice<ConsumerC>();
});
}
public class ConsumerA : BACkgroundservice
{
private Readonly ILogger<ConsumerA> _logger;
private Readonly IConsumer<Ignore,String> _consumer;
public ConsumerA(ILogger<ConsumerA> logger)
{
_logger = logger;
var config = new ConsumerConfig()
{
bootstrapServers = @"server:port",GroupID = "Group1",autoOffsetreset = autoOffsetreset.EarlIEst
};
_consumer = new ConsumerBuilder<Ignore,String>(config).build();
_logger.Loginformation("ConsumerA constructor");
}
protected overrIDe async Task ExecuteAsync(CancellationToken cancellationToken)
{
// await Task.Delay(10);
_logger.Loginformation("ConsumerA starTing");
_consumer.Subscribe(new List<String> { "topic" });
while (!cancellationToken.IsCancellationrequested)
{
_ = _consumer.Consume(cancellationToken);
}
}
}
public class ConsumerB : BACkgroundservice
{
private Readonly ILogger<ConsumerB> _logger;
private Readonly IConsumer<Ignore,String> _consumer;
public ConsumerB(ILogger<ConsumerB> logger)
{
_logger = logger;
var config = new ConsumerConfig()
{
bootstrapServers = @"server:port",String>(config).build();
_logger.Loginformation("ConsumerB constructor");
}
protected overrIDe async Task ExecuteAsync(CancellationToken cancellationToken)
{
// await Task.Delay(10);
_logger.Loginformation("ConsumerB starTing");
_consumer.Subscribe(new List<String> { "topic" });
while (!cancellationToken.IsCancellationrequested)
{
_ = _consumer.Consume(cancellationToken);
}
}
}
public class ConsumerC : BACkgroundservice
{
private Readonly ILogger<ConsumerC> _logger;
private Readonly IConsumer<Ignore,String> _consumer;
public ConsumerC(ILogger<ConsumerC> logger)
{
_logger = logger;
var config = new ConsumerConfig()
{
bootstrapServers = @"server:port",String>(config).build();
_logger.Loginformation("ConsumerC constructor");
}
protected overrIDe async Task ExecuteAsync(CancellationToken cancellationToken)
{
// await Task.Delay(10);
_logger.Loginformation("ConsumerC starTing");
_consumer.Subscribe(new List<String> { "topic" });
while (!cancellationToken.IsCancellationrequested)
{
_ = _consumer.Consume(cancellationToken);
}
}
}
}
和输出:
(没有延迟):
info: KafkaConsumer.ConsumerA[0]
ConsumerA constructor
info: KafkaConsumer.ConsumerB[0]
ConsumerB constructor
info: KafkaConsumer.ConsumerC[0]
ConsumerC constructor
info: KafkaConsumer.ConsumerA[0]
ConsumerA starTing
(添加延迟):
info: KafkaConsumer.ConsumerA[0]
ConsumerA constructor
info: KafkaConsumer.ConsumerB[0]
ConsumerB constructor
info: KafkaConsumer.ConsumerC[0]
ConsumerC constructor
info: Microsoft.HosTing.lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.HosTing.lifetime[0]
HosTing environment: Production
info: Microsoft.HosTing.lifetime[0]
Content root path: c:\users\..\kafkaconsumer\bin\DeBUG\net5.0
info: KafkaConsumer.ConsumerC[0]
ConsumerC starTing
info: KafkaConsumer.ConsumerA[0]
ConsumerA starTing
info: KafkaConsumer.ConsumerB[0]
ConsumerB starTing
这是因为您的 execute 方法是异步的,但您没有在其中使用 await 来通知 SynchronizationContext。 像这样编写你的 executeAsync 方法:
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
// await Task.Delay(10);
_logger.LogInformation("ConsumerC starTing");
await Task.Run(() => _consumer.Subscribe(new List<String> { "topic" }));
while (!cancellationToken.IsCancellationrequested)
{
_await Task.Run(() => consumer.Consume(cancellationToken));
}
}
,
当启动 BACkgroundservices 时,框架显然在做这样的事情:
var starTing1 = service1.ExecuteAsync(@R_801_8352@ //all called in sequence without awaits inbetween
var starTing2 = service2.ExecuteAsync(@R_801_8352@
var starTing3 = service3.ExecuteAsync(@R_801_8352@
...
//will await the starTings all at once later on
当然,当它在您的一项服务中执行此操作时,它会立即陷入同步循环中,在该循环中它会阻塞地轮询 Kafka 消费者。执行线程永远不会让给框架继续调用其他服务。
您可以通过在单独的线程上执行同步循环来解决这个问题,让框架愉快地开展业务:
protected Task ExecuteAsync(...)
{
return Task.Run(() => { //runs the below on a separate thread from the threadpool
_logger.LogInformation("ConsumerC starTing");
_consumer.Subscribe(new List<String> { "topic" });
while (!cancellationToken.IsCancellationrequested)
{
_ = _consumer.Consume(cancellationToken);
}
});
}
在处理异步 API 时,人们普遍期望您不要坐在给定的线程上,因为这样做可能会导致您上面期望线程返回的事情出现问题。当您等待事情时,执行点“停留”在您的代码中,但实际上线程会返回给调用者,而延续排队等待在正确的时间(显然,大多数情况下)继续执行您的工作。
不幸的是,据我所知,Kafka 库没有用于处理此问题的 API,因此它们需要自己的完整线程。
以上是大佬教程为你收集整理的无法启动多个 Kafka 消费者全部内容,希望文章能够帮你解决无法启动多个 Kafka 消费者所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。