登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Code@Pig Home

喜欢背着一袋Code傻笑的Pig .. 忧美.欢笑.记忆.忘却 .之. 角落

 
 
 

日志

 
 

[ZeroMQ] messaging pattern (2) -- pipeline  

2010-11-25 16:34:53|  分类: net_ZeroMQ |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
pipeline,单向的数据发送。

要点:
  1. bind() 要先于 connect(),否则 connecting fail
  2. 如下两种情况,zmq_connect() / zmq_bind() 要用对。

可以多个 push 一个 pull,此时 push 要使用 zmq_connect(),而 pull 要使用 zmq_bind()。
[0MQ] messaging pattern (2) -- pipeline - kasicass - Code@Pig Home
 
另一种情况是一个 push,多个 pull,则 push 使用 zmq_bind(),而 pull 使用 zmq_connect()
[0MQ] messaging pattern (2) -- pipeline - kasicass - Code@Pig Home
 
下面演示了第一种情况。
------------------------------------------------------------------
#include <zmq.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>

void *push_thread(void *ctx)
{
        pthread_t pid;
        zmq_msg_t msg;
        void *push;
        char buf[80];

        pid  = pthread_self();
        push = zmq_socket(ctx, ZMQ_PUSH);
        zmq_connect(push, "inproc://pull-it");

        while (1)
        {
                sprintf(buf, "#%p send\n", pid);

                zmq_msg_init_size(&msg, strlen(buf)+1);
                memcpy(zmq_msg_data(&msg), buf, strlen(buf)+1);
                zmq_send(push, &msg, 0);
                zmq_msg_close(&msg);

                sleep(5);
        }

        return (void *)0;
}

int main()
{
        zmq_msg_t msg;
        void *ctx, *pull;
        pthread_t pid1, pid2;

        ctx  = zmq_init(0);
        pull = zmq_socket(ctx, ZMQ_PULL);
        zmq_bind(pull, "inproc://pull-it");

        pthread_create(&pid1, NULL, push_thread, ctx);
        pthread_create(&pid2, NULL, push_thread, ctx);

        while (1)
        {
                zmq_recv(pull, &msg, 0);
                printf("recv: %s", (char *)zmq_msg_data(&msg));
                zmq_msg_close(&msg);
        }

        return 0;
}
------------------------------------------------------------------
$ gcc -g -Wall -I/usr/local/include main.c -L/usr/local/lib -lzmq -pthread
$ ./a.out 
recv: #0x28429ec0 send
recv: #0x2842a140 send
recv: #0x28429ec0 send
recv: #0x2842a140 send
recv: #0x28429ec0 send
recv: #0x2842a140 send
------------------------------------------------------------------

FAQ:
1. 如果1个producer对应N个consumer,我想优雅的结束一个consumer,让其从producer的load balance中删除,同时这个consumer本身还可以把队列中余下的message都处理完?
    目前,做不到。zmq_close() 可以关闭socket,但此时在队列中的message都会被释放。
    只有让producer先结束,才能关闭consumer。
  评论这张
 
阅读(2749)| 评论(0)

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2018