linux epoll を使う echo サーバ

HTTP/1.1 オリジン・サーバをいきなり書くのは辛いので、 手始めに Linux の epoll (7) を使って、 シングル・スレッドの echo サーバを書いてみます。

//@<tcpechod.cpp@>=
// $ clang++ -std=c++11 -Wall -O2 -o tcpechod tcpechod.cpp

//@<ヘッダファイルをインクルードします@>
//@<定数を定義します@>
//@<シグナル・ハンドラを定義します@>
//@<sockaddr_ptr キャストを定義します@>
//@<connection_type クラスを定義します@>
//@<echo_tcpserver_type クラスを定義します@>
//@<main 関数を定義します@>

TCP を使うことにします。

//@<ヘッダファイルをインクルードします@>=
#include <string>
#include <vector>
#include <list>
#include <clocale>
#include <cstdio>
#include <cstdlib>
#include <csignal>
#include <cerrno>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>

ポート番号等を定義します。

//@<定数を定義します@>=
enum {
    SERVER_PORT = 10007,
    BACKLOG = 10,
    MAX_CONNECTIONS = 10,
    TIMEOUT = 5000,
};

サーバのシャットダウンに EINTR シグナルを使うことにします。 すなわち、 サーバを動かした端末で ^C を打つとシャットダウン処理に遷移するようにします。

//@<シグナル・ハンドラを定義します@>=
namespace {
    volatile std::sig_atomic_t g_signal_status;
}

static void
signal_handler (int signal)
{
    g_signal_status = signal;
}

sockaddr ポインタにするキャストを定義します。

//@<sockaddr_ptr キャストを定義します@>=
static inline struct sockaddr *
sockaddr_ptr (struct sockaddr_in& addr)
{
    return reinterpret_cast<struct sockaddr *> (&addr);
}

このサーバでは、MAX_CONNECTIONS 個まで同時接続できるようにしています。 epoll でリスン・ソケットと、accept した個々のソケットのイベントを監視して、処理を多重化します。 epoll での read イベントが発生するごとに connection_type クラスの fill メンバ関数を呼び、メンバ関数では読んだ内容をそのままオーム返しします。

//@<connection_type クラスを定義します@>=
struct connection_type {
    int sock;
    std::string remote_addr;
    connection_type () : sock (-1), remote_addr () {}

    int fill (char* buffer, std::size_t n)
    {
        return n == write (sock, buffer, n) ? 0 : -1;
    }
};

サーバは、epoll イベントとシグナルの両方によって状態遷移をするイベントループになっています。サーバのコンスタラクタで最大同時接続数とタイムアウトを指定します。タイムアウトの単位はミリ秒です。

//@<echo_tcpserver_type クラスを定義します@>=
class echo_tcpserver_type {
public:
    enum {STOP, INIT, POLLWAIT, NEXTEVENT, READ};
    enum {BUFFER_SIZE = 256};

    echo_tcpserver_type (int n, int to) : max_connections (n), timeout (to) {}
    void run (int port, int backlog);

private:
    int max_connections;
    int timeout;
    int listen_port;
    int listen_sock;
    int epoll_fd;
    std::vector<connection_type> connections;
    std::list<std::size_t> freelist;

    int on_init (int port, int backlog);
    int on_accept ();
    int on_read (int const id);
    void on_shutdown ();

    int listen_socket_create (int const port, int const backlog);
    int accept_client (int const listen_sock, std::string& remote_addr);
    int fd_set_nonblock (int fd);
    int fd_add_epoll (int epoll_fd, int fd, uint32_t events, uint32_t u);
    int fd_del_epoll (int epoll_fd, int fd);
};

//@<run メンバ関数を定義します@>
//@<on_init メンバ関数を定義します@>
//@<on_accept メンバ関数を定義します@>
//@<on_read メンバ関数を定義します@>
//@<on_shutdown メンバ関数を定義します@>
//@<listen_socket_create メンバ関数を定義します@>
//@<accept_client メンバ関数を定義します@>
//@<fd_set_nonblock メンバ関数を定義します@>
//@<fd_add_epoll メンバ関数を定義します@>
//@<fd_del_epoll メンバ関数を定義します@>

最初に epoll_wait 用の配列を割り当てて、イベント・ループを回します。ループはシグナルを受け取ったときと、状態が state が STOP になったときのいずれかで抜け出します。ファイル・ディスクリプタは負のときは未使用・クローズ済みを表すことにし、ゼロ以上でオープン中になっていることを表します。そのため、ファイル・ディスクリプタ listen_sock と epoll_fd の初期値を -1 にしてあります。

状態 INIT で listen_sock と epoll_fd を初期化して、 listen_sock の読み込みイベントの監視を登録します。

状態 POLLWAIT で、イベントを待ちます。 nevents 個のイベントが発生したら、 events の先頭から nevents 個を状態 NEXTEVENT で順に調べます。 events にはファイル・ディスクリプタごとに登録しておいたユーザ・データが並びます。 man ページの例では、ユーザ・データにファイル・ディスクリプタを登録してありますが、他のものであってもかまいません。このサーバでは、ゼロは listen_sock を、1 以上は connections 配列の添字 conn_id を + 1 し、 ユーザ・データとして登録してあります。ゼロのときは、 on_accept メンバ関数を呼び、 1 以上では 状態 READ へ遷移します。 状態 READ から抜け出すと、正常時は状態 NEXTEVENT へ復帰します。シグナルを受け取ったらループを抜けて、 on_shutdown メンバ関数で後片付けをします。

//@<run メンバ関数を定義します@>=
void
echo_tcpserver_type::run (int port, int backlog)
{
    int max_events = max_connections + 1;
    struct epoll_event *events = new struct epoll_event[max_events];
    listen_sock = -1;
    epoll_fd = -1;
    int conn_id;
    int nevents, i;
    int state = INIT;
    for (;;) {
        if (SIGINT == g_signal_status)
            state = STOP;
        if (INIT == state) 
            state = on_init (port, backlog);
        else if (POLLWAIT == state) {
            // timeout で connection を close したいときは、ここで処理します。
            nevents = epoll_wait (epoll_fd, events, max_events, timeout);
            if (nevents > 0) {
                i = 0;
                state = NEXTEVENT;
            }
            else if (nevents < 0 && EINTR == errno)
                state = STOP;
            else if (nevents < 0)
                std::perror ("epoll_wait");
        }
        else if (NEXTEVENT == state) {
            if (i >= nevents)
                state = POLLWAIT;
            else if (0 == events[i].data.u32)
                state = on_accept ();
            else {
                conn_id = events[i].data.u32 - 1;
                state = READ;
            }
            ++i;
        }
        else if (READ == state)
            state = on_read (conn_id);
        else if (STOP == state) {
            on_shutdown ();
            break;
        }
    }
    delete[] events;
}

初期化では、connections に要素を max_connections 個作って、 freelist に conn_id を並べます。さらに、 listen_sock を作り、 bind して、 listen します。 それから、EINTR シグナルで accept を中断できるようにするために、 ノンブロッキングに設定します。 最後に epoll_fd を作って、 listen_sock の入力イベントを監視します。このファイル・ディスクリプタの epoll に設定するユーザ・データはゼロです。 ところで、 EPOLLET でエッジ・トリガにしていないのは、 イベント・ループで、 最大接続数を越えたとき accept を次回以降に後回しするためです。

//@<on_init メンバ関数を定義します@>=
int
echo_tcpserver_type::on_init (int port, int backlog)
{
    connections.clear ();
    freelist.clear ();
    for (int i = 0; i < max_connections; ++i) {
        connections.emplace_back ();
        freelist.push_back (i);
    }
    listen_port = port;
    if ((listen_sock = listen_socket_create (port, backlog)) < 0)
        ;
    else if (fd_set_nonblock (listen_sock) < 0)
        std::perror ("fcntl+listen_fd");
    else if ((epoll_fd = epoll_create (max_connections + 1)) < 0)
        std::perror ("epoll_create");
    else if (fd_add_epoll (epoll_fd, listen_sock, EPOLLIN, 0) < 0)
        std::perror ("epoll_ctl+listen_sock");
    else
        return POLLWAIT;
    return STOP;
}

listen_sock の入力イベントにより、 accept を試みます。 connections がすべて accept で使用済みのときは freelist が空になっています。そのときは、 accept をせずに他のイベントの処理に移ります。 accept で接続ソケット conn_sock を得たら、 ノンブロッキングに設定して、 入力イベントを epoll で監視します。 このときの、ユーザ・データは freelist から得た conn_id に 1 を足した数にします。

//@<on_accept メンバ関数を定義します@>=
int
echo_tcpserver_type::on_accept ()
{
    if (freelist.empty ())
        return NEXTEVENT;
    int state = STOP;
    int id = freelist.front ();
    std::string remote_addr;
    int conn_sock = accept_client (listen_sock, remote_addr);
    if (conn_sock < 0) {
        if (EAGAIN == errno || EWOULDBLOCK == errno)
            state = NEXTEVENT;
        else if (EINTR != errno)
            std::perror ("accept");
    }
    else if (fd_set_nonblock (conn_sock) < 0)
        std::perror ("fcntl+conn_fd");
    else if (fd_add_epoll (epoll_fd, conn_sock, EPOLLIN|EPOLLET, id + 1) < 0)
        std::perror ("epoll_ctl+conn_sock");
    else {
        freelist.pop_front ();
        connections[id].sock = conn_sock;
        connections[id].remote_addr = remote_addr;
        state = NEXTEVENT;
    }
    if (NEXTEVENT != state && conn_sock >= 0)
        close (conn_sock);
    return state;
}

クライアントからデータが届くと、on_read で読み取ります。 イベントをエッジトリガにしているため、入力がある限り、 状態 READ を繰り返します。 ノンブロッキング入力なので、 EAGAIN になったときは、 ソケットをクローズせずに状態を NEXTEVENT にして、他のイベントの処理をします。それ以外のときは、ソケットをクローズします。クローズするときは、 epoll のイベント監視も削除しておきます。さらに、 freelist に conn_id を登録して再利用できるようにしておきます。

//@<on_read メンバ関数を定義します@>=
int
echo_tcpserver_type::on_read (int const id)
{
    char buffer[BUFFER_SIZE];
    int state = READ;
    connection_type& conn = connections[id];
    int n = read (conn.sock, buffer, sizeof buffer);
    if (n > 0) {
        conn.fill (buffer, n);
        state = READ;
    }
    else if (n < 0 && (EAGAIN == errno || EWOULDBLOCK == errno))
        state = NEXTEVENT;
    else {
        if (n < 0 && EINTR != errno)
            std::perror ("read");
        if (fd_del_epoll (epoll_fd, conn.sock) < 0)
            std::perror ("epoll_ctl-conn_sock");
        close (conn.sock);
        conn.sock = -1;
        conn.remote_addr.clear ();
        freelist.push_back (id);
        state = NEXTEVENT;
    }
    return state;
}

サーバをシャットダウンします。開いているすべてのソケットを close して、 epoll イベントの監視を削除します。

//@<on_shutdown メンバ関数を定義します@>=
void
echo_tcpserver_type::on_shutdown ()
{
    for (auto& conn : connections)
        if (conn.sock >= 0) {
            if (fd_del_epoll (epoll_fd, conn.sock) < 0)
                std::perror ("epoll_ctl-conn_sock");
            close (conn.sock);
        }
    if (listen_sock >= 0) {
        if (fd_del_epoll (epoll_fd, listen_sock) < 0)
            std::perror ("epoll_ctl-listen_sock");
        close (listen_sock);
    }
    if (epoll_fd >= 0)
        close (epoll_fd);
}

以降は、定石ごとにメンバ関数にまとめたものです。

TCP ストリームのサーバのソケット listen_sock を作ります。

//@<listen_socket_create メンバ関数を定義します@>=
int
echo_tcpserver_type::listen_socket_create (int const port, int const backlog)
{
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_ANY);
    addr.sin_port = htons (port);
    int yes = 1;
    int const sock = socket (PF_INET, SOCK_STREAM, 0);
    if (sock < 0)
        std::perror ("socket");
    else if (setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) < 0) // 追加
        std::perror ("setsockopt");
    else if (bind (sock, sockaddr_ptr (addr), sizeof addr) < 0)
        std::perror ("bind");
    else if (listen (sock, backlog) < 0)
        std::perror ("listen");
    else
        return sock;
    if (sock >= 0)
        close (sock);
    return -1;
}

クライアント接続を accept したソケットを返します。このとき、リモート・アドレスも取得しておきます。TCP ストリームでは、後で getpeername を使ってリモート・アドレスを得ることも可能ですが、ここで得ておきます。

//@<accept_client メンバ関数を定義します@>=
int
echo_tcpserver_type::accept_client (int const listen_sock, std::string& remote_addr)
{
    struct sockaddr_in addr;
    socklen_t len = sizeof addr;
    int const conn_sock = accept (listen_sock, sockaddr_ptr (addr), &len);
    if (conn_sock < 0)
        std::perror ("accept");
    else
        remote_addr = inet_ntoa (addr.sin_addr);
    return conn_sock;
}

ファイル・ディスクリプタのフラグをノンブロッキングに変更します。

//@<fd_set_nonblock メンバ関数を定義します@>=
int
echo_tcpserver_type::fd_set_nonblock (int fd)
{
    int flag;
    if ((flag = fcntl (fd, F_GETFL, 0)) >= 0)
        flag = fcntl (fd, F_SETFL, flag | O_NONBLOCK);
    return flag;
}

epoll_ctl でイベント監視を追加・削除します。

//@<fd_add_epoll メンバ関数を定義します@>=
int
echo_tcpserver_type::fd_add_epoll (int epoll_fd, int fd, uint32_t events, uint32_t u)
{
    struct epoll_event ev = {0};
    ev.events = events;
    ev.data.u32 = u;
    return epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd, &ev);
}

//@<fd_del_epoll メンバ関数を定義します@>=
int
echo_tcpserver_type::fd_del_epoll (int epoll_fd, int fd)
{
    struct epoll_event ev;
    return epoll_ctl (epoll_fd, EPOLL_CTL_DEL, fd, &ev);
}

main 関数では、念のために locale を C に設定しておき、SIGPIPE を無視、 SIGINT にハンドラを設定してから、server を走らせます。

//@<main 関数を定義します@>=
int
main (int argc, char *argv[])
{
    std::setlocale (LC_ALL, "C");
    std::signal (SIGPIPE, SIG_IGN);
    std::signal (SIGINT, signal_handler);

    int port = SERVER_PORT;
    std::printf ("listening port %d\n", port);
    echo_tcpserver_type server (MAX_CONNECTIONS, TIMEOUT);
    server.run (port, BACKLOG);
    std::printf (" shutdown\n");

    return EXIT_SUCCESS;
}