wordpress   发布时间:2022-04-02  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了windows – 为什么ZeroMQ PGM组播接收卡在中间并且没有进一步接收数据包大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

概述

ZeroMQ(版本 – zeromq-4.1.6)PGM组播数据包接收卡在中间,甚至Sender仍然发送数据包没有任何问题. 如果我们重新启动Receiver,应用程序现在会收到数据包,但它不是解决方案.我在Sender&amp ;;中尝试了各种ZMQ_RATE.接收方. 问题: 发送者使用以下套接字选项发送近300,000个数据包,但接收器卡在&之间.没有收到所有的数据包.如果我们添加Sleep
ZeroMQ(版本 – zeromq-4.1.6)PGM组播数据包接收卡在中间,甚至Sender仍然发送数据包没有任何问题.

如果我们重新启动Receiver,应用程序现在会收到数据包,但它不是解决方案.我在Sender&amp ;;中尝试了各种ZMQ_RATE.接收方.

问题:

发送者使用以下套接字选项发送近300,000个数据包,但接收器卡在&之间.没有收到所有的数据包.如果我们添加Sleep(2) – 在每次发送中等待2 ms,有时我们会收到所有数据包,但它需要更多时间.

环境设置:

(发送器和接收器使用D-Link交换机在单个子网内连接.介质速度为1Gbps)

Sender: JZMQ ( ZMQ C library,openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
Packet size - 1024 bytes
ZMQ_RECOVERY_IVL - 2 minutes
Send Flag - 0 ( blocking mode )
Sleep( 2ms ) - sometimes its working without any issue but taking more time for transfer.
Platform - Windows

Receiver: ZMQ C++ ( ZMQ C library,openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
ZMQ_RCVTIMEO - 3 Secs
receive Flag - 0 ( blocking mode )
Platform - Windows

可能是什么问题?

ZeroMQ PGM-multicast是不是一个稳定的库?

JZMQ Sender:
ZMQ.Context context = ZMQ.context(1);
ZMQ.socket socket = context.socket(ZMQ.PUB);
socket.setRate(80000);
socket.setRecoveryInterval(60*60);
socket.setSendTimeOut(-1);
socket.setSendBufferSize(1024*64);
socket.bind("pgm://local_IP;239.255.0.20:30001");

byte[] bytesToSend = new byte[1024];
int count = 0;
while(count < 300000) {
    socket.send(bytesToSend,0);
    count++;
}

------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"


int main(int argc,char* argv[]) {
    try {

         zmq::context_t context(1);

      // Socket to talk to server
         printf ("ConnecTing to server...");

         zmq::socket_t *s1 = new zmq::socket_t(context,ZMQ_SUB);

         int recvTimeout = 3000;
         s1->setsockopt(ZMQ_RCVTIMEO,&recvTimeout,sizeof(int));

         int recvRate = 80000;
         s1->setsockopt(ZMQ_RATE,&recvRate,sizeof(int));

         int recsec = 60 * 60;
      // s1->setsockopt(ZMQ_RECOVERY_IVL,&recsec,sizeof(recseC));

         s1->connect("pgm://local_IP;239.255.0.20:30001");

         s1->setsockopt (ZMQ_SUBSCRIBE,NULL,0);

         printf ("done. \n");
         int seq=0;
         while(true) {

               zmq::message_t msgbuff;

               int ret = s1->recv(&msgbuff,0);
               if(!ret)
               {
                   printf ("Received not received timeout\n");
                   conTinue;
               }

               printf ("Seq(%d) Received data size=%d\n",seq,msgbuff.size());
               ++seq;
         }
    }
    catch( zmq::error_t &e )   {
           printf ("An error occurred: %s\n",e.what());
           return 1;
    }
    catch( std::exception &e ) {
           printf ("An error occurred: %s\n",e.what());
           return 1;
    }
    return 0;
}

解决方法

PGM是否稳定?仅供参:从2.1.1开始工作,今天我们有稳定的4.2.

这不是一个好习惯.我敢于指责图书馆维护人员在发布图书馆之前没有彻底测试过PGM / EPGM,或者在应用程序设计得到充分理解,设计稳健,诊断良好之前随时在开发中做不好的工作.性能/延迟测试在实际部署生态系统的现实检查,通常由{localhost |家庭子网|远程网络| remote-host(s)}.

[PUB] – 发送部分需要得到应有的注意:

如果不出意外,文档的这一部分就是警告并响起所有的钟声和响声.吹嘘所有口哨,如果在一些模拟SLOC中进行不充分的资源管理,而对于非阻塞,超快速循环的残酷尝试确实存在应有的谨慎:

所以,可能是对的,你的[PUB] -sender丢失了丢失的消息,然后再将这些消息发送到线路上.

一个警告来自O / S权限:

接下来是[SUB] -receiver:

一些更多的调整将有助于嗅探[PUB] -sender,类似于下面提出的[SUB] -receiver的内联状态/跟踪工具:

------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//                          MODs: https://stackoverflow.com/q/44526517/3666197

#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"

#include <chrono>                                                       // since C++ 11
typedef std::chrono::high_resolution_clock              nanoCLK;

#define ZMQ_IO_THREAD_POOL_SIZE                         8

#define ZMQ_AFINITY_PLAIN_ROUNDROBIN_UNMANAGED_RISKY    0
#define ZMQ_AFINITY_LO_PRIO_POOL                        0 | 1
#define ZMQ_AFINITY_HI_PRIO_POOL                        0 | 0 | 2
#define ZMQ_AFINITY_MC_EPGM_POOL                        0 | 0 | 0 | 4 | 8 | 0 | 0 | 64 | 128


int main( int argc,char* argv[] ) {

    auto RECV_start = nanoCLK::Now();
    auto RECV_ret   = nanoCLK::Now();
    auto RECV_last  = nanoCLK::Now();
    auto test_start = nanoCLK::Now();

    try {
           zmq::context_t context( ZMQ_IO_THREAD_POOL_SIZE );           printf ( "ConnecTing to server..." );
           int            major,minor,patch;
           zmq::version( &major,&minor,&patch );                      printf ( "Using ZeroMQ( %d.%d.%d )",major,patch );

           zmq::socket_t *s1 = new zmq::socket_t( context,ZMQ_SUB );   // Socket to talk to server

           int zmqLinger   =       0,// [  ms]
               zmqAffinity =       0,// [   #]  mapper bitmap-onto-IO-thread-Pool (ref. #define-s above )

               recvBuffer  =       2 * 123456,// [   B]
               recvMaxSize =    9876,// [   B]
               recvHwMark  =  123456,// [   #]  max number of MSGs allowed to be Queued per connected Peer

               recvRate    =   80000 * 10,// [kbps]
               recvTimeout =    3000,// [  ms]  before ret EAGAIN { 0: NO_BLOCK | -1: INF | N: wait [ms] }
               recoverMSEC =      60 * 60      // [  ms]
               ;

           s1->setsockopt ( ZMQ_AFFINITY,&zmqAffinity,sizeof(int) );
           s1->setsockopt ( ZMQ_LINGER,&zmqLinger,sizeof(int) );
           s1->setsockopt ( ZMQ_MAXMSGSIZE,&recvMaxSize,sizeof(int) );
           s1->setsockopt ( ZMQ_RCVBUF,&recvBuffer,sizeof(int) );
           s1->setsockopt ( ZMQ_RCVHWM,&recvHwMark,sizeof(int) );
           s1->setsockopt ( ZMQ_RCVTIMEO,sizeof(int) );
           s1->setsockopt ( ZMQ_RATE,sizeof(int) );
     //    s1->setsockopt ( ZMQ_RECOVERY_IVL,&recoverMSEC,sizeof(int) );

           s1->connect ( "pgm://local_IP;239.255.0.20:30001" );
           s1->setsockopt ( ZMQ_SUBSCRIBE,0 );                   printf ( "done. \n" );

           int seq = 0;
           while( true ) {
                  zmq::message_t         msgbuff;                  RECV_start = nanoCLK::Now(); RECV_last = RECV_ret;
                  int   ret = s1->recv( &msgbuff,0 );             RECV_ret   = nanoCLK::Now();
                  if ( !ret )                                           printf ( "[T0+ %14d [ns]]: [SUB] did not receive any message within set timeout(%d). RC == %d LOOP_ovhd == %6d [ns] RECV_wait == %10d [ns]\n",std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - test_start ).count(),recvTimeout,ret,std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_last ).count(),std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count() );
                  else                                                  printf ( "[T0+ %14d [ns]]: [SUB] did Now receive   a message SEQ#(%6d.) DATA[%6d] B. RC == %d LOOP_ovhd == %6d [ns] RECV_wait == %10d [ns]\n",++seq,msgbuff.size(),std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count() );
           }
    }
    catch( zmq::error_t   &e ) {                                        printf ( "[T0+ %14d [ns]]: [EXC.ZMQ] An error occurred: %s\nWill RET(1)",e.what() );
           return 1;
    }
    catch( std::exception &e ) {                                        printf ( "[T0+ %14d [ns]]: [EXC.std] An error occurred: %s\nWill RET(1)",e.what() );
           return 1;
    }
    return 0;
}

大佬总结

以上是大佬教程为你收集整理的windows – 为什么ZeroMQ PGM组播接收卡在中间并且没有进一步接收数据包全部内容,希望文章能够帮你解决windows – 为什么ZeroMQ PGM组播接收卡在中间并且没有进一步接收数据包所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。