大佬教程收集整理的这篇文章主要介绍了.net core Redis消息队列中间件【InitQ】,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
这是一篇拖更很久的博客,不知不觉InitQ在Nuget下载量已经过15K了,奈何胸无点墨也不晓得怎么写(懒),随便在github上挂了个md,现在好好唠唠如何在redis里使用队列
redis在项目中使用的越来越频繁,通常我们是用来做缓存,使用较多的就是String,Hash这两种类型,以及分布式锁,redis的List类型,就可以用于消息队列,使用起来更加简单,且速度更快,非常适合子服务内部之间的消息流转,创造灵感来自于杨老板的CAP(地址:https://www.cnblogs.com/tibos/p/11858095.html),采用注解的方式消费队列,让业务逻辑更加的清晰,方便维护
1.通过注解的方式,订阅队列
2.可以设置消费消息的频次
3.支持消息广播
4.支持延迟队列
1.获取initQ包 方案A. install-package InitQ 方案B. nuget包管理工具搜索 InitQ
2.添加中间件(该中间件依赖 StackExchange.redis)
services.AddInitQ(m=>
{
m.SuspendTime = 1000;
m.IntervalTime = 1000;
m.ConnectionString = "127.0.0.1,connectTimeout=15000,synctimeout=5000,password=123456";
m.ListSubscribe = new List<Type>() { typeof(redisSubscribeA), typeof(redisSubscribeB) };
m.ShowLog = false;
});
3.配置说明
public class InitQOptions
{
/// <sumMary>
/// redis连接字符串
/// </sumMary>
public String ConnectionString { get; set; }
/// <sumMary>
/// 没消息时挂起时长(毫秒)
/// </sumMary>
public int SuspendTime { get; set; }
/// <sumMary>
/// 每次消费消息间隔时间(毫秒)
/// </sumMary>
public int IntervalTime { get; set; }
/// <sumMary>
/// 是否显示日志
/// </sumMary>
public bool ShowLog { get; set; }
/// <sumMary>
/// 需要注入的类型
/// </sumMary>
public IList<Type> ListSubscribe { get; set; }
public InitQOptions()
{
ConnectionString = "";
IntervalTime = 0;
SuspendTime = 1000;
ShowLog = false;
}
}
消息的发布/订阅是最基础的功能,这里做了几个优化
示例如下(Thread.Sleep):
public class redisSubscribeA: IredisSubscribe
{
[Subscribe("tibos_test_1")]
private async Task SubredisTest(String msg)
{
Console.WriteLine($"A类--->当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
Thread.Sleep(3000); //使用堵塞线程模式,同步延时
Console.WriteLine($"A类<---当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
}
}
public class redisSubscribeA: IredisSubscribe
{
[Subscribe("tibos_test_1")]
private async Task SubredisTest(String msg)
{
Console.WriteLine($"A类--->当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
Thread.Sleep(3000); //使用堵塞线程模式,同步延时
Console.WriteLine($"A类<---当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
}
[Subscribe("tibos_test_1")]
private async Task SubredisTest2(String msg)
{
Console.WriteLine($"A类--->当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
Thread.Sleep(3000); //使用堵塞线程模式,同步延时
Console.WriteLine($"A类<---当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
}
}
示例如下(Task.Delay):
[Subscribe("tibos_test_1")]
private async Task SubredisTest(String msg)
{
Console.WriteLine($"A类--->当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
await Task.Delay(3000); //使用非堵塞线程模式,异步延时
Console.WriteLine($"A类<---当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
}
根据业务情况,合理的选择堵塞模式
using (var scope = _provider.Getrequiredservice<IserviceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.serviceProvider.Getservice<ICacheservice>();
//循环向 tibos_test_1 队列发送消息
for (int i = 0; i < 1000; i++)
{
await _redis.ListrightPushAsync("tibos_test_1", $"我是消息{i + 1}号");
}
}
public class redisSubscribeA: IredisSubscribe
{
[Subscribe("tibos_test_1")]
private async Task SubredisTest(String msg)
{
Console.WriteLine($"A类--->订阅者A消息消息:{msg}");
}
[Subscribe("tibos_test_1")]
private async Task SubredisTest1(String msg)
{
Console.WriteLine($"A类--->订阅者A1消息消息:{msg}");
}
[Subscribe("tibos_test_1")]
private async Task SubredisTest2(String msg)
{
Console.WriteLine($"A类--->订阅者A2消息消息:{msg}");
}
[Subscribe("tibos_test_1")]
private async Task SubredisTest3(String msg)
{
Console.WriteLine($"A类--->订阅者A3消息消息:{msg}");
}
}
public class redisSubscribeB : IredisSubscribe
{
/// <sumMary>
/// 测试
/// </sumMary>
/// <param name="msg"></param>
/// <returns></returns>
[Subscribe("tibos_test_1")]
private async Task SubredisTest(String msg)
{
Console.WriteLine($"B类--->订阅者B消费消息:{msg}");
}
}
消息广播是StackExchange.redis已经封装好的,我们只用起个线程监听即可,只要监听了这个key的线程,都会收到消息
public class ChAnnelSubscribeA : IHostedservice, IDisposable
{
private readonly IserviceProvider _provider;
private readonly ILogger _logger;
public ChAnnelSubscribeA(ILogger<TestMain> logger, IserviceProvider provider)
{
_logger = logger;
_provider = provider;
}
public void Dispose()
{
_logger.LogInformation("退出");
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("程序启动");
Task.Run(async () =>
{
using (var scope = _provider.Getrequiredservice<IserviceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.serviceProvider.Getservice<ICacheservice>();
await _redis.SubscribeAsync("test_chAnnel", new Action<redisChAnnel, redisValue>((chAnnel, messagE) =>
{
Console.WriteLine("test_chAnnel" + " 订阅服务A收到消息:" + messagE);
}));
}
});
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}
}
public class ChAnnelSubscribeB : IHostedservice, IDisposable
{
private readonly IserviceProvider _provider;
private readonly ILogger _logger;
public ChAnnelSubscribeB(ILogger<TestMain> logger, IserviceProvider provider)
{
_logger = logger;
_provider = provider;
}
public void Dispose()
{
_logger.LogInformation("退出");
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("程序启动");
Task.Run(async () =>
{
using (var scope = _provider.Getrequiredservice<IserviceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.serviceProvider.Getservice<ICacheservice>();
await _redis.SubscribeAsync("test_chAnnel", new Action<redisChAnnel, redisValue>((chAnnel, messagE) =>
{
Console.WriteLine("test_chAnnel" + " 订阅服务B收到消息:" + messagE);
}));
}
});
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}
}
services.AddHostedservice<ChAnnelSubscribeA>();
services.AddHostedservice<ChAnnelSubscribeB>();
using (var scope = _provider.Getrequiredservice<IserviceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.serviceProvider.Getservice<ICacheservice>();
for (int i = 0; i < 1000; i++)
{
await _redis.PublishAsync("test_chAnnel", $"往通道发送第{i}条消息");
}
}
延迟消息非常适用处理一些定时任务的场景,如订单15分钟未付款,自动取消, xxx天后,自动续费...... 这里使用zset+redis锁来实现,这里的操作方式,跟发布/订阅非常类似 写入延迟消息:SortedSetAddAsync 注解使用:SubscribeDelay
1.定义发布者
Task.Run(async () =>
{
using (var scope = _provider.Getrequiredservice<IserviceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.serviceProvider.Getservice<ICacheservice>();
for (int i = 0; i < 100; i++)
{
var dt = datetiR_581_11845@e.Now.AddSeconds(3 * (i + 1));
//key:redis里的key,唯一
//msg:任务
//time:延时执行的时间
await _redis.SortedSetAddAsync("test_0625", $"延迟任务,第{i + 1}个元素,执行时间:{dt.ToString("yyyy-MM-dd HH:mm:ss")}", dt);
}
}
});
2.定义消费者
//延迟队列
[SubscribeDelay("test_0625")]
private async Task SubredisTest1(String msg)
{
Console.WriteLine($"A类--->当前时间:{datetiR_581_11845@e.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者延迟队列消息开始--->{msg}");
//模拟任务执行耗时
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine($"A类--->{msg} 结束<---");
}
版本
版本库:
作者:提伯斯
以上是大佬教程为你收集整理的.net core Redis消息队列中间件【InitQ】全部内容,希望文章能够帮你解决.net core Redis消息队列中间件【InitQ】所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。