登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Code@Pig Home

喜欢背着一袋Code傻笑的Pig .. 忧美.欢笑.记忆.忘却 .之. 角落

 
 
 

日志

 
 

[win32] Winsock 笔记 (3) ---- IOCP  

2008-04-05 21:04:43|  分类: win32 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
虽然一直都知道 windows 下可以用 IOCP(I/O Completion Port),但自己都没用过。今天总算把一个基于 IOCP 的 simple server 完成了(还有bug)。

目前只能连上一个 client,一旦连上了多个,就会 crash。暂时还没时间查证问题,估计是某个地方出错但没处理,而接下去的其他函数又用了错误地返回值。

ps. 顺便骂一下《Windows网络编程 2nd》,不知道是翻译版很垃圾,还是原书就是如此。反正代码错误颇多,看得人很不爽。另外,windows 的鬼函数永远是那么多的参数,写的人要抓狂。

ok 言归正传,下面来简单说一下使用 IOCP 要注意的问题。

我的理解只有一个,对于 IOCP,留意 WSARecv() 的第六个参数(OVERLAPPED那个),此地并非传入一个 struct overlapped,而是一个自定义的结构体,当 GetQueuedCompletionStatus() 时,从第四个参数(OVERLAPPED那个)可以取到传入 WSARecv() 的结构体。

简单说起来,就是很多东西从一个函数传进去(userland -> kernel),过一会又从另一个函数取回来(kernel -> userland)。专业一点说,IOCP 属于 async I/O,把一个事件注册给操作系统,当事件完成后,操作系统自动通知你(windows 上IOCP的通知机制就是 thread)。

IOCP 的东西,还不太理解,有空继续研究下。

-------------------------
#include <winsock2.h>
#include <windows.h>
#include <tchar.h>
#include <stdlib.h>
#include <stdio.h>

#pragma comment(lib, "ws2_32.lib")



#define    SEND_POSTED                1
#define    RECV_POSTED                2

#define DATA_BUFSIZE            4092

typedef struct
{
    SOCKET sock_fd;
    SOCKADDR_STORAGE client_addr;
} PER_HANDLE_DATA;

typedef struct
{
    OVERLAPPED    overlapped;
    WSABUF        wbuf;

    char        buffer[DATA_BUFSIZE];
    DWORD        buf_len;
    int            op_type;
} PER_IO_DATA;

typedef struct
{
    HANDLE        complete_port;
    int            thread_index;
} MY_THREAD_DATA;


void errx(const _TCHAR *msg)
{
    _putts(msg);
    exit(-1);
}


void shutdown_network()
{
    WSACleanup();
    _putts(_T("network shutdown ..."));
}

void init_network()
{
    WORD wVersionRequested;
    WSADATA wsaData;
    int err;

    wVersionRequested = MAKEWORD(2, 2);

    err = WSAStartup( wVersionRequested, &wsaData );
    if ( err != 0 )
    {
        errx(_T("init network fail!"));
        exit(-1);
    }

    atexit(shutdown_network);
    _putts(_T("network init ..."));
}

DWORD get_system_cpu_num(void)
{
    SYSTEM_INFO si;

    GetSystemInfo(&si);
    return si.dwNumberOfProcessors;
}

DWORD WINAPI server_work_thread(LPVOID lpParam)
{
    MY_THREAD_DATA *MyThreadData = (MY_THREAD_DATA *) lpParam;
    HANDLE CompletionPort = MyThreadData->complete_port;

    DWORD  BytesTransferred;
    PER_HANDLE_DATA *PerHandleData;
    PER_IO_DATA *PerIoData;


    printf("Enter Thread #%d ...\n", MyThreadData->thread_index);
    while (TRUE)
    {
        BOOL ret;

        ret = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
                (LPDWORD) &PerHandleData, (LPOVERLAPPED *)&(PerIoData), INFINITE);

        printf("Thread #%d, GetQueuedCompletionStatus() = %d\n", MyThreadData->thread_index, ret);
        if ( BytesTransferred == 0 &&
            (PerIoData->op_type == RECV_POSTED || PerIoData->op_type == SEND_POSTED) )
        {
            // connection close by peer
            closesocket(PerHandleData->sock_fd);
            free(PerHandleData);
            free(PerIoData);
            continue;
        }

        if ( PerIoData->op_type == RECV_POSTED )
        {
            PerIoData->buffer[PerIoData->wbuf.len] = '\0';
            printf("thread #%d, recv data: %s\n", MyThreadData->thread_index, PerIoData->buffer);
        }


        // recv again
        DWORD Flags = 0;

        ZeroMemory(PerIoData, sizeof(*PerIoData));
        PerIoData->wbuf.buf = PerIoData->buffer;
        PerIoData->wbuf.len = DATA_BUFSIZE;
        PerIoData->op_type  = RECV_POSTED;

        if ( WSARecv(PerHandleData->sock_fd, &(PerIoData->wbuf), 1, &(PerIoData->buf_len),
                &Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR )
        {
            if ( WSAGetLastError() != WSA_IO_PENDING )
                errx(_T("thread::WSARecv() error!"));
            else
                printf("thread #%d WSARecv() WSA_IO_PENDING, good!\n", MyThreadData->thread_index);
        }
    }

    return 0;
}


int main(void)
{
    DWORD ncpu;
    HANDLE CompletionPort;

    init_network();

    // init IOCP & threads
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    ncpu = get_system_cpu_num();
    for ( size_t i = 0; i < ncpu+2; i++ )
    {
        HANDLE ThreadHandle;
        MY_THREAD_DATA *tdata = (MY_THREAD_DATA *) calloc(1, sizeof(MY_THREAD_DATA));

        tdata->complete_port = CompletionPort;
        tdata->thread_index  = i+1;

        ThreadHandle = CreateThread(NULL, 0, server_work_thread, (LPVOID) tdata, 0, NULL);
        CloseHandle(ThreadHandle);
    }

    //
    SOCKET listen_fd;
    listen_fd = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if ( INVALID_SOCKET == listen_fd )
        errx(_T("WSASocket() fail!"));

    struct sockaddr_in addr_in;
    ZeroMemory(&addr_in, sizeof(addr_in));
    addr_in.sin_family      = AF_INET;
    addr_in.sin_addr.s_addr = htonl(INADDR_ANY);
    addr_in.sin_port        = htons(8888);

    if ( bind(listen_fd, (struct sockaddr *) &addr_in, sizeof(addr_in)) == SOCKET_ERROR )
        errx(_T("bind() fail!"));
   
    if ( listen(listen_fd, 5) == SOCKET_ERROR )
        errx(_T("listen() fail!"));

    while (TRUE)
    {
        PER_HANDLE_DATA * PerHandleData = NULL;
        SOCKET client_fd;

        struct sockaddr_in remote_addr;
        int remote_len = sizeof(remote_addr);

        puts("main::begin accept() ..");
        client_fd = accept(listen_fd, (struct sockaddr *) &remote_addr, &remote_len);
        PerHandleData = (PER_HANDLE_DATA *) calloc(1, sizeof(PER_HANDLE_DATA));
        printf("socket number %d connected\n", client_fd);

        PerHandleData->sock_fd = client_fd;
        memcpy(&PerHandleData->client_addr, &remote_addr, remote_len);

        CreateIoCompletionPort((HANDLE)client_fd, CompletionPort, (ULONG_PTR) PerHandleData, 0);


        // recv data
        PER_IO_DATA *PerIoData;
        DWORD Flags = 0;
       
        PerIoData = (PER_IO_DATA *) calloc(1, sizeof(PerIoData));
        ZeroMemory(PerIoData, sizeof(*PerIoData));

        PerIoData->wbuf.buf = PerIoData->buffer;
        PerIoData->wbuf.len = DATA_BUFSIZE;
        PerIoData->op_type  = RECV_POSTED;
        if ( WSARecv(client_fd, &(PerIoData->wbuf), 1, &(PerIoData->buf_len),
                &Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR )
        {
            if ( WSAGetLastError() != WSA_IO_PENDING )
                errx(_T("main::WSARecv() error!"));
            else
                _putts(_T("main::WSARecv() WSA_IO_PENDING, good!"));
        }
    }
   
    return 0;
}

  评论这张
 
阅读(1161)| 评论(0)

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2018