登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Code@Pig Home

喜欢背着一袋Code傻笑的Pig .. 忧美.欢笑.记忆.忘却 .之. 角落

 
 
 

日志

 
 

[ZeroMQ] zmq与libevent等共存的一个bug  

2010-12-16 18:23:35|  分类: net_ZeroMQ |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
仅仅是 zmq_bind / zmq_connect 的顺序问题,可能导致 select() 工作异常。

 * zmq 2.1.0, ZMQ_FD bug
   pull = zmq_bind(), push = zmq_connect(), libevent + pull, ok!
   push = zmq_bind(), pull = zmq_connect(), libevent + pull, fail!

让 select() 卡住的例子:
--------------------------------------------------------------------------------------------
//// main thread ////
push = zmq_socket(ZMQ_PUSH)
zmq_bind(push)

pull = zmq_socket(ZMQ_PULL)
zmq_connect(pull)
  -> zmq::socket_base_t::connect()
     -> send_bind()                 // send 'bind' to push.mailbox

start thread 2

sleep(1);
while (1)
{
        zmq_send(push);
}

//// thread 2 ////
fd = zmq_getsockopt(pull, ZMQ_FD);

while (1)
{
    select(fd for read);
    zmq_getsockopt(pull, ZMQ_EVENTS);
    while (1) zmq_recv(pull, ZMQ_NOBLOCK);
}

// ======== sequence chart ========
//
// main thread:
//   zmq_send(push);        // push_t::xsend()
//                                      //  -> lb_t::send()
//                                      //     -> writer_t::flush()
//                                      //        -> ypipe_t::flush()
//                                      //           -> c.cas (w, f) != w
//                                      //              (Compare-and-swap was unseccessful because 'c' is NULL.)
//                                      //              (no 'activate_reader' to pull.mailbox)
//
// thread 2 (first loop):
//   select(fd for read);      // block here, as no 'activate_reader' message
//   ...
--------------------------------------------------------------------------------------------

正常工作的例子:
--------------------------------------------------------------------------------------------
//// main thread ////
pull = zmq_socket(ZMQ_PULL)
zmq_bind(pull)

push = zmq_socket(ZMQ_PUSH)
zmq_connect(push)
  -> zmq::socket_base_t::connect()
     -> send_bind()                 // send 'bind' to pull.mailbox

start thread 2

sleep(1)
while (1)
{
    zmq_send(push);
}

//// thread 2 ////
fd = zmq_getsockopt(pull, ZMQ_FD);

while (1)
{
    select(fd for read);
    zmq_getsockopt(pull, ZMQ_EVENTS);
    while (1) zmq_recv(pull, ZMQ_NOBLOCK);
}

// ======== sequence chart ========
//
// thread 2 (first loop):
//   select(fd for read);                                 // got 'bind' message
//   zmq_getsockopt(pull, ZMQ_EVENTS);   // socket_base_t::process_commands()
//                                                                 //  -> pull attach
//                                                                 // pull_t::xhas_in()
//                                                                 //  -> fq_t::has_in()
//                                                                 //     -> ypipe_t::check_read()
//                                                                 //        -> c.cas (&queue.front (), NULL);    // c = NULL
//   zmq_recv(pull, ZMQ_NOBLOCK);         // do nothing, goto to select() again
//
// main thread:
//   zmq_send(push);                    // push_t::xsend()
//                                                  //  -> lb_t::send()
//                                                  //     -> writer_t::flush()
//                                                  //        -> ypipe_t::flush()
//                                                  //           -> c.cas (w, f) != w
//                                                  //              (Compare-and-swap was unseccessful because 'c' is NULL.)
//                                                  //        -> send 'activate_reader' to pull.mailbox
//
// second loop:
//   select(fd for read);                                 // got 'activate_reader' message
//   zmq_getsockopt(pull, ZMQ_EVENTS);   // socket_base_t::process_commands()
//                                                                 //  -> fq_t::activated()
//                                                                 // pull_t::xhas_in() / set 'ZMQ_POLLIN'
//   zmq_recv(pull, ZMQ_NOBLOCK);          // consuming all zmq_msg_t
--------------------------------------------------------------------------------------------

ps. 不过 ZeroMQ 的开发者不认为这是个 bug。而认为我将 edge-triggered 的 socket 当 level-triggered 来使用了。(reader_t 默认为 readable,所以 select() 听不到任何事件,厄)
"I would say the problem is that you are using ZMQ_FD as if it was level-triggered. In fact it is edge triggered. 
I.e. you should get ZMQ_EVENTS to find out whether particular 0MQ socket is readeable/writeable.
Further on poll(ZMQ_FD) signals event only if the state changes. For example if the socket was unreadable and become readable. If the socket was readable and remains readable, ZMQ_FD is not signaled.
Martin"
说得也对,但这让我写 libevent + zeromq 的时候,很纠结。
  评论这张
 
阅读(7825)| 评论(0)

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2018