banner
moeyy

moeyy

一条有远大理想的咸鱼。
github
mastodon
email

linux网络编程系列(十)--epoll的基本使用

1. 网络编程中的四种 IO 模型#

  • 阻塞 IO 模型,默认 socket 都是阻塞的,就是 IO 操作都要等待操作完成以后才能返回;
  • 非阻塞 IO 模型,就是 IO 操作时不等待,立即返回,但需要不断的去询问内核,数据是否准备好了,如果准备好了,就主动调用函数去处理数据,使用 fcntl 设置 socket 为非阻塞;
  • 多路复用模型,就是事件驱动 IO,也就是说检测到描述符上发生了事件,才去处理,典型的就是 select 和 epoll;
  • 异步 IO 模型,就是发起 IO 操作后,立即返回去做其他的事,然后内核会等待数据准备完成后,将数据拷贝到用户内存中,并给用户进程发送一个信号,告知 IO 操作已完成;

2. epoll 函数#

2.1 epoll 的两种工作模式#

2.1.1 LT 模式 (又叫水平模式,类似于 select/poll):#

完全靠内核驱动,只要某个文件描述符有变化,就会一直通知我们的应用程序,直到处理完毕为止。

2.1.2 ET 模式 (又叫边缘触发模式,必须将 socket 设置为非阻塞):#

此种情况下,当文件描述符有变化时,epoll 只会通知应用程序一次,并将描述符从监视队列中清除,直到应用程序处理完该次变化,如果应用程序没有处理,那 epoll 就不会再次关注该文件描述符,这样其实可能会造成丢包的。
这时应用程序需要自己维护一张 fds 的表格,把从 epoll_wait 得到的状态信息登记到这张表格,然后应用程序可以选择遍历这张表格,对处于忙碌状态的 fds 进行操作。

2.1.3 水平模式和边沿模式的选择#

ET 比 LT 对应用程序的要求更多,需要程序员设计的部分也更多,看上去 LT 好像要简单很多,但是当我们要求对 fd 有超时控制时,LT 也同样需要对 fds 进行遍历,此时不如使用本来就要遍历的 ET。
而且由于 epoll_wait 每次返回的 fds 数量是有限的,在大并发的模式下,LT 将非常的繁忙,所有的 fds 都要在它的队列中产生状态信息,而每次只有一部分 fds 能返回给应用程序。
而 ET 只要 epoll_wait 返回一次 fds 之后,这些 fds 就会从队列中删除,只有当 fd 重新变为空闲状态时才重新加入到队列中,这就是说,随着 epoll_wait 的返回,队列中的 fds 是在减少的,这样在大并发的情况下,ET 模式将会更加具有优势。

2.2 epoll 函数原型#

2.2.1 epoll_create#

int epoll_create(int size); //创建一个epoll的句柄,size用来告诉内核要监听的数目

返回值:
>0 返回创建成功的 epoll 句柄
-1 失败

2.2.2 epoll_ctl#

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll 的事件注册函数, 注册要监听的事件类型:
参数说明:

  • epfd epoll_create 返回的句柄
  • op 表示动作,用 3 个宏表示:EPOLL_CTL_ADD 注册新的 fd 到 epfd 中,EPOLL_CTL_MOD,修改已经注册的 fd 的监听事件,EPOLL_CTL_DEL,从 epfd 中删除一个 fd。
  • fd 表示要监听的 fd (一般就是 socket 函数生成的文件描述符)
  • event 告诉内核要监听什么事

struct epoll_event 结构如下:

struct epoll_event {
 __uint32_t events; //多个宏的集合,表示对应文件描述符可读、可写、紧急可读等等
 epoll_data_t data; //一个联合体,详细介绍见下面
};
typedef union epoll_data
{
 void *ptr;
 int fd;
 uint32_t u32;
 uint64_t u64;
}epoll_data_t;

2.2.3 epoll_wait#

int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

参数说明如下:

  • events 根据 events 中事件,决定是调用 accept 回应一个连接,还是调用 read 或者 write 读写文件
  • maxevents 告诉内核这个 events 多大,并且不能大于 epoll_create 中的 size

功能说明:
等侍注册在 epfd (epoll 生成的文件描述符) 上的 socket fd 的事件的发生,如果发生则将发生的 sokct fd 和事件类型放入到 events 数组中。
并 且将注册在 epfd 上的 socket fd 的事件类型给清空,所以如果下一个循环你还要关注这个 socket fd 的话,则需要用 epoll_ctl (epfd,EPOLL_CTL_MOD,listenfd,&ev) 来重新设置 socket fd 的事件类型。这时不用 EPOLL_CTL_ADD, 因为 socket fd 并未清空,只是事件类型清空。这一步非常重要。当 epoll_wait 返回时根据返回值 (大于 0) 调用 accept。

2.3 epoll 的实现#

2.3.1 epoll 函数调用过程#

socket/bind/listen/epoll_create/epoll_ctl/epoll_wait/accept/read/write/close

2.3.2 代码实现#

首先对 CTCP 类做一下补充,将 socket 设置为非阻塞:

int CTcp::SetNoblock (int nSock)
{
    assert (m_nSock != -1);
    int nFlags;

    if ( nSock == -1 )
    {
        nSock = m_nSock;
    }

    if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)
        return 0;

    nFlags = nFlags  O_NONBLOCK;

    if (fcntl (nSock, F_SETFL, nFlags) < 0)
        return 0;

    return 1;
}

然后基于 CTCP 类,实现 CEpollServer 类,代码如下:

//EpollServer.h
#ifndef __EPOLL_SERVER_H__
#define __EPOLL_SERVER_H__

#include "SxTcp.h"

//Tcp类
class CEpollServer
{

//构造函数
public:
    CEpollServer ();
    virtual ~CEpollServer ();

//公有成员函数
public:
    int CreateEpoll(const char* szIp, int nPort, int nSize);
    int ProcessEpoll();
    int CloseEpoll();

//私有成员变量
private:
    CTcp m_cTcp;
    int m_nEpollFd;
};

#endif
#include "EpollServer.h"
#include <sys/epoll.h>
#include "TypeError.h"
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>

CEpollServer::CEpollServer ()
{
    m_nEpollFd = -1;
}

CEpollServer::~CEpollServer ()
{
    CloseEpoll();
    m_cTcp.Close();
}


/*创建epoll句柄
  入参:
  szIp 服务器ip地址
  nPort 要绑定的端口
  nSize 要监听的文件描述符数量
  出参:1: 成功 ; 0: 失败
*/
int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)
{
    assert(szIp != nullptr);
    int iRet = 0;
    int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);
   
    iRet = m_cTcp.Open();
    if ( iRet ==  0 )
    {
        return SOCKET_ERROR;
    }

    iRet = m_cTcp.Bind(szIp, nPort);
    if ( iRet == 0 )
    {
        return BIND_ERROR;
    }

    iRet = m_cTcp.SetNoblock();
    if ( iRet == 0 )
    {
        return SETSOCKOPT_ERROR;
    }

    iRet = m_cTcp.Listen(nSize+1);//监听描述符数量要比epoll的多?
    if ( iRet == 0)
    {
        return LISTEN_ERROR;
    }

    if ( m_nEpollFd != -1 )
    {
        CloseEpoll();
    }

    m_nEpollFd = epoll_create(size);
    if ( m_nEpollFd == -1)
    {
        return EPOLL_CREATE_ERROR;
    }

    return 1;
}

/*处理epoll事件
  出参:1: 成功 ; 0: 失败
*/
int CEpollServer::ProcessEpoll()
{
    assert(m_nEpollFd != -1);
    int nFds = 0;
    int connFd = -1, readFd = -1, writeFd = -1;
    int n = 0, nSize = 0;
    int nListenFd = -1;
    char buf[MAX_READ_SIZE] = {0};
    struct sockaddr_in clientAddr;
    socklen_t clilen;
    struct epoll_event ev, events[20];
    memset((void*)&ev, 0, sizeof(ev));
    nListenFd = m_cTcp.GetHandle();
    ev.data.fd = nListenFd;
    ev.events = EPOLLINEPOLLET;
    if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )
    {
        return EPOLL_CTL_ERROR;
    }
    while(1)
    {
        n = 0;
        nSize = 0;
        nFds = epoll_wait(m_nEpollFd, events, 20, 500);
        for (int i = 0; i< nFds; ++i)
        {
            memset(buf, 0, MAX_READ_SIZE);
            if (events[i].data.fd == nListenFd )
            {
                while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )
                {
                    m_cTcp.SetNoblock(connFd);  //ET模式需设置为非阻塞的
                    ev.data.fd = connFd;
                    ev.events = EPOLLINEPOLLET;
                    if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )
                    {
                        return EPOLL_CTL_ERROR;
                    }
                }
                if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
                {
                    return ACCEPT_ERROR;
                }
                continue;
            }
            else if(events[i].events & EPOLLIN)
            {
                readFd = events[i].data.fd;
                if (readFd < 0)
                {
                    continue;
                }
                //读取数据
                while ( (nSize = read(readFd, buf+n, MAX_READ_SIZE - 1)) > 0 )
                {
                    n += nSize;
                }
                //EAGAIN说明读到结尾了
                if (nSize == -1 && errno != EAGAIN )
                {
                    fprintf(stderr, "epoll read failedn");
                    //ngleLog::WriteLog(ERROR, "%s", "epoll read fialed");
                }

                fprintf(stdout, "read data is:%sn", buf);
               
                ev.data.fd = readFd;
                ev.events = EPOLLOUTEPOLLET;//边沿模式(ET)
                epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);
            }
            else if(events[i].events & EPOLLOUT)
            {
                writeFd = events[i].data.fd;
                //写数据
                strncpy(buf, "hello client", sizeof(buf)-1);
                int dataSize = strlen(buf);
                n = dataSize;
                while(n > 0)
                {
                    nSize = write(writeFd, buf + dataSize - n, n);
                    if (nSize < n)
                    {
                        if (nSize == -1 && errno != EAGAIN)
                        {                           
                            break;
                        }
                    }
                    n -= nSize;
                }

                ev.data.fd = writeFd;
                ev.events = EPOLLINEPOLLET;
                epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);
            }
        }
    }
}

/*
关闭epoll文件描述符
*/
int CEpollServer::CloseEpoll()
{
    if (m_nEpollFd != -1)
    {
        close (m_nEpollFd);
        m_nEpollFd = -1;
    }
    return 1;

}

将上面 CEpollServer 类和 TCP 类结合起来编译成一个动态库,makefile 如下:

LIB_DIR=./lib
src=$(wildcard *.cpp)
obj=$(patsubst %.cpp,%.o,$(src))
PIC=-fPIC
LIBSO=-shared
#CC=g++ -gdwarf-2 -gstrict-dwarf
CC=g++ -g

%.o:%.cpp
    $(CC) -c $< $(PIC)

network:$(obj)
    $(CC) -o libnetwork.so $^ $(LIBSO)
    cp -f libnetwork.so ../test/lib

clean:
    rm -f *.o *.so

然后实现 TestEpollServer.cpp,如下:
注意:下面 ConfigIni 和 SingleLog 都是我本人测试时候写的库,如需使用下面代码,需要修改!

#include "../../readini/ConfigIni.h"
#include <string>
#include "../../network/EpollServer.h"
#include "../../log/SingleLog.h"

CEpollServer g_clEpollServer;
#define FILEDIR "./socket.ini"

//epoll server
int epoll_server_init()
{   
    int iRet = -1;
    string strIp;
    int nPort = 0, nEpollNum = 0, nTimeout = 0;
    ConfigIni::Init(string(FILEDIR));
    strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));
    if (strIp == "")
    {
        SingleLog::WriteLog(ERROR,"read server addr failed");
        return iRet;
    }

    nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));
    if ( nPort == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server port failed");
        return iRet;
    }

    nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
    if ( nEpollNum == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server epoll num failed");
        return iRet;
    }

    nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
    if ( nTimeout == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server timeout failed");
        return iRet;
    }

    iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
    if ( iRet == 0 )
    {
        SingleLog::WriteLog(ERROR, "epoll create failed");
        return -1;
    }
   
    return 0;
}

void epoll_server_run()
{
    g_clEpollServer.ProcessEpoll();
}

int main()
{
    SingleLog::Init();
    if (epoll_server_init() == -1)
    {
        return -1;
    }
    epoll_server_run();
    return 0;
}
//TestClient.cpp
#include <stdio.h>
#include <iostream>
#include <string.h>
#include "../../network/SxTcp.h"
using namespace std;

int main()
{
    CTcp tcp;
    int iRet = 0;
    int iFd = 0;
    char buf[128] = {0};
   
    iRet = tcp.Open();
    if (iRet == 0)
    {
        perror("socket create failed");
        return -1;
    }

    iRet = tcp.Connect("192.168.233.250", 6666);
    if (iRet == 0)
    {
        perror("socket connect failed");
        return -1;
    }

    while(1)
    {
        memset(buf, 0, sizeof(buf));
        cout << "please input some string:";
        cin >> buf;
        iRet = tcp.Send(buf, strlen(buf));
        if (iRet < -1 && errno != EAGAIN)
        {
            perror("send failed");
            return -1;
        }
        else if(iRet == 0)
        {
            perror("connect is closed");
            return -1;
        }

        memset(buf, 0, sizeof(buf));
        iRet = tcp.Recv(buf, sizeof(buf));
        if (iRet < 0 && errno != EAGAIN)
        {
            perror("recv failed");
            return -1;
        }
        else if(iRet == 0)
        {
            perror("socket not connect");
            return -1;
        }

        fprintf(stdout, "recv data is:%sn", buf);
    }

    return 0;
}

分别编译 TestEpollServer.cpp 和 TestClient.cpp,生成服务端和客户端应用程序,即可实现通信。

本人在简书上写的内容均为本人原创,转载需经本人同意。简书主页:https://www.jianshu.com/u/e8c7bb5e3257

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。