注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Code@Pig Home

喜欢背着一袋Code傻笑的Pig .. 忧美.欢笑.记忆.忘却 .之. 角落

 
 
 

日志

 
 

[ZeroMQ] messaging pattern (1) -- publish/subscribe  

2010-11-03 15:45:27|  分类: net_ZeroMQ |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
ZeroMQ 中规定了几个pattern用法,除此之外,不能乱用。

publish/subscribe, 一个one-to-many的消息发布。一个publisher,多个subscriber。
如果pub发布消息的时候,sub没有连接上来,则此消息,sub是收不到的。

注意:subscribe 初始化的时候一定要设置filter,否则收不到任何消息。

[zmq] messaging pattern (1) -- publish/subscribe - kasicass - Code@Pig Home

------------------------------ server.c -------------------------------
#include <zmq.h>
#include <pthread.h>
#include <unistd.h>

#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>

static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    if (zmq_recv (socket, &message, 0))
        exit (1);           //  Context terminated, exit

    int size = zmq_msg_size (&message);
    char *string = malloc (size + 1);
    memcpy (string, zmq_msg_data (&message), size);
    zmq_msg_close (&message);
    string [size] = 0;
    return (string);
}

static int
s_send (void *socket, char *string) {
    int rc;
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    rc = zmq_send (socket, &message, 0);
    assert (!rc);
    zmq_msg_close (&message);
    return (rc);
}

void *sub_thread(void *ctx)
{
    void *sub = zmq_socket(ctx, ZMQ_SUB);
    zmq_connect(sub, "inproc://in-pub");
    zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0);    // no filter

    printf("begin in-sub!\n");
    while (1)
    {
        char *msg = s_recv(sub);
        printf("in-sub: %s\n", msg);
        free(msg);
    }

    return (void *)0;
}
int main()
{
    void *ctx, *pub;
    pthread_t pid;

    ctx = zmq_init(1);
    pub = zmq_socket(ctx, ZMQ_PUB);
    zmq_bind(pub, "inproc://in-pub");
    zmq_bind(pub, "tcp://127.0.0.1:8888");

    pthread_create(&pid, NULL, sub_thread, ctx);

    int i = 0;
    printf("begin pub!\n");
    while (1)
    {
        char msg[80];
        sprintf(msg, "num %d", i++);
        s_send(pub, msg);
        sleep(2);
    }

    zmq_close(pub);
    zmq_term(ctx);
    return 0;
}
------------------------- client.c ----------------------
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    if (zmq_recv (socket, &message, 0))
        exit (1);           //  Context terminated, exit

    int size = zmq_msg_size (&message);
    char *string = malloc (size + 1);
    memcpy (string, zmq_msg_data (&message), size);
    zmq_msg_close (&message);
    string [size] = 0;
    return (string);
}

void *sub_thread(void *ctx)
{
    void *sub = zmq_socket(ctx, ZMQ_SUB);
    zmq_connect(sub, "tcp://127.0.0.1:8888");
    zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0);    // no filter

    while (1)
    {
        char *msg = s_recv(sub);
        printf("client in-sub: %s\n", msg);
        free(msg);
    }

    return (void *)0;
}

int main()
{
    void *ctx, *sub;
    pthread_t pid;

    ctx = zmq_init(1);
    sub = zmq_socket(ctx, ZMQ_SUB);
    zmq_connect(sub, "tcp://127.0.0.1:8888");
    zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0);    // no filter

    printf("begin client-sub!\n");
    pthread_create(&pid, NULL, sub_thread, ctx);

    while (1)
    {
        char *msg = s_recv(sub);
        printf("client sub: %s\n", msg);
        free(msg);
    }

    zmq_term(ctx);
    return 0;
}
----------------------------------------------------------------
$ ./server 
begin in-sub!
begin pub!
in-sub: num 0
in-sub: num 1
in-sub: num 2
in-sub: num 3
in-sub: num 4
in-sub: num 5

$ ./client 
begin client-sub!
client in-sub: num 3
client sub: num 3
client in-sub: num 4
client sub: num 4
client in-sub: num 5
client sub: num 5
----------------------------------------------------------------
  评论这张
 
阅读(2749)| 评论(1)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017