[win32] Winsock 笔记 (3) ---- IOCP
2008-04-05 21:04:43| 分类:
win32
| 标签:
|举报
|字号大中小 订阅
虽然一直都知道 windows 下可以用 IOCP(I/O Completion Port),但自己都没用过。今天总算把一个基于 IOCP 的 simple server 完成了(还有bug)。
目前只能连上一个 client,一旦连上了多个,就会 crash。暂时还没时间查证问题,估计是某个地方出错但没处理,而接下去的其他函数又用了错误地返回值。
ps. 顺便骂一下《Windows网络编程 2nd》,不知道是翻译版很垃圾,还是原书就是如此。反正代码错误颇多,看得人很不爽。另外,windows 的鬼函数永远是那么多的参数,写的人要抓狂。
ok 言归正传,下面来简单说一下使用 IOCP 要注意的问题。
我的理解只有一个,对于 IOCP,留意 WSARecv() 的第六个参数(OVERLAPPED那个),此地并非传入一个 struct overlapped,而是一个自定义的结构体,当 GetQueuedCompletionStatus() 时,从第四个参数(OVERLAPPED那个)可以取到传入 WSARecv() 的结构体。
简单说起来,就是很多东西从一个函数传进去(userland -> kernel),过一会又从另一个函数取回来(kernel -> userland)。专业一点说,IOCP 属于 async I/O,把一个事件注册给操作系统,当事件完成后,操作系统自动通知你(windows 上IOCP的通知机制就是 thread)。
IOCP 的东西,还不太理解,有空继续研究下。
-------------------------
#include <winsock2.h>
#include <windows.h>
#include <tchar.h>
#include <stdlib.h>
#include <stdio.h>
#pragma comment(lib, "ws2_32.lib")
#define SEND_POSTED 1
#define RECV_POSTED 2
#define DATA_BUFSIZE 4092
typedef struct
{
SOCKET sock_fd;
SOCKADDR_STORAGE client_addr;
} PER_HANDLE_DATA;
typedef struct
{
OVERLAPPED overlapped;
WSABUF wbuf;
char buffer[DATA_BUFSIZE];
DWORD buf_len;
int op_type;
} PER_IO_DATA;
typedef struct
{
HANDLE complete_port;
int thread_index;
} MY_THREAD_DATA;
void errx(const _TCHAR *msg)
{
_putts(msg);
exit(-1);
}
void shutdown_network()
{
WSACleanup();
_putts(_T("network shutdown ..."));
}
void init_network()
{
WORD wVersionRequested;
WSADATA wsaData;
int err;
wVersionRequested = MAKEWORD(2, 2);
err = WSAStartup( wVersionRequested, &wsaData );
if ( err != 0 )
{
errx(_T("init network fail!"));
exit(-1);
}
atexit(shutdown_network);
_putts(_T("network init ..."));
}
DWORD get_system_cpu_num(void)
{
SYSTEM_INFO si;
GetSystemInfo(&si);
return si.dwNumberOfProcessors;
}
DWORD WINAPI server_work_thread(LPVOID lpParam)
{
MY_THREAD_DATA *MyThreadData = (MY_THREAD_DATA *) lpParam;
HANDLE CompletionPort = MyThreadData->complete_port;
DWORD BytesTransferred;
PER_HANDLE_DATA *PerHandleData;
PER_IO_DATA *PerIoData;
printf("Enter Thread #%d ...\n", MyThreadData->thread_index);
while (TRUE)
{
BOOL ret;
ret = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
(LPDWORD) &PerHandleData, (LPOVERLAPPED *)&(PerIoData), INFINITE);
printf("Thread #%d, GetQueuedCompletionStatus() = %d\n", MyThreadData->thread_index, ret);
if ( BytesTransferred == 0 &&
(PerIoData->op_type == RECV_POSTED || PerIoData->op_type == SEND_POSTED) )
{
// connection close by peer
closesocket(PerHandleData->sock_fd);
free(PerHandleData);
free(PerIoData);
continue;
}
if ( PerIoData->op_type == RECV_POSTED )
{
PerIoData->buffer[PerIoData->wbuf.len] = '\0';
printf("thread #%d, recv data: %s\n", MyThreadData->thread_index, PerIoData->buffer);
}
// recv again
DWORD Flags = 0;
ZeroMemory(PerIoData, sizeof(*PerIoData));
PerIoData->wbuf.buf = PerIoData->buffer;
PerIoData->wbuf.len = DATA_BUFSIZE;
PerIoData->op_type = RECV_POSTED;
if ( WSARecv(PerHandleData->sock_fd, &(PerIoData->wbuf), 1, &(PerIoData->buf_len),
&Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR )
{
if ( WSAGetLastError() != WSA_IO_PENDING )
errx(_T("thread::WSARecv() error!"));
else
printf("thread #%d WSARecv() WSA_IO_PENDING, good!\n", MyThreadData->thread_index);
}
}
return 0;
}
int main(void)
{
DWORD ncpu;
HANDLE CompletionPort;
init_network();
// init IOCP & threads
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
ncpu = get_system_cpu_num();
for ( size_t i = 0; i < ncpu+2; i++ )
{
HANDLE ThreadHandle;
MY_THREAD_DATA *tdata = (MY_THREAD_DATA *) calloc(1, sizeof(MY_THREAD_DATA));
tdata->complete_port = CompletionPort;
tdata->thread_index = i+1;
ThreadHandle = CreateThread(NULL, 0, server_work_thread, (LPVOID) tdata, 0, NULL);
CloseHandle(ThreadHandle);
}
//
SOCKET listen_fd;
listen_fd = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if ( INVALID_SOCKET == listen_fd )
errx(_T("WSASocket() fail!"));
struct sockaddr_in addr_in;
ZeroMemory(&addr_in, sizeof(addr_in));
addr_in.sin_family = AF_INET;
addr_in.sin_addr.s_addr = htonl(INADDR_ANY);
addr_in.sin_port = htons(8888);
if ( bind(listen_fd, (struct sockaddr *) &addr_in, sizeof(addr_in)) == SOCKET_ERROR )
errx(_T("bind() fail!"));
if ( listen(listen_fd, 5) == SOCKET_ERROR )
errx(_T("listen() fail!"));
while (TRUE)
{
PER_HANDLE_DATA * PerHandleData = NULL;
SOCKET client_fd;
struct sockaddr_in remote_addr;
int remote_len = sizeof(remote_addr);
puts("main::begin accept() ..");
client_fd = accept(listen_fd, (struct sockaddr *) &remote_addr, &remote_len);
PerHandleData = (PER_HANDLE_DATA *) calloc(1, sizeof(PER_HANDLE_DATA));
printf("socket number %d connected\n", client_fd);
PerHandleData->sock_fd = client_fd;
memcpy(&PerHandleData->client_addr, &remote_addr, remote_len);
CreateIoCompletionPort((HANDLE)client_fd, CompletionPort, (ULONG_PTR) PerHandleData, 0);
// recv data
PER_IO_DATA *PerIoData;
DWORD Flags = 0;
PerIoData = (PER_IO_DATA *) calloc(1, sizeof(PerIoData));
ZeroMemory(PerIoData, sizeof(*PerIoData));
PerIoData->wbuf.buf = PerIoData->buffer;
PerIoData->wbuf.len = DATA_BUFSIZE;
PerIoData->op_type = RECV_POSTED;
if ( WSARecv(client_fd, &(PerIoData->wbuf), 1, &(PerIoData->buf_len),
&Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR )
{
if ( WSAGetLastError() != WSA_IO_PENDING )
errx(_T("main::WSARecv() error!"));
else
_putts(_T("main::WSARecv() WSA_IO_PENDING, good!"));
}
}
return 0;
}
评论这张
转发至微博
转发至微博
评论