1. 網路編程中的四種 IO 模型#
- 阻塞 IO 模型,預設 socket 都是阻塞的,就是 IO 操作都要等待操作完成以後才能返回;
- 非阻塞 IO 模型,就是 IO 操作時不等待,立即返回,但需要不斷的去詢問內核,數據是否準備好了,如果準備好了,就主動調用函數去處理數據,使用 fcntl 設置 socket 為非阻塞;
- 多路復用模型,就是事件驅動 IO,也就是說檢測到描述符上發生了事件,才去處理,典型的就是 select 和 epoll;
- 非同步 IO 模型,就是發起 IO 操作後,立即返回去做其他的事,然後內核會等待數據準備完成後,將數據拷貝到用戶內存中,並給用戶進程發送一個信號,告知 IO 操作已完成;
2. epoll 函數#
2.1 epoll 的兩種工作模式#
2.1.1 LT 模式 (又叫水平模式,類似於 select/poll):#
完全靠內核驅動,只要某個文件描述符有變化,就會一直通知我們的應用程序,直到處理完畢為止。
2.1.2 ET 模式 (又叫邊緣觸發模式,必須將 socket 設置為非阻塞):#
此種情況下,當文件描述符有變化時,epoll 只會通知應用程序一次,並將描述符從監視隊列中清除,直到應用程序處理完該次變化,如果應用程序沒有處理,那 epoll 就不會再次關注該文件描述符,這樣其實可能會造成丟包的。
這時應用程序需要自己維護一張 fds 的表格,把從 epoll_wait 得到的狀態信息登記到這張表格,然後應用程序可以選擇遍歷這張表格,對處於忙碌狀態的 fds 進行操作。
2.1.3 水平模式和邊沿模式的選擇#
ET 比 LT 對應用程序的要求更多,需要程序員設計的部分也更多,看上去 LT 好像要簡單很多,但是當我們要求對 fd 有超時控制時,LT 也同樣需要對 fds 進行遍歷,此時不如使用本來就要遍歷的 ET。
而且由於 epoll_wait 每次返回的 fds 數量是有限的,在大並發的模式下,LT 將非常的繁忙,所有的 fds 都要在它的隊列中產生狀態信息,而每次只有一部分 fds 能返回給應用程序。
而 ET 只要 epoll_wait 返回一次 fds 之後,這些 fds 就會從隊列中刪除,只有當 fd 重新變為空閒狀態時才重新加入到隊列中,這就是說,隨著 epoll_wait 的返回,隊列中的 fds 是在減少的,這樣在大並發的情況下,ET 模式將會更加具有優勢。
2.2 epoll 函數原型#
2.2.1 epoll_create#
int epoll_create(int size); //創建一個epoll的句柄,size用來告訴內核要監聽的數目
返回值:
>0 返回創建成功的 epoll 句柄
-1 失敗
2.2.2 epoll_ctl#
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll 的事件註冊函數, 註冊要監聽的事件類型:
參數說明:
- epfd epoll_create 返回的句柄
- op 表示動作,用 3 個宏表示:EPOLL_CTL_ADD 註冊新的 fd 到 epfd 中,EPOLL_CTL_MOD,修改已經註冊的 fd 的監聽事件,EPOLL_CTL_DEL,從 epfd 中刪除一個 fd。
- fd 表示要監聽的 fd (一般就是 socket 函數生成的文件描述符)
- event 告訴內核要監聽什麼事
struct epoll_event 結構如下:
struct epoll_event {
__uint32_t events; //多個宏的集合,表示對應文件描述符可讀、可寫、緊急可讀等等
epoll_data_t data; //一個聯合體,詳細介紹見下面
};
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
2.2.3 epoll_wait#
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
參數說明如下:
- events 根據 events 中事件,決定是調用 accept 回應一個連接,還是調用 read 或者 write 讀寫文件
- maxevents 告訴內核這個 events 多大,並且不能大於 epoll_create 中的 size
功能說明:
等待註冊在 epfd (epoll 生成的文件描述符) 上的 socket fd 的事件的發生,如果發生則將發生的 socket fd 和事件類型放入到 events 數組中。
並且將註冊在 epfd 上的 socket fd 的事件類型給清空,所以如果下個循環你還要關注這個 socket fd 的話,則需要用 epoll_ctl (epfd,EPOLL_CTL_MOD,listenfd,&ev) 來重新設置 socket fd 的事件類型。這時不用 EPOLL_CTL_ADD, 因為 socket fd 並未清空,只是事件類型清空。這一步非常重要。當 epoll_wait 返回時根據返回值 (大於 0) 調用 accept。
2.3 epoll 的實現#
2.3.1 epoll 函數調用過程#
socket/bind/listen/epoll_create/epoll_ctl/epoll_wait/accept/read/write/close
2.3.2 代碼實現#
首先對 CTCP 類做一下補充,將 socket 設置為非阻塞:
int CTcp::SetNoblock (int nSock)
{
assert (m_nSock != -1);
int nFlags;
if ( nSock == -1 )
{
nSock = m_nSock;
}
if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)
return 0;
nFlags = nFlags O_NONBLOCK;
if (fcntl (nSock, F_SETFL, nFlags) < 0)
return 0;
return 1;
}
然後基於 CTCP 類,實現 CEpollServer 類,代碼如下:
//EpollServer.h
#ifndef __EPOLL_SERVER_H__
#define __EPOLL_SERVER_H__
#include "SxTcp.h"
//Tcp類
class CEpollServer
{
//構造函數
public:
CEpollServer ();
virtual ~CEpollServer ();
//公有成員函數
public:
int CreateEpoll(const char* szIp, int nPort, int nSize);
int ProcessEpoll();
int CloseEpoll();
//私有成員變量
private:
CTcp m_cTcp;
int m_nEpollFd;
};
#endif
#include "EpollServer.h"
#include <sys/epoll.h>
#include "TypeError.h"
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
CEpollServer::CEpollServer ()
{
m_nEpollFd = -1;
}
CEpollServer::~CEpollServer ()
{
CloseEpoll();
m_cTcp.Close();
}
/*創建epoll句柄
入參:
szIp 服務器ip地址
nPort 要綁定的端口
nSize 要監聽的文件描述符數量
出參:1: 成功 ; 0: 失敗
*/
int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)
{
assert(szIp != nullptr);
int iRet = 0;
int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);
iRet = m_cTcp.Open();
if ( iRet == 0 )
{
return SOCKET_ERROR;
}
iRet = m_cTcp.Bind(szIp, nPort);
if ( iRet == 0 )
{
return BIND_ERROR;
}
iRet = m_cTcp.SetNoblock();
if ( iRet == 0 )
{
return SETSOCKOPT_ERROR;
}
iRet = m_cTcp.Listen(nSize+1);//監聽描述符數量要比epoll的多?
if ( iRet == 0)
{
return LISTEN_ERROR;
}
if ( m_nEpollFd != -1 )
{
CloseEpoll();
}
m_nEpollFd = epoll_create(size);
if ( m_nEpollFd == -1)
{
return EPOLL_CREATE_ERROR;
}
return 1;
}
/*處理epoll事件
出參:1: 成功 ; 0: 失敗
*/
int CEpollServer::ProcessEpoll()
{
assert(m_nEpollFd != -1);
int nFds = 0;
int connFd = -1, readFd = -1, writeFd = -1;
int n = 0, nSize = 0;
int nListenFd = -1;
char buf[MAX_READ_SIZE] = {0};
struct sockaddr_in clientAddr;
socklen_t clilen;
struct epoll_event ev, events[20];
memset((void*)&ev, 0, sizeof(ev));
nListenFd = m_cTcp.GetHandle();
ev.data.fd = nListenFd;
ev.events = EPOLLINEPOLLET;
if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )
{
return EPOLL_CTL_ERROR;
}
while(1)
{
n = 0;
nSize = 0;
nFds = epoll_wait(m_nEpollFd, events, 20, 500);
for (int i = 0; i< nFds; ++i)
{
memset(buf, 0, MAX_READ_SIZE);
if (events[i].data.fd == nListenFd )
{
while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )
{
m_cTcp.SetNoblock(connFd); //ET模式需設置為非阻塞的
ev.data.fd = connFd;
ev.events = EPOLLINEPOLLET;
if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )
{
return EPOLL_CTL_ERROR;
}
}
if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
{
return ACCEPT_ERROR;
}
continue;
}
else if(events[i].events & EPOLLIN)
{
readFd = events[i].data.fd;
if (readFd < 0)
{
continue;
}
//讀取數據
while ( (nSize = read(readFd, buf+n, MAX_READ_SIZE - 1)) > 0 )
{
n += nSize;
}
//EAGAIN說明讀到結尾了
if (nSize == -1 && errno != EAGAIN )
{
fprintf(stderr, "epoll read failedn");
//ngleLog::WriteLog(ERROR, "%s", "epoll read fialed");
}
fprintf(stdout, "read data is:%sn", buf);
ev.data.fd = readFd;
ev.events = EPOLLOUTEPOLLET;//邊沿模式(ET)
epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);
}
else if(events[i].events & EPOLLOUT)
{
writeFd = events[i].data.fd;
//寫數據
strncpy(buf, "hello client", sizeof(buf)-1);
int dataSize = strlen(buf);
n = dataSize;
while(n > 0)
{
nSize = write(writeFd, buf + dataSize - n, n);
if (nSize < n)
{
if (nSize == -1 && errno != EAGAIN)
{
break;
}
}
n -= nSize;
}
ev.data.fd = writeFd;
ev.events = EPOLLINEPOLLET;
epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);
}
}
}
}
/*
關閉epoll文件描述符
*/
int CEpollServer::CloseEpoll()
{
if (m_nEpollFd != -1)
{
close (m_nEpollFd);
m_nEpollFd = -1;
}
return 1;
}
將上面 CEpollServer 類和 TCP 類結合起來編譯成一個動態庫,makefile 如下:
LIB_DIR=./lib
src=$(wildcard *.cpp)
obj=$(patsubst %.cpp,%.o,$(src))
PIC=-fPIC
LIBSO=-shared
#CC=g++ -gdwarf-2 -gstrict-dwarf
CC=g++ -g
%.o:%.cpp
$(CC) -c $< $(PIC)
network:$(obj)
$(CC) -o libnetwork.so $^ $(LIBSO)
cp -f libnetwork.so ../test/lib
clean:
rm -f *.o *.so
然後實現 TestEpollServer.cpp,如下:
注意:下面 ConfigIni 和 SingleLog 都是我本人測試時候寫的庫,如需使用下面代碼,需要修改!
#include "../../readini/ConfigIni.h"
#include <string>
#include "../../network/EpollServer.h"
#include "../../log/SingleLog.h"
CEpollServer g_clEpollServer;
#define FILEDIR "./socket.ini"
//epoll server
int epoll_server_init()
{
int iRet = -1;
string strIp;
int nPort = 0, nEpollNum = 0, nTimeout = 0;
ConfigIni::Init(string(FILEDIR));
strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));
if (strIp == "")
{
SingleLog::WriteLog(ERROR,"read server addr failed");
return iRet;
}
nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));
if ( nPort == -1 )
{
SingleLog::WriteLog(ERROR,"read server port failed");
return iRet;
}
nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
if ( nEpollNum == -1 )
{
SingleLog::WriteLog(ERROR,"read server epoll num failed");
return iRet;
}
nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
if ( nTimeout == -1 )
{
SingleLog::WriteLog(ERROR,"read server timeout failed");
return iRet;
}
iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
if ( iRet == 0 )
{
SingleLog::WriteLog(ERROR, "epoll create failed");
return -1;
}
return 0;
}
void epoll_server_run()
{
g_clEpollServer.ProcessEpoll();
}
int main()
{
SingleLog::Init();
if (epoll_server_init() == -1)
{
return -1;
}
epoll_server_run();
return 0;
}
//TestClient.cpp
#include <stdio.h>
#include <iostream>
#include <string.h>
#include "../../network/SxTcp.h"
using namespace std;
int main()
{
CTcp tcp;
int iRet = 0;
int iFd = 0;
char buf[128] = {0};
iRet = tcp.Open();
if (iRet == 0)
{
perror("socket create failed");
return -1;
}
iRet = tcp.Connect("192.168.233.250", 6666);
if (iRet == 0)
{
perror("socket connect failed");
return -1;
}
while(1)
{
memset(buf, 0, sizeof(buf));
cout << "please input some string:";
cin >> buf;
iRet = tcp.Send(buf, strlen(buf));
if (iRet < -1 && errno != EAGAIN)
{
perror("send failed");
return -1;
}
else if(iRet == 0)
{
perror("connect is closed");
return -1;
}
memset(buf, 0, sizeof(buf));
iRet = tcp.Recv(buf, sizeof(buf));
if (iRet < 0 && errno != EAGAIN)
{
perror("recv failed");
return -1;
}
else if(iRet == 0)
{
perror("socket not connect");
return -1;
}
fprintf(stdout, "recv data is:%sn", buf);
}
return 0;
}
分別編譯 TestEpollServer.cpp 和 TestClient.cpp,生成服務端和客戶端應用程序,即可實現通信。
本人在簡書上寫的內容均為本人創作,轉載需經本人同意。簡書首頁:https://www.jianshu.com/u/e8c7bb5e3257