程序问答   发布时间:2022-06-01  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了无法启动多个 Kafka 消费者大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决无法启动多个 Kafka 消费者?

开发过程中遇到无法启动多个 Kafka 消费者的问题如何解决?下面主要结合日常开发的经验,给出你关于无法启动多个 Kafka 消费者的解决方法建议,希望对你解决无法启动多个 Kafka 消费者有所启发或帮助;

我有一个控制台应用程序,它有多个 BACkgroundservices,每个应用程序都使用 Confluent.Kafka nuget 包 (v1.6.2) 从同一个 Kafka 主题中读取。该主题有 3 个分区。

当应用程序启动时,所有后台服务都会调用它们的构造函数,但是只有一个 ExecuteAsync 方法被调用。如果我添加一个 Task.Delay() - 毫秒数似乎并不重要 - 在每个 ExecuteAsync 开始时,一切正常,所有后台服务都运行。

据我所知,没有引发异常。

有没有人知道可能会发生什么,或者在哪里可以进一步查看?

代码如下:

using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.HosTing;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaConsumer
{
    class Program
    {
        static voID Main(String[] args)
        {
            CreateHostBuilder(args).build().Run();
        }

        private static IHostBuilder CreateHostBuilder(String[] args) =>
            Host.CreateDefaultBuilder(args)
                .Configureservices((hostContext,services) =>
                {
                    services.AddHostedservice<ConsumerA>();
                    services.AddHostedservice<ConsumerB>();
                    services.AddHostedservice<ConsumerC>();
                });
    }

    public class ConsumerA : BACkgroundservice
    {
        private Readonly ILogger<ConsumerA> _logger;
        private Readonly IConsumer<Ignore,String> _consumer;

        public ConsumerA(ILogger<ConsumerA> logger)
        {
            _logger = logger;
            var config = new ConsumerConfig()
            {
                bootstrapServers = @"server:port",GroupID = "Group1",autoOffsetreset = autoOffsetreset.EarlIEst
            };

            _consumer = new ConsumerBuilder<Ignore,String>(config).build();
            _logger.Loginformation("ConsumerA constructor");
        }

        protected overrIDe async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            // await Task.Delay(10);

            _logger.Loginformation("ConsumerA starTing");
            _consumer.Subscribe(new List<String> { "topic" });

            while (!cancellationToken.IsCancellationrequested)
            {
                _ = _consumer.Consume(cancellationToken);
            }
        }
    }

    public class ConsumerB : BACkgroundservice
    {
        private Readonly ILogger<ConsumerB> _logger;
        private Readonly IConsumer<Ignore,String> _consumer;

        public ConsumerB(ILogger<ConsumerB> logger)
        {
            _logger = logger;
            var config = new ConsumerConfig()
            {
                bootstrapServers = @"server:port",String>(config).build();
            _logger.Loginformation("ConsumerB constructor");
        }

        protected overrIDe async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            // await Task.Delay(10);

            _logger.Loginformation("ConsumerB starTing");
            _consumer.Subscribe(new List<String> { "topic" });

            while (!cancellationToken.IsCancellationrequested)
            {
                _ = _consumer.Consume(cancellationToken);
            }
        }
    }

    public class ConsumerC : BACkgroundservice
    {
        private Readonly ILogger<ConsumerC> _logger;
        private Readonly IConsumer<Ignore,String> _consumer;

        public ConsumerC(ILogger<ConsumerC> logger)
        {
            _logger = logger;
            var config = new ConsumerConfig()
            {
                bootstrapServers = @"server:port",String>(config).build();
            _logger.Loginformation("ConsumerC constructor");
        }

        protected overrIDe async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            // await Task.Delay(10);
            
            _logger.Loginformation("ConsumerC starTing");
            _consumer.Subscribe(new List<String> { "topic" });

            while (!cancellationToken.IsCancellationrequested)
            {
                _ = _consumer.Consume(cancellationToken);
            }
        }
    }
}

和输出:

(没有延迟):

info: KafkaConsumer.ConsumerA[0]
      ConsumerA constructor
info: KafkaConsumer.ConsumerB[0]
      ConsumerB constructor
info: KafkaConsumer.ConsumerC[0]
      ConsumerC constructor
info: KafkaConsumer.ConsumerA[0]
      ConsumerA starTing

(添加延迟):

info: KafkaConsumer.ConsumerA[0]
      ConsumerA constructor
info: KafkaConsumer.ConsumerB[0]
      ConsumerB constructor
info: KafkaConsumer.ConsumerC[0]
      ConsumerC constructor
info: Microsoft.HosTing.lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.HosTing.lifetime[0]
      HosTing environment: Production
info: Microsoft.HosTing.lifetime[0]
      Content root path: c:\users\..\kafkaconsumer\bin\DeBUG\net5.0
info: KafkaConsumer.ConsumerC[0]
      ConsumerC starTing
info: KafkaConsumer.ConsumerA[0]
      ConsumerA starTing
info: KafkaConsumer.ConsumerB[0]
      ConsumerB starTing

解决方法

是因为您的 execute 方法是异步的,但您没有在其中使用 await 来通知 SynchronizationContext。 像这样编写你的 executeAsync 方法:

protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        // await Task.Delay(10);
        
        _logger.LogInformation("ConsumerC starTing");
        await Task.Run(() => _consumer.Subscribe(new List<String> { "topic" }));
        
        while (!cancellationToken.IsCancellationrequested)
        {
             _await Task.Run(() => consumer.Consume(cancellationToken));
        }
    }
,

当启动 BACkgroundservices 时,框架显然在做这样的事情:

var starTing1 = service1.ExecuteAsync(@R_801_8352@ //all called in sequence without awaits inbetween
var starTing2 = service2.ExecuteAsync(@R_801_8352@
var starTing3 = service3.ExecuteAsync(@R_801_8352@ 
...
//will await the starTings all at once later on

当然,当它在您的一项服务中执行此操作时,它会立即陷入同步循环中,在该循环中它会阻塞地轮询 Kafka 消费者。执行线程永远不会让给框架继续调用其他服务。

您可以通过在单独的线程上执行同步循环来解决这个问题,让框架愉快地开展业务:

protected Task ExecuteAsync(...) 
{
    return Task.Run(() => { //runs the below on a separate thread from the threadpool
        _logger.LogInformation("ConsumerC starTing");
        _consumer.Subscribe(new List<String> { "topic" });

        while (!cancellationToken.IsCancellationrequested)
        {
            _ = _consumer.Consume(cancellationToken);
        }
    });
}

在处理异步 API 时,人们普遍期望您不要坐在给定的线程上,因为这样做可能会导致您上面期望线程返回的事情出现问题。当您等待事情时,执行点“停留”在您的代码中,但实际上线程会返回给调用者,而延续排队等待在正确的时间(显然,大多数情况下)继续执行您的工作。

不幸的是,据我所知,Kafka 库没有用于处理此问题的 API,因此它们需要自己的完整线程。

大佬总结

以上是大佬教程为你收集整理的无法启动多个 Kafka 消费者全部内容,希望文章能够帮你解决无法启动多个 Kafka 消费者所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。