banner
moeyy

moeyy

一条有远大理想的咸鱼。
github
mastodon
email

Linux Network Programming Series (10) -- Basic Usage of epoll

1. Four IO Models in Network Programming#

  • Blocking IO model, where the default socket is blocking, meaning that IO operations must wait until they are completed before returning;
  • Non-blocking IO model, where IO operations do not wait and return immediately, but require constant polling of the kernel to check if data is ready; if it is ready, the function is called to process the data, using fcntl to set the socket to non-blocking;
  • Multiplexing model, which is event-driven IO, meaning that processing occurs only when an event is detected on a descriptor, typically using select and epoll;
  • Asynchronous IO model, where after initiating an IO operation, it immediately returns to do other tasks, and the kernel waits for the data to be ready, copies the data to user memory, and sends a signal to the user process to notify that the IO operation is complete;

2. epoll Function#

2.1 Two Working Modes of epoll#

2.1.1 LT Mode (also known as Level Triggered Mode, similar to select/poll):#

Completely driven by the kernel; as long as a file descriptor changes, it will continuously notify our application until it is processed.

2.1.2 ET Mode (also known as Edge Triggered Mode, must set the socket to non-blocking):#

In this case, when a file descriptor changes, epoll will notify the application only once and remove the descriptor from the monitoring queue until the application processes that change. If the application does not process it, epoll will not pay attention to that file descriptor again, which may lead to packet loss.
At this point, the application needs to maintain a table of fds, registering the status information obtained from epoll_wait into this table, and then the application can choose to traverse this table to operate on busy fds.

2.1.3 Choosing Between Level Triggered and Edge Triggered Modes#

ET has more requirements for the application, requiring more design from the programmer. LT seems much simpler, but when we require timeout control for fds, LT also needs to traverse fds, making ET a better choice since it is already traversing.
Moreover, since the number of fds returned by epoll_wait each time is limited, in high concurrency scenarios, LT will be very busy, as all fds need to generate status information in its queue, but only a portion of fds can be returned to the application each time.
ET, on the other hand, once epoll_wait returns a set of fds, these fds will be removed from the queue, and only when the fd becomes idle again will it be re-added to the queue. This means that with each return of epoll_wait, the number of fds in the queue decreases, giving ET mode an advantage in high concurrency situations.

2.2 epoll Function Prototype#

2.2.1 epoll_create#

int epoll_create(int size); // Creates an epoll handle, size indicates the number of fds to listen to

Return value:
>0 returns the successfully created epoll handle
-1 failure

2.2.2 epoll_ctl#

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

The event registration function for epoll, registering the types of events to listen to:
Parameter description:

  • epfd epoll_create returns the handle
  • op indicates the action, represented by three macros: EPOLL_CTL_ADD registers a new fd to epfd, EPOLL_CTL_MOD modifies the listening events of an already registered fd, EPOLL_CTL_DEL deletes a fd from epfd.
  • fd indicates the fd to listen to (generally the file descriptor generated by the socket function)
  • event tells the kernel what events to listen for

The struct epoll_event structure is as follows:

struct epoll_event {
 __uint32_t events; // A collection of multiple macros indicating whether the corresponding file descriptor is readable, writable, urgent, etc.
 epoll_data_t data; // A union, detailed introduction below
};
typedef union epoll_data
{
 void *ptr;
 int fd;
 uint32_t u32;
 uint64_t u64;
}epoll_data_t;

2.2.3 epoll_wait#

int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

Parameter description:

  • events determines whether to call accept to respond to a connection or call read or write to read/write files based on the events in events
  • maxevents tells the kernel the size of events and cannot exceed the size in epoll_create

Function description:
Waits for events to occur on the socket fd registered in epfd (the file descriptor generated by epoll); if they occur, the socket fd and event type are placed into the events array.
And clears the event types of the socket fd registered in epfd, so if you want to pay attention to this socket fd in the next loop, you need to use epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev) to reset the event types of the socket fd. At this point, you do not need to use EPOLL_CTL_ADD, because the socket fd has not been cleared, only the event types have been cleared. This step is very important. When epoll_wait returns, based on the return value (greater than 0), call accept.

2.3 Implementation of epoll#

2.3.1 Call Process of epoll Function#

socket/bind/listen/epoll_create/epoll_ctl/epoll_wait/accept/read/write/close

2.3.2 Code Implementation#

First, supplement the CTCP class to set the socket to non-blocking:

int CTcp::SetNoblock (int nSock)
{
    assert (m_nSock != -1);
    int nFlags;

    if ( nSock == -1 )
    {
        nSock = m_nSock;
    }

    if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)
        return 0;

    nFlags = nFlags  O_NONBLOCK;

    if (fcntl (nSock, F_SETFL, nFlags) < 0)
        return 0;

    return 1;
}

Then, based on the CTCP class, implement the CEpollServer class, as follows:

//EpollServer.h
#ifndef __EPOLL_SERVER_H__
#define __EPOLL_SERVER_H__

#include "SxTcp.h"

//Tcp class
class CEpollServer
{

//Constructor
public:
    CEpollServer ();
    virtual ~CEpollServer ();

//Public member functions
public:
    int CreateEpoll(const char* szIp, int nPort, int nSize);
    int ProcessEpoll();
    int CloseEpoll();

//Private member variables
private:
    CTcp m_cTcp;
    int m_nEpollFd;
};

#endif
#include "EpollServer.h"
#include <sys/epoll.h>
#include "TypeError.h"
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>

CEpollServer::CEpollServer ()
{
    m_nEpollFd = -1;
}

CEpollServer::~CEpollServer ()
{
    CloseEpoll();
    m_cTcp.Close();
}


/* Create epoll handle
  Input parameters:
  szIp server IP address
  nPort port to bind
  nSize number of file descriptors to listen to
  Output parameters: 1: success; 0: failure
*/
int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)
{
    assert(szIp != nullptr);
    int iRet = 0;
    int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);
   
    iRet = m_cTcp.Open();
    if ( iRet ==  0 )
    {
        return SOCKET_ERROR;
    }

    iRet = m_cTcp.Bind(szIp, nPort);
    if ( iRet == 0 )
    {
        return BIND_ERROR;
    }

    iRet = m_cTcp.SetNoblock();
    if ( iRet == 0 )
    {
        return SETSOCKOPT_ERROR;
    }

    iRet = m_cTcp.Listen(nSize+1);// The number of listening descriptors should be more than epoll?
    if ( iRet == 0)
    {
        return LISTEN_ERROR;
    }

    if ( m_nEpollFd != -1 )
    {
        CloseEpoll();
    }

    m_nEpollFd = epoll_create(size);
    if ( m_nEpollFd == -1)
    {
        return EPOLL_CREATE_ERROR;
    }

    return 1;
}

/* Process epoll events
  Output parameters: 1: success; 0: failure
*/
int CEpollServer::ProcessEpoll()
{
    assert(m_nEpollFd != -1);
    int nFds = 0;
    int connFd = -1, readFd = -1, writeFd = -1;
    int n = 0, nSize = 0;
    int nListenFd = -1;
    char buf[MAX_READ_SIZE] = {0};
    struct sockaddr_in clientAddr;
    socklen_t clilen;
    struct epoll_event ev, events[20];
    memset((void*)&ev, 0, sizeof(ev));
    nListenFd = m_cTcp.GetHandle();
    ev.data.fd = nListenFd;
    ev.events = EPOLLINEPOLLET;
    if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )
    {
        return EPOLL_CTL_ERROR;
    }
    while(1)
    {
        n = 0;
        nSize = 0;
        nFds = epoll_wait(m_nEpollFd, events, 20, 500);
        for (int i = 0; i< nFds; ++i)
        {
            memset(buf, 0, MAX_READ_SIZE);
            if (events[i].data.fd == nListenFd )
            {
                while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )
                {
                    m_cTcp.SetNoblock(connFd);  // ET mode requires setting to non-blocking
                    ev.data.fd = connFd;
                    ev.events = EPOLLINEPOLLET;
                    if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )
                    {
                        return EPOLL_CTL_ERROR;
                    }
                }
                if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
                {
                    return ACCEPT_ERROR;
                }
                continue;
            }
            else if(events[i].events & EPOLLIN)
            {
                readFd = events[i].data.fd;
                if (readFd < 0)
                {
                    continue;
                }
                // Read data
                while ( (nSize = read(readFd, buf+n, MAX_READ_SIZE - 1)) > 0 )
                {
                    n += nSize;
                }
                // EAGAIN indicates end of reading
                if (nSize == -1 && errno != EAGAIN )
                {
                    fprintf(stderr, "epoll read failed\n");
                    //ngleLog::WriteLog(ERROR, "%s", "epoll read failed");
                }

                fprintf(stdout, "read data is:%s\n", buf);
               
                ev.data.fd = readFd;
                ev.events = EPOLLOUTEPOLLET; // Edge Triggered mode (ET)
                epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);
            }
            else if(events[i].events & EPOLLOUT)
            {
                writeFd = events[i].data.fd;
                // Write data
                strncpy(buf, "hello client", sizeof(buf)-1);
                int dataSize = strlen(buf);
                n = dataSize;
                while(n > 0)
                {
                    nSize = write(writeFd, buf + dataSize - n, n);
                    if (nSize < n)
                    {
                        if (nSize == -1 && errno != EAGAIN)
                        {                           
                            break;
                        }
                    }
                    n -= nSize;
                }

                ev.data.fd = writeFd;
                ev.events = EPOLLINEPOLLET;
                epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);
            }
        }
    }
}

/*
Close epoll file descriptor
*/
int CEpollServer::CloseEpoll()
{
    if (m_nEpollFd != -1)
    {
        close (m_nEpollFd);
        m_nEpollFd = -1;
    }
    return 1;

}

Compile the above CEpollServer class and TCP class into a dynamic library, the makefile is as follows:

LIB_DIR=./lib
src=$(wildcard *.cpp)
obj=$(patsubst %.cpp,%.o,$(src))
PIC=-fPIC
LIBSO=-shared
#CC=g++ -gdwarf-2 -gstrict-dwarf
CC=g++ -g

%.o:%.cpp
    $(CC) -c $< $(PIC)

network:$(obj)
    $(CC) -o libnetwork.so $^ $(LIBSO)
    cp -f libnetwork.so ../test/lib

clean:
    rm -f *.o *.so

Then implement TestEpollServer.cpp as follows:
Note: The following ConfigIni and SingleLog are libraries I wrote during testing; modifications are needed if you want to use the following code!

#include "../../readini/ConfigIni.h"
#include <string>
#include "../../network/EpollServer.h"
#include "../../log/SingleLog.h"

CEpollServer g_clEpollServer;
#define FILEDIR "./socket.ini"

//epoll server
int epoll_server_init()
{   
    int iRet = -1;
    string strIp;
    int nPort = 0, nEpollNum = 0, nTimeout = 0;
    ConfigIni::Init(string(FILEDIR));
    strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));
    if (strIp == "")
    {
        SingleLog::WriteLog(ERROR,"read server addr failed");
        return iRet;
    }

    nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));
    if ( nPort == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server port failed");
        return iRet;
    }

    nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
    if ( nEpollNum == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server epoll num failed");
        return iRet;
    }

    nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
    if ( nTimeout == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server timeout failed");
        return iRet;
    }

    iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
    if ( iRet == 0 )
    {
        SingleLog::WriteLog(ERROR, "epoll create failed");
        return -1;
    }
   
    return 0;
}

void epoll_server_run()
{
    g_clEpollServer.ProcessEpoll();
}

int main()
{
    SingleLog::Init();
    if (epoll_server_init() == -1)
    {
        return -1;
    }
    epoll_server_run();
    return 0;
}
//TestClient.cpp
#include <stdio.h>
#include <iostream>
#include <string.h>
#include "../../network/SxTcp.h"
using namespace std;

int main()
{
    CTcp tcp;
    int iRet = 0;
    int iFd = 0;
    char buf[128] = {0};
   
    iRet = tcp.Open();
    if (iRet == 0)
    {
        perror("socket create failed");
        return -1;
    }

    iRet = tcp.Connect("192.168.233.250", 6666);
    if (iRet == 0)
    {
        perror("socket connect failed");
        return -1;
    }

    while(1)
    {
        memset(buf, 0, sizeof(buf));
        cout << "please input some string:";
        cin >> buf;
        iRet = tcp.Send(buf, strlen(buf));
        if (iRet < -1 && errno != EAGAIN)
        {
            perror("send failed");
            return -1;
        }
        else if(iRet == 0)
        {
            perror("connect is closed");
            return -1;
        }

        memset(buf, 0, sizeof(buf));
        iRet = tcp.Recv(buf, sizeof(buf));
        if (iRet < 0 && errno != EAGAIN)
        {
            perror("recv failed");
            return -1;
        }
        else if(iRet == 0)
        {
            perror("socket not connect");
            return -1;
        }

        fprintf(stdout, "recv data is:%s\n", buf);
    }

    return 0;
}

Compile TestEpollServer.cpp and TestClient.cpp separately to generate server and client applications for communication.

All content written by me on JianShu is original, and reproduction requires my consent. JianShu homepage: https://www.jianshu.com/u/e8c7bb5e3257

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.