本篇文章会接上一篇,完成 重叠I/O 模型
重叠模型是让应用程序使用重叠数据结构(WSAOVERLAPPED),一次投递一个或多个 WinSock I/O请求。针对这些请求,在它们完成后,应用程序会受到通知,于是就可以通过另外的代码来处理这些数据了。
有两种方法可以用来管理重叠I/O请求完成的情况 – 即接到重叠操作完成的通知时处理
- 事件对象通知 (Event Object Notification)
- 完成例程(Completion Routies)
先讲事件对象通知吧。
基于事件通知的方法,就是要将WinSock事件对象与WSAOVERLPPED结构关联在一起,在使用重叠结构的情况下,我们常用的 send,sendto,recv,recvfrom 也要被WSASend,WSASendto,WSARecv,WSARecvfrom 替换掉了。
还是看代码吧,我尽量用代码详详细细的讲这个模型:
|
#include <winsock2.h> #include <process.h> #include <stdio.h> #pragma comment(lib,"ws2_32.lib") #define PORT 5150 #define MSGSIZE 1024 typedef struct { WSAOVERLAPPED overlap; WSABUF Buffer; char szMessage[MSGSIZE]; DWORD NumberOfByteRecv; DWORD Flags; }PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA; //总连接数量 int g_iTotalConnect = 0; //套接字数组 SOCKET g_ClientSocketArr[MAXIMUM_WAIT_OBJECTS]; //事件数组 WSAEVENT g_ClientEventArr[MAXIMUM_WAIT_OBJECTS]; //信息结构体数组 LPPER_IO_OPERATION_DATA g_pPerIODataArr[MAXIMUM_WAIT_OBJECTS]; //工作线程 unsigned int WINAPI WorkerThread(void* pParam); //清理函数 void Cleanup(int index); int main() { //初始化环境 WORD wVersionRequested = MAKEWORD(2, 2); WSADATA wsaData; int ret = WSAStartup(wVersionRequested, &wsaData); if(ret != 0) { return 0; } //创建套接字 SOCKET sListen = socket(AF_INET, SOCK_STREAM, 0); //绑定端口 SOCKADDR_IN local; local.sin_addr.S_un.S_addr = htonl(INADDR_ANY); local.sin_family = AF_INET; local.sin_port = htons(PORT); bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN)); //监听 listen(sListen, 20); //创建工作线程 unsigned int dwThreadId; _beginthreadex(NULL, 0, WorkerThread, NULL, 0, &dwThreadId); //接受线程 SOCKET sClient; SOCKADDR_IN client; int iaddrSize = sizeof(SOCKADDR_IN); while(TRUE) { //如果连接数量没有超过最大值 if(g_iTotalConnect < MAXIMUM_WAIT_OBJECTS) { //接受 sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize); //输出信息 printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port)); //将客户端套接字放入数组 g_ClientSocketArr[g_iTotalConnect] = sClient; //分派一个 PER_IO_OPERATION_DATA - HeapAlloc是一个Windows API函数。 // 它用来在指定的堆上分配内存,并且分配后的内存不可移动。 g_pPerIODataArr[g_iTotalConnect] = (LPPER_IO_OPERATION_DATA)HeapAlloc( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PER_IO_OPERATION_DATA)); //设置Buffer的属性 - 缓冲区 和 缓冲区大小 g_pPerIODataArr[g_iTotalConnect]->Buffer.buf = g_pPerIODataArr[g_iTotalConnect]->szMessage; g_pPerIODataArr[g_iTotalConnect]->Buffer.len = MSGSIZE; //创建事件对象 - 将事件对象放入数组 g_pPerIODataArr[g_iTotalConnect]->overlap.hEvent = WSACreateEvent(); g_ClientEventArr[g_iTotalConnect] = g_pPerIODataArr[g_iTotalConnect]->overlap.hEvent; //启动异步操作 - 完成之后会得到事件消息通知的 WSARecv( g_ClientSocketArr[g_iTotalConnect], &g_pPerIODataArr[g_iTotalConnect]->Buffer, 1, &g_pPerIODataArr[g_iTotalConnect]->NumberOfByteRecv, &g_pPerIODataArr[g_iTotalConnect]->Flags, &g_pPerIODataArr[g_iTotalConnect]->overlap, NULL); //连接数量 + 1 g_iTotalConnect++; } } return 0; } unsigned int WINAPI WorkerThread(void* pParam) { DWORD cbTransferred; while(TRUE) { //等待多个事件对象,只要有一个触发了 - 就返回 int ret = WSAWaitForMultipleEvents(g_iTotalConnect, g_ClientEventArr, FALSE, 1000, FALSE); if(ret == WSA_WAIT_TIMEOUT || ret == WSA_WAIT_FAILED) { continue; } //计算出是哪一个触发了 int index = ret - WSA_WAIT_EVENT_0; //重置这个事件对象 WSAResetEvent(g_ClientEventArr[index]); //获取返回结果 WSAGetOverlappedResult( g_ClientSocketArr[index], &g_pPerIODataArr[index]->overlap, &cbTransferred, TRUE, &g_pPerIODataArr[index]->Flags); //如果是 0 表示退出了 if(cbTransferred == 0) { Cleanup(index); } else { //回射 g_pPerIODataArr[index]->szMessage[cbTransferred] = '\0'; send(g_ClientSocketArr[index], g_pPerIODataArr[index]->szMessage, cbTransferred, 0); //再请求 WSARecv( g_ClientSocketArr[g_iTotalConnect], &g_pPerIODataArr[g_iTotalConnect]->Buffer, 1, &g_pPerIODataArr[g_iTotalConnect]->NumberOfByteRecv, &g_pPerIODataArr[g_iTotalConnect]->Flags, &g_pPerIODataArr[g_iTotalConnect]->overlap, NULL); } } } //清理函数 void Cleanup(int index) { closesocket(g_ClientSocketArr[index]); WSACloseEvent(g_ClientEventArr[index]); //这里是在默认堆 - 释放 HeapFree(GetProcessHeap(), 0, g_pPerIODataArr[index]); //这里用的是一个交换的方法 - 避免了移动的耗费 if(index < g_iTotalConnect - 1) { g_ClientSocketArr[index] = g_ClientSocketArr[g_iTotalConnect - 1]; g_ClientEventArr[index] = g_ClientEventArr[g_iTotalConnect - 1]; g_pPerIODataArr[index] = g_pPerIODataArr[g_iTotalConnect - 1]; } g_pPerIODataArr[--g_iTotalConnect] = NULL; } |
代码是很清晰的…主要麻烦的是各个函数的意思。
首先我们来看看重叠结构的成员吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
typedef struct _OVERLAPPED { ULONG_PTR Internal; ULONG_PTR InternalHigh; union { struct { DWORD Offset; DWORD OffsetHigh; } DUMMYSTRUCTNAME; PVOID Pointer; } DUMMYUNIONNAME; HANDLE hEvent; } OVERLAPPED, *LPOVERLAPPED; |
上文里面,我们要关注的只有最后一个 hEvent – 事件对象句柄。
接下来是我们的一个接受函数 – 它的参数比recv函数要多,因为会用到重叠结构
1 2 3 4 5 6 7 8 9 |
int WSARecv( SOCKET s, LPWSABUF lpBuffers, DWORD dwBufferCount, LPDWORD lpNumberOfBytesRecvd, LPDWORD lpFlags, LPWSAOVERLAPPED lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine ); |
七个参数:
- 套接字
- 接受缓冲区 – 这里需要一个由WSABUF结构构成的数组
- 数组中WSABUF结构数量
- 如果接收操作立即完成,这里会返回函数调用所接受到的字节数
- 设为0即可
- 绑定的重叠结构
- 完成例程中会用到的参数 – 设置为NULL
然后来看看等待函数 – WSAWaitForMultipleEvents
1 2 3 4 5 6 |
DWORD WSAWaitForMultipleEvents( DWORD cEvents, const WSAEVENT* lphEvents, BOOL fWaitAll, DWORD dwTimeout, BOOL fAlertable ); |
五个参数:
- 等待的事件总数量
- 事件数组的指针
- 设置为 TRUE,所有事件被触发的时候函数才会返回 – FALSE则任何一个事件被触发函数都要返回
- 超时时间 – 如果超时,返回WSA_WAIT_TIMEOUT – 如果设置为 0 ,则函数立即返回 – 如果设置为INFINITE,则只有某一事件触发后才会返回。
- 先设置为FALSE
返回值:
WSA_WAIT_TIMEOUT :最常见的返回值,我们需要做的就是继续Wait
WSA_WAIT_FAILED : 出现了错误,请检查cEvents和lphEvents两个参数是否有效
WSAWaitForMultipleEvents函数只能支持由WSA_MAXIMUM_WAIT_EVENTS对象定义的一个最大值,是 64,就是说WSAWaitForMultipleEvents只能等待64个事件,如果想同时等待多于64个事件,就要 创建额外的工作者线程,就不得不去管理一个线程池。
查询重叠操作完成的结果 – WSAGetOverlappedResult
1 2 3 4 5 6 |
BOOL WSAGetOverlappedResult( SOCKET s, LPWSAOVERLAPPED lpOverlapped, LPDWORD lpcbTransfer, BOOL fWait, LPDWORD lpdwFlags ); |
五个参数:
- 套接字
- 重叠结构的指针
- 本次操作接受 – 发送的字节数
- 如果设置为TRUE,除非重叠操作完成,否则不会返回。如果返回False,而且操作仍处于挂起状态,那么函数就会返回FALSE…
- 指针 – 接收结果标志
以上就是事件对象通知的全部了:
接下来是完成例程的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
#include <WinSock2.h> #include <process.h> #include <stdio.h> #pragma comment(lib,"ws2_32.lib") #define PORT 5150 #define MSGSIZE 1024 typedef struct { WSAOVERLAPPED overlap; WSABUF Buffer; char szMessage[MSGSIZE]; DWORD NumberOfByteRecvd; DWORD Flags; SOCKET sClient; }PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA; unsigned int WINAPI WorkerThread(void* pParam); void CALLBACK CompletionRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD); SOCKET g_sNewClientConnection; BOOL g_bNewConnectionArrived = FALSE; int main() { //初始化环境 WSADATA wsaData; WORD wVersionRequsted = MAKEWORD(2, 2); int ret = WSAStartup(wVersionRequsted, &wsaData); if(ret != 0) { return 0; } //创建套接字 SOCKET sListen = socket(AF_INET, SOCK_STREAM, 0); //绑定端口 SOCKADDR_IN local; local.sin_addr.S_un.S_addr = htonl(INADDR_ANY); local.sin_family = AF_INET; local.sin_port = htons(PORT); bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN)); //监听 listen(sListen, 20); //创建工作线程 unsigned int dwThreadId; _beginthreadex(NULL, 0, WorkerThread, NULL, 0, &dwThreadId); //接受线程 SOCKET sClient; SOCKADDR_IN client; int iaddrSize = sizeof(SOCKADDR_IN); while(TRUE) { //应该是可以无限制的接受的 g_sNewClientConnection = accept(sListen, (struct sockaddr*)&client, &iaddrSize); g_bNewConnectionArrived = TRUE; printf("Accepted client: %s:%d \n", inet_ntoa(client.sin_addr), ntohs(client.sin_port)); } } unsigned int WINAPI WorkerThread(void* pParam) { LPPER_IO_OPERATION_DATA lpPerIOData = NULL; while(TRUE) { //如果有新的连接到达 if(g_bNewConnectionArrived) { //在堆上创建一个新数据结构 lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PER_IO_OPERATION_DATA) ); //初始化数据结构 lpPerIOData->Buffer.len = MSGSIZE; lpPerIOData->Buffer.buf = lpPerIOData->szMessage; lpPerIOData->sClient = g_sNewClientConnection; //异步接收数据 WSARecv( lpPerIOData->sClient, &lpPerIOData->Buffer, 1, &lpPerIOData->NumberOfByteRecvd, &lpPerIOData->Flags, &lpPerIOData->overlap, CompletionRoutine); //设置为FALSE g_bNewConnectionArrived = FALSE; } SleepEx(1000, TRUE); } return 0; } //处理函数 void CALLBACK CompletionRoutine(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags) { LPPER_IO_OPERATION_DATA lpPerIOData = (LPPER_IO_OPERATION_DATA)lpOverlapped; if(dwError != 0 || cbTransferred == 0) { //客户端关闭了连接 closesocket(lpPerIOData->sClient); HeapFree(GetProcessHeap(), 0, lpPerIOData); } else { //回射 lpPerIOData->szMessage[cbTransferred] = 0; send(lpPerIOData->sClient, lpPerIOData->szMessage, cbTransferred, 0); //重置 memset(&lpPerIOData->overlap, 0, sizeof(WSAOVERLAPPED)); lpPerIOData->Buffer.len = MSGSIZE; lpPerIOData->Buffer.buf = lpPerIOData->szMessage; //再次接受 WSARecv( lpPerIOData->sClient, &lpPerIOData->Buffer, 1, &lpPerIOData->NumberOfByteRecvd, &lpPerIOData->Flags, &lpPerIOData->overlap, CompletionRoutine); } } |
用 完成例程来实现重叠I/O比用事件通知简单得多。
在这个模型中,主线程只用不停的接受连接即可;
辅助线程判断有没有新的客户端连接被建立,如果有,就为那 个客户端套接字激活一个异步的WSARecv操作,然后调用SleepEx使线程处于一种可警告的等待状态,以使得I/O完成后 CompletionROUTINE可以被内核调用。如果辅助线程不调用SleepEx,则内核在完成一次I/O操作后,无法调用完成例程(因为完成例程 的运行应该和当初激活WSARecv异步操作的代码在同一个线程之内)。
完成例程内的实现代码比较简单,它取出接收到的数据,然后将数据原封不动 的发送给客户端,最后重新激活另一个WSARecv异步操作。注意,在这里用到了“尾随数据”。我们在调用WSARecv的时候,参数 lpOverlapped实际上指向一个比它大得多的结构PER_IO_OPERATION_DATA,这个结构除了WSAOVERLAPPED以外,还被我们附加了缓冲区的结构信息,另外还包括客户端套接字等重要的信息。这样,在完成例程中通过参数lpOverlapped拿到的不仅仅是 WSAOVERLAPPED结构,还有后边尾随的包含客户端套接字和接收数据缓冲区等重要信息。
大致是这样 …有问题留言就好