banner
moeyy

moeyy

一条有远大理想的咸鱼。
github
mastodon
email

linuxネットワークプログラミングシリーズ(10)--epollの基本的な使用法

1. ネットワークプログラミングにおける 4 つの IO モデル#

  • ブロッキング IO モデル、デフォルトのソケットはすべてブロッキングであり、IO 操作は操作が完了するまで待つ必要があります;
  • ノンブロッキング IO モデル、IO 操作中に待たずにすぐに戻りますが、カーネルにデータが準備できているかどうかを繰り返し確認する必要があります。準備ができていれば、関数を呼び出してデータを処理します。fcntl を使用してソケットをノンブロッキングに設定します;
  • マルチプレクシングモデル、すなわちイベント駆動 IO、つまり、記述子上でイベントが発生したときにのみ処理を行います。典型的なものは select と epoll です;
  • 非同期 IO モデル、IO 操作を開始した後、すぐに他の作業を行い、カーネルはデータの準備が完了するのを待ち、データをユーザーメモリにコピーし、ユーザープロセスに IO 操作が完了したことを通知する信号を送ります;

2. epoll 関数#

2.1 epoll の 2 つの動作モード#

2.1.1 LT モード(水平モードとも呼ばれ、select/poll に似ています):#

完全にカーネル駆動であり、特定のファイル記述子に変化がある限り、アプリケーションに通知し続けます。処理が完了するまで通知されます。

2.1.2 ET モード(エッジトリガーモードとも呼ばれ、ソケットをノンブロッキングに設定する必要があります):#

この場合、ファイル記述子に変化があると、epoll はアプリケーションに一度だけ通知し、記述子を監視キューから削除します。アプリケーションがその変化を処理するまで、epoll はそのファイル記述子に再度注目しません。これにより、パケットが失われる可能性があります。
このとき、アプリケーションは自分で fds の表を維持し、epoll_wait から得た状態情報をこの表に登録し、アプリケーションはこの表を遍歴して忙しい状態の fds に対して操作を行うことができます。

2.1.3 水平モードとエッジモードの選択#

ET は LT よりもアプリケーションに対する要求が多く、プログラマーが設計する部分も多くなります。LT は簡単に見えますが、fd にタイムアウト制御を要求する場合、LT も fds を遍歴する必要があります。この時、元々遍歴する必要がある ET を使用する方が良いです。
また、epoll_wait が毎回返す fds の数は限られているため、大規模な同時接続のモードでは LT は非常に忙しくなり、すべての fds がそのキュー内で状態情報を生成し、毎回一部の fds のみがアプリケーションに返されます。
ET は epoll_wait が一度 fds を返すと、これらの fds はキューから削除され、fd が再びアイドル状態になるまでキューに再度追加されません。これは、epoll_wait の返却に伴い、キュー内の fds が減少することを意味します。このため、大規模な同時接続の状況下では、ET モードがより有利になります。

2.2 epoll 関数のプロトタイプ#

2.2.1 epoll_create#

int epoll_create(int size); //epollのハンドルを作成します。sizeはカーネルにリッスンする数を知らせるために使用されます

返り値:
>0 作成成功の epoll ハンドルを返します
-1 失敗

2.2.2 epoll_ctl#

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll のイベント登録関数、リッスンするイベントタイプを登録します:
パラメータの説明:

  • epfd epoll_create が返すハンドル
  • op アクションを示し、3 つのマクロで表されます:EPOLL_CTL_ADD 新しい fd を epfd に登録、EPOLL_CTL_MOD、既に登録された fd のリッスンイベントを変更、EPOLL_CTL_DEL、epfd から fd を削除します。
  • fd リッスンする fd(一般的にはソケット関数が生成したファイル記述子)
  • event カーネルに何をリッスンするかを知らせます

struct epoll_event 構造は以下のようになります:

struct epoll_event {
 __uint32_t events; //複数のマクロの集合、対応するファイル記述子が読み取り可能、書き込み可能、緊急読み取り可能などを示します
 epoll_data_t data; //共用体、詳細は以下を参照
};
typedef union epoll_data
{
 void *ptr;
 int fd;
 uint32_t u32;
 uint64_t u64;
}epoll_data_t;

2.2.3 epoll_wait#

int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

パラメータの説明は以下の通りです:

  • events events 内のイベントに基づいて、accept を呼び出して接続に応答するか、read または write を呼び出してファイルを読み書きするかを決定します
  • maxevents カーネルにこの events のサイズを知らせ、epoll_create 内の size を超えてはいけません

機能説明:
epfd(epoll によって生成されたファイル記述子)に登録されたソケット fd のイベントの発生を待ちます。発生した場合、発生したソケット fd とイベントタイプを events 配列に格納します。
また、epfd に登録されたソケット fd のイベントタイプをクリアします。したがって、次のループでこのソケット fd に再度注目する必要がある場合は、epoll_ctl (epfd,EPOLL_CTL_MOD,listenfd,&ev) を使用してソケット fd のイベントタイプを再設定する必要があります。この時、EPOLL_CTL_ADD は不要です。なぜなら、ソケット fd はクリアされておらず、単にイベントタイプがクリアされただけだからです。このステップは非常に重要です。epoll_wait が返ると、返り値(0 より大きい)に基づいて accept を呼び出します。

2.3 epoll の実装#

2.3.1 epoll 関数の呼び出しプロセス#

socket/bind/listen/epoll_create/epoll_ctl/epoll_wait/accept/read/write/close

2.3.2 コード実装#

まず CTCP クラスに補足し、ソケットをノンブロッキングに設定します:

int CTcp::SetNoblock (int nSock)
{
    assert (m_nSock != -1);
    int nFlags;

    if ( nSock == -1 )
    {
        nSock = m_nSock;
    }

    if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)
        return 0;

    nFlags = nFlags  O_NONBLOCK;

    if (fcntl (nSock, F_SETFL, nFlags) < 0)
        return 0;

    return 1;
}

次に CTCP クラスに基づいて CEpollServer クラスを実装します。コードは以下の通りです:

//EpollServer.h
#ifndef __EPOLL_SERVER_H__
#define __EPOLL_SERVER_H__

#include "SxTcp.h"

//Tcpクラス
class CEpollServer
{

//コンストラクタ
public:
    CEpollServer ();
    virtual ~CEpollServer ();

//公開メンバ関数
public:
    int CreateEpoll(const char* szIp, int nPort, int nSize);
    int ProcessEpoll();
    int CloseEpoll();

//プライベートメンバ変数
private:
    CTcp m_cTcp;
    int m_nEpollFd;
};

#endif
#include "EpollServer.h"
#include <sys/epoll.h>
#include "TypeError.h"
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>

CEpollServer::CEpollServer ()
{
    m_nEpollFd = -1;
}

CEpollServer::~CEpollServer ()
{
    CloseEpoll();
    m_cTcp.Close();
}


/*epollハンドルを作成
  入力パラメータ:
  szIp サーバーのIPアドレス
  nPort バインドするポート
  nSize リッスンするファイル記述子の数
  出力パラメータ:1: 成功 ; 0: 失敗
*/
int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)
{
    assert(szIp != nullptr);
    int iRet = 0;
    int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);
   
    iRet = m_cTcp.Open();
    if ( iRet ==  0 )
    {
        return SOCKET_ERROR;
    }

    iRet = m_cTcp.Bind(szIp, nPort);
    if ( iRet == 0 )
    {
        return BIND_ERROR;
    }

    iRet = m_cTcp.SetNoblock();
    if ( iRet == 0 )
    {
        return SETSOCKOPT_ERROR;
    }

    iRet = m_cTcp.Listen(nSize+1);//リッスンする記述子の数はepollより多くする必要がありますか?
    if ( iRet == 0)
    {
        return LISTEN_ERROR;
    }

    if ( m_nEpollFd != -1 )
    {
        CloseEpoll();
    }

    m_nEpollFd = epoll_create(size);
    if ( m_nEpollFd == -1)
    {
        return EPOLL_CREATE_ERROR;
    }

    return 1;
}

/*epollイベントを処理
  出力パラメータ:1: 成功 ; 0: 失敗
*/
int CEpollServer::ProcessEpoll()
{
    assert(m_nEpollFd != -1);
    int nFds = 0;
    int connFd = -1, readFd = -1, writeFd = -1;
    int n = 0, nSize = 0;
    int nListenFd = -1;
    char buf[MAX_READ_SIZE] = {0};
    struct sockaddr_in clientAddr;
    socklen_t clilen;
    struct epoll_event ev, events[20];
    memset((void*)&ev, 0, sizeof(ev));
    nListenFd = m_cTcp.GetHandle();
    ev.data.fd = nListenFd;
    ev.events = EPOLLINEPOLLET;
    if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )
    {
        return EPOLL_CTL_ERROR;
    }
    while(1)
    {
        n = 0;
        nSize = 0;
        nFds = epoll_wait(m_nEpollFd, events, 20, 500);
        for (int i = 0; i< nFds; ++i)
        {
            memset(buf, 0, MAX_READ_SIZE);
            if (events[i].data.fd == nListenFd )
            {
                while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )
                {
                    m_cTcp.SetNoblock(connFd);  //ETモードではノンブロッキングに設定する必要があります
                    ev.data.fd = connFd;
                    ev.events = EPOLLINEPOLLET;
                    if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )
                    {
                        return EPOLL_CTL_ERROR;
                    }
                }
                if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
                {
                    return ACCEPT_ERROR;
                }
                continue;
            }
            else if(events[i].events & EPOLLIN)
            {
                readFd = events[i].data.fd;
                if (readFd < 0)
                {
                    continue;
                }
                //データを読み取る
                while ( (nSize = read(readFd, buf+n, MAX_READ_SIZE - 1)) > 0 )
                {
                    n += nSize;
                }
                //EAGAINは読み取りの終わりを示します
                if (nSize == -1 && errno != EAGAIN )
                {
                    fprintf(stderr, "epoll read failedn");
                    //ngleLog::WriteLog(ERROR, "%s", "epoll read fialed");
                }

                fprintf(stdout, "read data is:%sn", buf);
               
                ev.data.fd = readFd;
                ev.events = EPOLLOUTEPOLLET;//エッジモード(ET)
                epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);
            }
            else if(events[i].events & EPOLLOUT)
            {
                writeFd = events[i].data.fd;
                //データを書き込む
                strncpy(buf, "hello client", sizeof(buf)-1);
                int dataSize = strlen(buf);
                n = dataSize;
                while(n > 0)
                {
                    nSize = write(writeFd, buf + dataSize - n, n);
                    if (nSize < n)
                    {
                        if (nSize == -1 && errno != EAGAIN)
                        {                           
                            break;
                        }
                    }
                    n -= nSize;
                }

                ev.data.fd = writeFd;
                ev.events = EPOLLINEPOLLET;
                epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);
            }
        }
    }
}

/*
epollファイル記述子を閉じる
*/
int CEpollServer::CloseEpoll()
{
    if (m_nEpollFd != -1)
    {
        close (m_nEpollFd);
        m_nEpollFd = -1;
    }
    return 1;

}

上記の CEpollServer クラスと TCP クラスを組み合わせて動的ライブラリをコンパイルします。makefile は以下の通りです:

LIB_DIR=./lib
src=$(wildcard *.cpp)
obj=$(patsubst %.cpp,%.o,$(src))
PIC=-fPIC
LIBSO=-shared
#CC=g++ -gdwarf-2 -gstrict-dwarf
CC=g++ -g

%.o:%.cpp
    $(CC) -c $< $(PIC)

network:$(obj)
    $(CC) -o libnetwork.so $^ $(LIBSO)
    cp -f libnetwork.so ../test/lib

clean:
    rm -f *.o *.so

次に TestEpollServer.cpp を実装します。以下の通りです:
注意:以下の ConfigIni と SingleLog は私がテスト時に作成したライブラリです。以下のコードを使用するには、修正が必要です!

#include "../../readini/ConfigIni.h"
#include <string>
#include "../../network/EpollServer.h"
#include "../../log/SingleLog.h"

CEpollServer g_clEpollServer;
#define FILEDIR "./socket.ini"

//epollサーバー
int epoll_server_init()
{   
    int iRet = -1;
    string strIp;
    int nPort = 0, nEpollNum = 0, nTimeout = 0;
    ConfigIni::Init(string(FILEDIR));
    strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));
    if (strIp == "")
    {
        SingleLog::WriteLog(ERROR,"read server addr failed");
        return iRet;
    }

    nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));
    if ( nPort == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server port failed");
        return iRet;
    }

    nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
    if ( nEpollNum == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server epoll num failed");
        return iRet;
    }

    nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
    if ( nTimeout == -1 )
    {
        SingleLog::WriteLog(ERROR,"read server timeout failed");
        return iRet;
    }

    iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
    if ( iRet == 0 )
    {
        SingleLog::WriteLog(ERROR, "epoll create failed");
        return -1;
    }
   
    return 0;
}

void epoll_server_run()
{
    g_clEpollServer.ProcessEpoll();
}

int main()
{
    SingleLog::Init();
    if (epoll_server_init() == -1)
    {
        return -1;
    }
    epoll_server_run();
    return 0;
}
//TestClient.cpp
#include <stdio.h>
#include <iostream>
#include <string.h>
#include "../../network/SxTcp.h"
using namespace std;

int main()
{
    CTcp tcp;
    int iRet = 0;
    int iFd = 0;
    char buf[128] = {0};
   
    iRet = tcp.Open();
    if (iRet == 0)
    {
        perror("socket create failed");
        return -1;
    }

    iRet = tcp.Connect("192.168.233.250", 6666);
    if (iRet == 0)
    {
        perror("socket connect failed");
        return -1;
    }

    while(1)
    {
        memset(buf, 0, sizeof(buf));
        cout << "please input some string:";
        cin >> buf;
        iRet = tcp.Send(buf, strlen(buf));
        if (iRet < -1 && errno != EAGAIN)
        {
            perror("send failed");
            return -1;
        }
        else if(iRet == 0)
        {
            perror("connect is closed");
            return -1;
        }

        memset(buf, 0, sizeof(buf));
        iRet = tcp.Recv(buf, sizeof(buf));
        if (iRet < 0 && errno != EAGAIN)
        {
            perror("recv failed");
            return -1;
        }
        else if(iRet == 0)
        {
            perror("socket not connect");
            return -1;
        }

        fprintf(stdout, "recv data is:%sn", buf);
    }

    return 0;
}

それぞれ TestEpollServer.cpp と TestClient.cpp をコンパイルして、サーバーとクライアントのアプリケーションを生成し、通信を実現できます。

私が簡書に書いた内容はすべて私のオリジナルであり、転載には私の同意が必要です。簡書のホームページ:https://www.jianshu.com/u/e8c7bb5e3257

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。