Azure   发布时间:2022-05-15  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

上篇博文中我们介绍了Azure messaging的重复消息机制、At most once 和at least once.

 Azure messaging-serviceBus messaging消息队列技术系列5-重复消息:at-least-once at-most-once

本文中我们主要研究并介绍Azure messaging的消息回执机制:实际应用场景:

同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站,然后张三去取,当然张三写信的时候就得写明回信地址。还

有,生成订单编号场景,发送一个生成订单编号的消息,消息消费者接收生成订单编号的消息,并通过消息回执返回。

Azure messaging的消息回执机制主要通过:基于带会话的Queue/Topic、SessionId、ReplyTo属性来实现

在代码实现中,我们需要:

1. 两个工作线程,一个线程用于消息发送和接收回执消息,一个线程用于消息接收和发送消息回执。

2. 一个会话标识:ReceiptSession  

3. 两个队列Queue:requestQueue:发送消息、接收消息,ResponseQueue:发送回执消息,接收回执消息。

直接Show Code:

首先,我们在serviceBusMQManager增加一个线程安全的创建带回话的QueueClient方法:

private static object syncObj = new object();
        /// <sumMary>
        /// 获取要求会话带Session的QueueClient
        </sumMary>
        <param name="queuename">队列名称</param>
        <returns>QueueClient</returns>
        public QueueClient GetSessionQueueClient(String queueName)
        {
            var namespaceClient = NamespaceManager.Create();
            if (!namespaceClient.QueueExists(queueName))
            {
                lock (syncObj)
                {
                    namespaceClient.QueueExists(queueName))
                    {
                        var queue = new QueueDescription(queueName) { requiresSession = true };
                        namespaceClient.CreateQueue(queuE);
                    }
                }
            }

            return QueueClient.Create(queuename,ReceiveMode.ReceiveAndDelete);
        }

然后我们定义一些常量:

        readonly String ReplyToSessionId = "ReceiptSession";

        const double ResponsemessageTimeout = 20.0String requestQueuename = requestQueueString responseQueuename = ResponseQueue";

实现发送并接收回执消息的方法:

        ///  发送并接收回执消息
        <param name="bills"></param>
        public void Sendmessage()
        {
            var manager = new serviceBusUtils();
            var responseClient = manager.GetSessionQueueClient(responseQueuename);
            var requestClient = manager.GetSessionQueueClient(requestQueuename);

            var messsageReceiver = responseClient.AcceptmessageSession(ReplyToSessionId);
            var order = CreateSALEsOrder(1);

            //发送消息
            var message =  brokeredmessage(order);
            message.Properties.Add(Type,order.GetType().ToString());
            message.SessionId = ReplyToSessionId;
            message.messagEID = Ordermessage001;
            message.ReplyTo = responseQueuename;
            requestClient.Send(messagE);
            Console.WriteLine(Send message: " + message.messagEID + SALEsOrder ID: " + order.orderID);

            接收消息回执
            var receivedmessage = messsageReceiver.Receive(TimeSpan.FromSeconds(ResponsemessageTimeout * 2));

            var receivedOrder = receivedmessage.GetBody<SALEsOrder>();
            Console.WriteLine(Receive receipt message: " + receivedmessage.messagEID +  receivedOrder.orderID);
            messsageReceiver.Close();
        }

实现接收消息并发送回执方法:

 1         <sumMary>
 2          接收消息并回执
 3         </sumMary>
 4          Receivemessage()
 5         {
 6              serviceBusUtils();
 7 
 8              manager.GetSessionQueueClient(requestQueueName);
 9             var session = requestClient.AcceptmessageSession();
10             var requestmessage = session.Receive();
11            
12             if (requestmessage != null)
13             {
14                 var receivedOrder = requestmessage.GetBody<SALEsOrder>();
15                 Console.WriteLine(Receive message: " + requestmessage.messagEID +  receivedOrder.orderID);
16 
17                 var responsemessage =  brokeredmessage(receivedOrder);
18                 responsemessage.Properties.Add(String());
19                 responsemessage.ReplyToSessionId = ReplyToSessionId;
20                 responsemessage.messagEID = ResponSEOrdermessage001;
21                 responsemessage.SessionId = requestmessage.SessionId;
22                
23                 发送回执消息
24                  manager.GetSessionQueueClient(requestmessage.ReplyTo);
25                 responseClient.Send(responsemessagE);
26                 Console.WriteLine(Send receipt message: " + responsemessage.messagEID +  receivedOrder.orderID);               
27             }
28         }
@H_284_0@main方法中,启动两个工作线程:一个线程用于消息发送和接收回执消息,一个线程用于消息接收和发送消息回执。

因为涉及到Azure messaging中队列的第一次创建,Azure messaging是不支持多个请求同时创建同一个队列的,因此,我们两个线程间做一个简单的Task.Delay(3000).Wait();

 1         void Main([] args)
 2  3             var sendTask = Task.Factory.StartNew(() => { Sendmessage(); });
 4             Task.Delay(3000).Wait();
 5             var receiveTask = Task.Factory.StartNew(() => { Receivemessage(); });
 6 
 7             Task.WaitAll(sendTask,receiveTask);
 8 
 9             Console.ReadKey();           
10         }

我们看看程序输出:

Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执

Azure 服务总线中的队列:

Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执

可以看出:Azure messaging-serviceBus messaging 基于带会话的Queue/Topic、SessionId、ReplyTo属性来实现消息回执机制。

 

周国庆

2017/3/23

 

大佬总结

以上是大佬教程为你收集整理的Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执全部内容,希望文章能够帮你解决Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。
标签: