WinSock IOCP 模型总结(附一个带缓存池的IOCP类)
由于篇幅原因,本文假设你已经熟悉了利用Socket进行TCP/IP编程的基本原理,并且也熟练的掌握了多线程编程技术,太基本的概念我这里就略过不提了,网上的资料应该遍地都是。
IOCP全称IOCP全称I/O Completion Port,中文译为I/O完成端口。IOCP是一个异步I/O的Windows I/O模型,它可以自动处理I/O操作,并在I/O操作完成后将完成通知发送给用户。本文主要介绍基于IOCP的网络I/O操作(即socket的Accept、Send、Recv和Close等)。Windows提供了6种网络通信模型,分别是: 选择(select)模型:轮询方式探测socket上是否有收发的操作,再调用accept、recv和send操作,核心是select()函数,比阻塞模型高效一点,缺点是一次只能探测64个socket,需要手动调用recv和send进行收发数据。事件选择(WSAEventSelect)模型:原理基本同WSAAsyncSelect模型,但是不需要窗口,利用事件(Event)机制来获取socket上发生的I/O操作。缺点是一次只能等待64个事件,需要手动调用recv和send进行收发数据。优点是不用管收发过程,直接提供(发送时)/使用(接收时)数据。缺点是实现略复杂。 以上I/O模型由1-6理解难度依次提高,性能也相应地依次提高,我个人觉得重叠 I/O(Overlapped I/O)模型和IOCP(I/O Completion Port)模型并不是实现难度大,而是理解其运行机制的难度,5和6的使用比前面几种所需代码更少,更简单。下面开始正式介绍IOCP(I/O Completion Port)模型。相关概念
1、异步通信 我们知道外部设备I/O(比如磁盘读写,网络通信等)速度和CPU速度比起来是很慢的,如果我们进行外部I/O操作时在线程中等待I/O操作完成的话,此线程就会被阻塞住,相当于强迫CPU适应I/O设备的速度,这样会造成极大的CPU资源浪费。我们没必要在线程中等待I/O操作完成再执行后续的代码,而是将I/O操作请求交给设备驱动去处理,我们线程可以继续做其他事情,然后等待I/O操作完成的通知,大体的流程如下图所示:
WinSock IOCP 模型总结(附一个带缓存池的IOCP类)
图一:异步网络操作流程 我们可以从图中看到一个很明显的并行操作的过程,这就是异步调用,而“同步”的通信方式是再进行网络操作的时候主线程就挂起等待直到网络操作完成之后才可以执行后续的代码。同步方式流程如下图:
WinSock IOCP 模型总结(附一个带缓存池的IOCP类)
图二:同步网络操作流程 “异步”方式无疑比“阻塞+多线程”的方式效率要高得多。在Windows中实现异步的机制有好几种,主要区别是图一中的最后一步“通知主线程”的方式。实现操作系统调用驱动程序去收发数据的操作都是一样的,关键是“如何通知主线程取数据”。有兴趣的朋友可以搜索关键字“设备内核对象”、“事件内核对象”、APC(synchronous Procedure Call,异步过程调用)和IOCP(完成端口)。 2、重叠结构(OVERLAPPED) 在Windows中要实现异步通信,必须要用到重叠结构(OVERLAPPED),Windows中所有的异步通信都是基于它的。至于为什么叫Overlapped?Jeffrey Richter的解释是因为“执行I/O请求的时间与线程执行其他任务的时间是重叠(overlapped)的”,从这个名字我们也可能看得出来重叠结构发明的初衷了,对于重叠结构的内部细节我这里就不过多的解释了,就把它当成和其他内核对象一样,不需要深究其实现机制,只要会使用就可以了,想要了解更多重叠结构内部的朋友,请去翻阅Jeffrey Richter的《Windows via C/C++》 5th 的292页。 3、完成端口 “完成端口”这个名词中的“端口”和我们网络通信中的“端口”(0-65535)是不同的,个人感觉应该叫“完成队列”更直观一点。之所以叫“完成”端口,是因为系统在IO操作“完成”后再通知我们,也就是说当系统通知我们时,IO操作已经完成,比如说进行网络操作,系统通知我们时,并非时有数据从网络到来,而是数据已经接受完毕了,或者是socket接入已经完成等,我们只需处理后面的事情即可。 所谓的完成端口,其实就是一个“内核对象”,我们不需要深究其实现原理,只需使用相关的API把完成端口框架搭建起来,投递IO请求,然后就等待IO完成的通知。 使用完成端口的基本流程 总的来说,使用完成端口只要遵循如下几个步骤: - 建立和处理器的核数相等的工作线程(WorkerThread),这些线程不断地通过GetQueuedCompletionStatus() 函数扫描完成端口中是否有IO操作完成,如果有的话,将已经完成了的IO操作取出处理,处理完成后,再投递一个IO请求即可(下文有WorkerThread的流程图)。
- 调用CreateIoCompletionPort() 绑定listen socket 到 完成端口,并投递一个或多个AcceptEx请求。此处的AcceptEx是WinSock2 的扩展函数,作用是投递一个accept请求,当有socket接入是可以再2中的线程中处理。
以上即为完成端口的初始化和监听socket的初始化。下面介绍WorkerThread的工作流程: 1、如果为accept操作,调用CreateIoCompletionPort() 绑定新接入的socket 到 完成端口,向新接入的socket 投递一个WSARecv请求。 2、如果为WSARecv操作,处理接收到的数据,向这个socket 再投递一个WSARecv请求。 流程图如下:
WinSock IOCP 模型总结(附一个带缓存池的IOCP类)
图三:IOCP流程图 完成端口的实现(配合代码阅读更佳) 1、创建一个完成端口 - HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
复制代码 CreateIoCompletionPort的参数如下:- //功能:创建完成端口和关联完成端口
- HANDLE WINAPI CreateIoCompletionPort(
- * __in HANDLE FileHandle, // 已经打开的文件句柄或者空句柄,一般是客户端的句柄
- * __in HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄
- * __in ULONG_PTR CompletionKey, // 完成键,包含了指定I/O完成包的指定文件
- * __in DWORD NumberOfConcurrentThreads // 真正并发同时执行最大线程数,一般推介是CPU核心数*2
- * );
复制代码 CreateIoCompletionPort函数有两个功能: 我们创建时给的参数是(INVALID_HANDLE_VALUE, NULL, 0, 0)就是创建完成端口,下面会介绍关联完成端口。 2、建立Worker线程 - SYSTEM_INFO si;
- GetSystemInfo(&si);
- workerThreadNum = si.dwNumberOfProcessors * 2;
- HANDLE *workerThreads = new HANDLE[workerThreadNum];
- for (int i = 0; i < workerThreadNum; i++)
- {
- workerThreads[i] = CreateThread(0, 0, WorkerThreadProc, (void *)this, 0, 0);
- }
复制代码 我们最好是建立CPU核心数量*2那么多的线程,这样更可以充分利用CPU资源,因为完成端口的调度是非常智能的,比如我们的Worker线程有的时候可能会有Sleep()或者WaitForSingleObject()之类的情况,这样同一个CPU核心上的另一个线程就可以代替这个Sleep的线程执行了;因为完成端口的目标是要使得CPU满负荷的工作。 WorkerThreadProc是Worker线程的线程函数,线程函数的具体内容我们后面再讲。 3、创建监听socket - 1 BOOL IOCPBase::InitializeListenSocket()
- 2 {
- 3 // 生成用于监听的socket的Context
- 4 listenSockContext = new SocketContext;
- 5 listenSockContext->connSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
- 6 if (INVALID_SOCKET == listenSockContext->connSocket)
- 7 return false;
- 8
- 9 // 将socket绑定到完成端口中
- 10 if (NULL == CreateIoCompletionPort((HANDLE)listenSockContext->connSocket, completionPort, (DWORD)listenSockContext, 0))
- 11 {
- 12 RELEASE_SOCKET(listenSockContext->connSocket);
- 13 return false;
- 14 }
- 15
- 16 //服务器地址信息,用于绑定socket
- 17 sockaddr_in serverAddr;
- 18
- 19 // 填充地址信息
- 20 ZeroMemory((char *)&serverAddr, sizeof(serverAddr));
- 21 serverAddr.sin_family = AF_INET;
- 22 serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
- 23 serverAddr.sin_port = htons(port);
- 24
- 25 // 绑定地址和端口
- 26 if (SOCKET_ERROR == bind(listenSockContext->connSocket, (sockaddr *)&serverAddr, sizeof(serverAddr)))
- 27 {
- 28 return false;
- 29 }
- 30
- 31 // 开始监听
- 32 if (SOCKET_ERROR == listen(listenSockContext->connSocket, SOMAXCONN))
- 33 {
- 34 return false;
- 35 }
- 36
- 37 GUID guidAcceptEx = WSAID_ACCEPTEX;
- 38 GUID guidGetAcceptSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
- 39 // 提取扩展函数指针
- 40 DWORD dwBytes = 0;
- 41 if (SOCKET_ERROR == WSAIoctl(
- 42 listenSockContext->connSocket,
- 43 SIO_GET_EXTENSION_FUNCTION_POINTER,
- 44 &guidAcceptEx,
- 45 sizeof(guidAcceptEx),
- 46 &fnAcceptEx,
- 47 sizeof(fnAcceptEx),
- 48 &dwBytes,
- 49 NULL,
- 50 NULL))
- 51 {
- 52 DeInitialize();
- 53 return false;
- 54 }
- 55
- 56 if (SOCKET_ERROR == WSAIoctl(
- 57 listenSockContext->connSocket,
- 58 SIO_GET_EXTENSION_FUNCTION_POINTER,
- 59 &guidGetAcceptSockAddrs,
- 60 sizeof(guidGetAcceptSockAddrs),
- 61 &fnGetAcceptExSockAddrs,
- 62 sizeof(fnGetAcceptExSockAddrs),
- 63 &dwBytes,
- 64 NULL,
- 65 NULL))
- 66 {
- 67 DeInitialize();
- 68 return false;
- 69 }
- 70
- 71 for (size_t i = 0; i < MAX_POST_ACCEPT; i++)
- 72 {
- 73 IOContext *ioContext = listenSockContext->GetNewIOContext();
- 74 if (false == PostAccept(listenSockContext, ioContext))
- 75 {
- 76 listenSockContext->RemoveContext(ioContext);
- 77 return false;
- 78 }
- 79 }
- 80 return true;
- 81 }
- InitializeListenSocket
复制代码 用 CreateIoCompletionPort()函数把这个监听Socket和完成端口绑定,bind(),listen(),然后提取扩展函数AcceptEx和GetAcceptSockAddrs的指针,因为AcceptEx 实际上是存在于Winsock2结构体系之外的(因为是微软另外提供的),所以如果我们直接调用AcceptEx的话,首先我们的代码就只能在微软的平台上用了,没有办法在其他平台上调用到该平台提供的AcceptEx的版本(如果有的话), 而且我们每次调用AcceptEx时,Service Provider都得要通过WSAIoctl()获取一次该函数指针,效率太低了,所以我们自己获取函数指针。然后投递AcceptEx请求。 投递AcceptEx请求的代码: - BOOL IOCPBase::PostAccept(SocketContext * sockContext, IOContext * ioContext)
- {
- DWORD dwBytes = 0;
- ioContext->ioType = ACCEPT_POSTED;
- ioContext->ioSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
- if (INVALID_SOCKET == ioContext->ioSocket)
- {
- return false;
- }
-
- // 将接收缓冲置为0,令AcceptEx直接返回,防止拒绝服务攻击
- if (false == fnAcceptEx(listenSockContext->connSocket, ioContext->ioSocket, ioContext->wsaBuf.buf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytes, &ioContext->overLapped))
- {
- if (WSA_IO_PENDING != WSAGetLastError())
- {
- return false;
- }
- }
- InterlockedIncrement(&acceptPostCnt);
- return true;
- }
复制代码AcceptEx 函数说明:
参数1--sListenSocket, 这个就是那个唯一的用来监听的Socket了,没什么说的; 参数2--sAcceptSocket, 用于接受连接的socket,这个就是那个需要我们事先建好的,等有客户端连接进来直接把这个Socket拿给它用的那个,是AcceptEx高性能的关键所在。 参数3--lpOutputBuffer,接收缓冲区,这也是AcceptEx比较有特色的地方,既然AcceptEx不是普通的accpet函数,那么这个缓冲区也不是普通的缓冲区,这个缓冲区包含了三个信息:一是客户端发来的第一组数据,二是server的地址,三是client地址。 参数4--dwReceiveDataLength,前面那个参数lpOutputBuffer中用于存放数据的空间大小。如果此参数=0,则Accept时将不会待数据到来,而直接返回,如果此参数不为0,那么一定得等接收到数据了才会返回,这里设为0直接返回,防止拒绝服务攻击 参数5--dwLocalAddressLength,存放本地址地址信息的空间大小; 参数6--dwRemoteAddressLength,存放本远端地址信息的空间大小; 参数7--lpdwBytesReceived,out参数,对我们来说没用,不用管; 参数8--lpOverlapped,本次重叠I/O所要用到的重叠结构。 因为每投递一次网络IO请求都要求提供一个WSABuf和WSAOVERLAPPED的参数,所以我们自定义一个IOContext类,每次投递附带这个类的变量,但要注意这个变量的生命周期,防止内存泄漏。
- class IOContext
- {
- public:
- WSAOVERLAPPED overLapped; // 每个socket的每一个IO操作都需要一个重叠结构
- SOCKET ioSocket; // 此IO操作对应的socket
- WSABUF wsaBuf; // 数据缓冲
- IO_OPERATION_TYPE ioType; // IO操作类型
- UINT connectID; // 连接ID
- IOContext()
- {
- ZeroMemory(&overLapped, sizeof(overLapped));
- ioSocket = INVALID_SOCKET;
- wsaBuf.buf = (char *)::HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, BUFF_SIZE);
- wsaBuf.len = BUFF_SIZE;
- ioType = NULL_POSTED;
- connectID = 0;
- }
- ~IOContext()
- {
- RELEASE_SOCKET(ioSocket);
- if (wsaBuf.buf != NULL)
- ::HeapFree(::GetProcessHeap(), 0, wsaBuf.buf);
- }
- void Reset()
- {
- if (wsaBuf.buf != NULL)
- ZeroMemory(wsaBuf.buf, BUFF_SIZE);
- else
- wsaBuf.buf = (char *)::HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, BUFF_SIZE);
- ZeroMemory(&overLapped, sizeof(overLapped));
- ioType = NULL_POSTED;
- connectID = 0;
- }
- };
复制代码 对于每一个socket也定义了一个SocketContext的类和一个IOContextPool的缓冲池类的具体请查看代码。
- 1 class SocketContext
- 2 {
- 3 public:
- 4 SOCKET connSocket; // 连接的socket
- 5 SOCKADDR_IN clientAddr; // 连接的远程地址
- 6
- 7 private:
- 8 vector<IOContext*> arrIoContext; // 同一个socket上的多个IO请求
- 9 static IOContextPool ioContextPool; // 空闲的IOContext池
- 10 CRITICAL_SECTION csLock;
- 11
- 12 public:
- 13 SocketContext()
- 14 {
- 15 InitializeCriticalSection(&csLock);
- 16 arrIoContext.clear();
- 17 connSocket = INVALID_SOCKET;
- 18 ZeroMemory(&clientAddr, sizeof(clientAddr));
- 19 }
- 20
- 21 ~SocketContext()
- 22 {
- 23 RELEASE_SOCKET(connSocket);
- 24
- 25 // 回收所有的IOContext
- 26 for (vector<IOContext*>::iterator it = arrIoContext.begin(); it != arrIoContext.end(); it++)
- 27 {
- 28 ioContextPool.ReleaseIOContext(*it);
- 29 }
- 30
- 31 EnterCriticalSection(&csLock);
- 32 arrIoContext.clear();
- 33 LeaveCriticalSection(&csLock);
- 34
- 35 DeleteCriticalSection(&csLock);
- 36 }
- 37
- 38 // 获取一个新的IoContext
- 39 IOContext *GetNewIOContext()
- 40 {
- 41 IOContext *context = ioContextPool.AllocateIoContext();
- 42 if (context != NULL)
- 43 {
- 44 EnterCriticalSection(&csLock);
- 45 arrIoContext.push_back(context);
- 46 LeaveCriticalSection(&csLock);
- 47 }
- 48 return context;
- 49 }
- 50
- 51 // 从数组中移除一个指定的IoContext
- 52 void RemoveContext(IOContext* pContext)
- 53 {
- 54 for (vector<IOContext*>::iterator it = arrIoContext.begin(); it != arrIoContext.end(); it++)
- 55 {
- 56 if (pContext == *it)
- 57 {
- 58 ioContextPool.ReleaseIOContext(*it);
- 59
- 60 EnterCriticalSection(&csLock);
- 61 arrIoContext.erase(it);
- 62 LeaveCriticalSection(&csLock);
- 63
- 64 break;
- 65 }
- 66 }
- 67 }
- 68
- 69 //
- 70 };
- class SocketContext
复制代码- 1 // 空闲的IOContext管理类(IOContext池)
- 2 class IOContextPool
- 3 {
- 4 private:
- 5 list<IOContext *> contextList;
- 6 CRITICAL_SECTION csLock;
- 7
- 8 public:
- 9 IOContextPool()
- 10 {
- 11 InitializeCriticalSection(&csLock);
- 12 contextList.clear();
- 13
- 14 EnterCriticalSection(&csLock);
- 15 for (size_t i = 0; i < INIT_IOCONTEXT_NUM; i++)
- 16 {
- 17 IOContext *context = new IOContext;
- 18 contextList.push_back(context);
- 19 }
- 20 LeaveCriticalSection(&csLock);
- 21
- 22 }
- 23
- 24 ~IOContextPool()
- 25 {
- 26 EnterCriticalSection(&csLock);
- 27 for (list<IOContext *>::iterator it = contextList.begin(); it != contextList.end(); it++)
- 28 {
- 29 delete (*it);
- 30 }
- 31 contextList.clear();
- 32 LeaveCriticalSection(&csLock);
- 33
- 34 DeleteCriticalSection(&csLock);
- 35 }
- 36
- 37 // 分配一个IOContxt
- 38 IOContext *AllocateIoContext()
- 39 {
- 40 IOContext *context = NULL;
- 41
- 42 EnterCriticalSection(&csLock);
- 43 if (contextList.size() > 0) //list不为空,从list中取一个
- 44 {
- 45 context = contextList.back();
- 46 contextList.pop_back();
- 47 }
- 48 else //list为空,新建一个
- 49 {
- 50 context = new IOContext;
- 51 }
- 52 LeaveCriticalSection(&csLock);
- 53
- 54 return context;
- 55 }
- 56
- 57 // 回收一个IOContxt
- 58 void ReleaseIOContext(IOContext *pContext)
- 59 {
- 60 pContext->Reset();
- 61 EnterCriticalSection(&csLock);
- 62 contextList.push_front(pContext);
- 63 LeaveCriticalSection(&csLock);
- 64 }
- 65 };
- class IOContextPool
复制代码4、Worker线程 这个工作线程所要做的工作就是几个Worker线程哥几个一起排好队队来监视完成端口的队列中是否有完成的网络操作就好了,代码大体如下: - DWORD IOCPBase::WorkerThreadProc(LPVOID lpParam)
- {
- IOCPBase *iocp = (IOCPBase*)lpParam;
- OVERLAPPED *ol = NULL;
- SocketContext *sockContext;
- DWORD dwBytes = 0;
- IOContext *ioContext = NULL;
- while (WAIT_OBJECT_0 != WaitForSingleObject(iocp->stopEvent, 0))
- {
- BOOL bRet = GetQueuedCompletionStatus(iocp->completionPort, &dwBytes, (PULONG_PTR)&sockContext, &ol, INFINITE);
- // 读取传入的参数
- ioContext = CONTAINING_RECORD(ol, IOContext, overLapped);
- // 收到退出标志
- if (EXIT_CODE == (DWORD)sockContext)
- {
- break;
- }
- if (!bRet)
- {
- DWORD dwErr = GetLastError();
- // 如果是超时了,就再继续等吧
- if (WAIT_TIMEOUT == dwErr)
- {
- // 确认客户端是否还活着...
- if (!iocp->IsSocketAlive(sockContext->connSocket))
- {
- iocp->OnConnectionClosed(sockContext);
- // 回收socket
- iocp->DoClose(sockContext);
- continue;
- }
- else
- {
- continue;
- }
- }
- // 可能是客户端异常退出了(64)
- else if (ERROR_NETNAME_DELETED == dwErr)
- {
- iocp->OnConnectionError(sockContext, dwErr);
- // 回收socket
- iocp->DoClose(sockContext);
- continue;
- }
- else
- {
- iocp->OnConnectionError(sockContext, dwErr);
- // 回收socket
- iocp->DoClose(sockContext);
- continue;
- }
- }
- else
- {
- // 判断是否有客户端断开
- if ((0 == dwBytes) && (RECV_POSTED == ioContext->ioType || SEND_POSTED == ioContext->ioType))
- {
- iocp->OnConnectionClosed(sockContext);
- // 回收socket
- iocp->DoClose(sockContext);
- continue;
- }
- else
- {
- switch (ioContext->ioType)
- {
- case ACCEPT_POSTED:
- iocp->DoAccpet(sockContext, ioContext);
- break;
- case RECV_POSTED:
- iocp->DoRecv(sockContext, ioContext);
- break;
- case SEND_POSTED:
- iocp->DoSend(sockContext, ioContext);
- break;
- default:
- break;
- }
- }
- }
- }
- // 释放线程参数
- RELEASE(lpParam);
- return 0;
- }
- WorkerThreadProc
复制代码其中的GetQueuedCompletionStatus()就是Worker线程里第一件也是最重要的一件事了,会让Worker线程进入不占用CPU的睡眠状态,直到完成端口上出现了需要处理的网络操作或者超出了等待的时间限制为止。 一旦完成端口上出现了已完成的I/O请求,那么等待的线程会被立刻唤醒,然后继续执行后续的代码。 至于这个神奇的函数,原型是这样的: - BOOL WINAPI GetQueuedCompletionStatus(
- __in HANDLE CompletionPort, // 这个就是我们建立的那个唯一的完成端口
- __out LPDWORD lpNumberOfBytes, //这个是操作完成后返回的字节数
- __out PULONG_PTR lpCompletionKey, // 这个是我们建立完成端口的时候绑定的那个自定义结构体参数
- __out LPOVERLAPPED *lpOverlapped, // 这个是我们在连入Socket的时候一起建立的那个重叠结构
- __in DWORD dwMilliseconds // 等待完成端口的超时时间,如果线程不需要做其他的事情,那就INFINITE就行了
- );
复制代码如果这个函数突然返回了,那就说明有需要处理的网络操作了 --- 当然,在没有出现错误的情况下。 然后switch()一下,根据需要处理的操作类型,那我们来进行相应的处理。 那我们如何直到需要处理的操作类型呢?这个就要用到我们定义的IOContext类,里面有一个WSAOVERLAPPED的变量和操作类型(参见第3步)。那有如何吧IOContext变量传进来呢?同样参见第三步我们投递AcceptEx请求时传入了一个&ioContext->overLapped参数。我们可以使用PER_IO_CONTEXT这个宏来通过ioContext->overLapped取得ioContext的地址,如此我们便取得操作类型和ioContext中的WSAbuf。数据就存放在WSABuf中。 另外,我们注意到关联socket到完成端口时,我们给CreateIoCompletionPort()函数的第三个参数ULONG_PTR CompletionKey参数传递了listenSockContext变量,我们可以在GetQueuedCompletionStatus的第三个参数取得这个传进来的变量。如此我们就通过完成端口穿进去了两个变量,理解这两个变量的传递时理解完成端口模式的关键,我之前就时卡着这里。 WorkerThreadProc线程中还有一些错误处理函数,自行查看。 5、收到accept通知时调用DoAccept() 在用户收到AcceptEx的完成通知时,需要后续代码并不多,我们把代码放到DoAccept()中:需要做三件事情: - 为新接入的socket分配资源。
- 向新接入的socket投递一个WSARecv请求
- 向监听socket投递继续Accept请求
- 1 BOOL IOCPBase::DoAccpet(SocketContext * sockContext, IOContext * ioContext)
- 2 {
- 3
- 4 InterlockedIncrement(&connectCnt);
- 5 InterlockedDecrement(&acceptPostCnt);
- 6 SOCKADDR_IN *clientAddr = NULL;
- 7 SOCKADDR_IN *localAddr = NULL;
- 8 int clientAddrLen, localAddrLen;
- 9 clientAddrLen = localAddrLen = sizeof(SOCKADDR_IN);
- 10
- 11 // 1. 获取地址信息 (GetAcceptExSockAddrs函数不仅可以获取地址信息,还可以顺便取出第一组数据)
- 12 fnGetAcceptExSockAddrs(ioContext->wsaBuf.buf, 0, localAddrLen, clientAddrLen, (LPSOCKADDR *)&localAddr, &localAddrLen, (LPSOCKADDR *)&clientAddr, &clientAddrLen);
- 13
- 14 // 2. 为新连接建立一个SocketContext
- 15 SocketContext *newSockContext = new SocketContext;
- 16 newSockContext->connSocket = ioContext->ioSocket;
- 17 memcpy_s(&(newSockContext->clientAddr), sizeof(SOCKADDR_IN), clientAddr, sizeof(SOCKADDR_IN));
- 18
- 19 // 3. 将listenSocketContext的IOContext 重置后继续投递AcceptEx
- 20 ioContext->Reset();
- 21 if (false == PostAccept(listenSockContext, ioContext))
- 22 {
- 23 listenSockContext->RemoveContext(ioContext);
- 24 }
- 25
- 26 // 4. 将新socket和完成端口绑定
- 27 if (NULL == CreateIoCompletionPort((HANDLE)newSockContext->connSocket, completionPort, (DWORD)newSockContext, 0))
- 28 {
- 29 DWORD dwErr = WSAGetLastError();
- 30 if (dwErr != ERROR_INVALID_PARAMETER)
- 31 {
- 32 DoClose(newSockContext);
- 33 return false;
- 34 }
- 35 }
- 36
- 37 // 并设置tcp_keepalive
- 38 tcp_keepalive alive_in;
- 39 tcp_keepalive alive_out;
- 40 alive_in.onoff = TRUE;
- 41 alive_in.keepalivetime = 1000 * 60; // 60s 多长时间( ms )没有数据就开始 send 心跳包
- 42 alive_in.keepaliveinterval = 1000 * 10; //10s 每隔多长时间( ms ) send 一个心跳包
- 43 unsigned long ulBytesReturn = 0;
- 44 if (SOCKET_ERROR == WSAIoctl(newSockContext->connSocket, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in), &alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL))
- 45 {
- 46 TRACE(L"WSAIoctl failed: %d/n", WSAGetLastError());
- 47 }
- 48
- 49
- 50 OnConnectionEstablished(newSockContext);
- 51
- 52 // 5. 建立recv操作所需的ioContext,在新连接的socket上投递recv请求
- 53 IOContext *newIoContext = newSockContext->GetNewIOContext();
- 54 newIoContext->ioType = RECV_POSTED;
- 55 newIoContext->ioSocket = newSockContext->connSocket;
- 56 // 投递recv请求
- 57 if (false == PostRecv(newSockContext, newIoContext))
- 58 {
- 59 DoClose(sockContext);
- 60 return false;
- 61 }
- 62
- 63 return true;
- 64 }
- DoAccpet
复制代码- 1 BOOL IOCPBase::PostAccept(SocketContext * sockContext, IOContext * ioContext)
- 2 {
- 3 DWORD dwBytes = 0;
- 4 ioContext->ioType = ACCEPT_POSTED;
- 5 ioContext->ioSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
- 6 if (INVALID_SOCKET == ioContext->ioSocket)
- 7 {
- 8 return false;
- 9 }
- 10
- 11 // 将接收缓冲置为0,令AcceptEx直接返回,防止拒绝服务攻击
- 12 if (false == fnAcceptEx(listenSockContext->connSocket, ioContext->ioSocket, ioContext->wsaBuf.buf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytes, &ioContext->overLapped))
- 13 {
- 14 if (WSA_IO_PENDING != WSAGetLastError())
- 15 {
- 16 return false;
- 17 }
- 18 }
- 19
- 20 InterlockedIncrement(&acceptPostCnt);
- 21 return true;
- 22 }
- PostAccept
复制代码- 1 BOOL IOCPBase::PostRecv(SocketContext * sockContext, IOContext *ioContext)
- 2 {
- 3 DWORD dwFlags = 0, dwBytes = 0;
- 4 ioContext->Reset();
- 5 ioContext->ioType = RECV_POSTED;
- 6
- 7 int nBytesRecv = WSARecv(ioContext->ioSocket, &ioContext->wsaBuf, 1, &dwBytes, &dwFlags, &ioContext->overLapped, NULL);
- 8 // 如果返回值错误,并且错误的代码并非是Pending的话,那就说明这个重叠请求失败了
- 9 if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
- 10 {
- 11 DoClose(sockContext);
- 12 return false;
- 13 }
- 14 return true;
- 15 }
- PostRecv
复制代码此处要注意理清第4步中说的两个变量的传入。 DoAccept中还调用了OnConnectionEstablished()函数,这是一个虚函数,派生类重载这个函数即可处理连接接入的通知。具体看代码里的例程。 6、收到recv通知时调用DoRecv() 在用户收到recv的完成通知时,需要后续代码并不多,我们把代码放到DoRecv()中:需要做两件事情: - 处理WSABuf中的数据
- 向此socket重新投递一个WSARecv请求
- BOOL IOCPBase::DoRecv(SocketContext * sockContext, IOContext * ioContext)
- {
- OnRecvCompleted(sockContext, ioContext);
- ioContext->Reset();
- if (false == PostRecv(sockContext, ioContext))
- {
- DoClose(sockContext);
- return false;
- }
- return true;
- }
- DoRecv
复制代码此处要注意理清第4步中说的两个变量的传入。 7、关闭完成端口 Worker线程一旦进入了GetQueuedCompletionStatus()的阶段,就会进入睡眠状态,INFINITE的等待完成端口中,如果完成端口上一直都没有已经完成的I/O请求,那么这些线程将无法被唤醒,这也意味着线程没法正常退出。 熟悉或者不熟悉多线程编程的朋友,都应该知道,如果在线程睡眠的时候,简单粗暴的就把线程关闭掉的话,那是会一个很可怕的事情,因为很多线程体内很多资源都来不及释放掉,无论是这些资源最后是否会被操作系统回收,我们作为一个C++程序员来讲,都不应该允许这样的事情出现。 所以我们必须得有一个很优雅的,让线程自己退出的办法。 这时会用到我们这次见到的与完成端口有关的最后一个API,叫 PostQueuedCompletionStatus(),从名字上也能看得出来,这个是和 GetQueuedCompletionStatus() 函数相对的,这个函数的用途就是可以让我们手动的添加一个完成端口I/O操作,这样处于睡眠等待的状态的线程就会有一个被唤醒,如果为我们每一个Worker线程都调用一次PostQueuedCompletionStatus()的话,那么所有的线程也就会因此而被唤醒了。 PostQueuedCompletionStatus()函数的原型是这样定义的: - BOOL WINAPI PostQueuedCompletionStatus(
- __in HANDLE CompletionPort,
- __in DWORD dwNumberOfBytesTransferred,
- __in ULONG_PTR dwCompletionKey,
- __in_opt LPOVERLAPPED lpOverlapped
- );
复制代码 我们可以看到,这个函数的参数几乎和GetQueuedCompletionStatus()的一模一样,都是需要把我们建立的完成端口传进去,然后后面的三个参数是 传输字节数、结构体参数、重叠结构的指针. 注意,这里也有一个很神奇的事情,正常情况下,GetQueuedCompletionStatus()获取回来的参数本来是应该是系统帮我们填充的,或者是在绑定完成端口时就有的,但是我们这里却可以直接使用PostQueuedCompletionStatus()直接将后面三个参数传递给GetQueuedCompletionStatus(),这样就非常方便了。 例如,我们为了能够实现通知线程退出的效果,可以自己定义一些约定,比如把这后面三个参数设置一个特殊的值,然后Worker线程接收到完成通知之后,通过判断这3个参数中是否出现了特殊的值,来决定是否是应该退出线程了。 例如我们在调用的时候,就可以这样: - for (int i = 0; i < workerThreadNum; i++)
- {
- // 通知所有完成端口退出
- PostQueuedCompletionStatus(completionPort, 0, (DWORD)EXIT_CODE, NULL);
- }
复制代码
|