linux epoll を使う echo サーバでタイムアウト・クローズ

HTTP/1.1 には、 Connection: keep-alive にタイムアウトで接続を切る機能があります。 echo サーバで、 このやりかたを試します。 さらに、 Connection: close で接続を切る機能に対応させて、 echo サーバが受け取った文字列の先頭がピリオドのときに、 接続を切るようにしてみます。

いつタイムアウトするかタイミングがわからないといけないため、 コネクションごとに最後にソケットから読んだ時刻を記録しておきます。 そして、 epoll_wait の前にタイムアウトになっているコネクションを閉じることにします。 素朴に実装するなら、 タイムアウトになっているかどうか全コネクション構造体を調べることになるでしょう。 もう少しマシにするなら、 オープンしているコネクションをリストにして、 読み込みごとにリストの後ろへ動かしておきます。 そうするとリストは自然に時刻の昇順に並んでいるので、 リストの先頭からタイムアウトしているコネクションを閉じていき、 タイムアウトに満たないコネクションに辿り着いたところで探索を中断すれば良いでしょう。

ヘッダに ctime を追加します。 また、 list がいらなくなったので削ります。

//@<ヘッダファイルをインクルードします@>=
#include <string>
#include <vector>
#include <clocale>
#include <cstdio>
#include <cstdlib>
#include <csignal>
#include <ctime>
#include <cerrno>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>

リストの途中からコネクションを外して後ろにつなぐ目的には、 双方向リンク・リングが向いています。 基本的に、 配列の添字を id として使うため、 配列添字でリンクすれば良いわけで、 実装が楽です。 さらに、リングのデータ構造の中に複数のリングを作ることができることを利用して、 ゼロ番を FREE リストのためのリングの番兵、 1 番を ACTIVE リストのためのリングの番兵に使うことにします。 リングを初期化すると一本のリングができます。 使う側で、 1 番を取り外してリングを 2 本にします。

//@<ring_in_vector クラスを定義します@>=
template <class NODE_T>
class ring_in_vector {
public:
    ring_in_vector () : node () {}

    void
    resize (std::size_t n)
    {
        if (0 == n)
            n = 16; // default size;
        node.clear ();
        for (std::size_t i = 0; i < n; ++i)
            node.emplace_back ((n + i - 1) % n, (i + 1) % n);
    }

    bool
    empty (std::size_t i) const
    {
        return node[i].next == i;
    }

    // j 番の前に k 番を挿入します。
    void
    insert (std::size_t const j, std::size_t const k)
    {
        node[k].prev = node[j].prev;
        node[k].next = j;
        node[node[k].next].prev = k;
        node[node[k].prev].next = k;
    }

    // i 番を削除します。
    void
    erase (std::size_t const i)
    {
        node[node[i].next].prev = node[i].prev;
        node[node[i].prev].next = node[i].next;
        node[i].prev = i;
        node[i].next = i;
    }

    NODE_T& operator[] (std::size_t i)
    {
        return node[i];
    }

private:
    std::vector<NODE_T> node;
};

connection_type をリングのノードにできるように手直しします。 ノードにするには、 prev と next のメンバがあれば良く、 コンストラクタでこれらを初期化できるようにしておきます。 タイムアウトを検出するために、 last_received メンバを追加します。 fill では、先頭の一文字がピリオドなら -1 を返し、 そうでないときはゼロを返すようにします。 fill を呼ぶ on_read メンバ関数では、 戻り値をチェックして、 -1 のときは接続を切ることにします。

//@<connection_type クラスを定義します@>=
struct connection_type {
    std::size_t prev, next;
    int sock;
    std::string remote_addr;
    std::time_t last_received;
    connection_type (std::size_t b, std::size_t f)
        : prev (b), next (f), sock (-1), remote_addr () {}

    int fill (char* buffer, std::size_t n)
    {
        std::string s (buffer, n);
        std::size_t n1 = write (sock, buffer, n);
        return '.' == s[0] || n1 != n ? -1 : 0;
    }
};

echo_tcpserver_type で、 リングを使うようにメンバを修正します。 さらに、 タイムアウト処理用の on_sleep メンバ関数の宣言を追加します。 on_read メンバ関数on_sleep メンバ関数の 2 箇所でソケットのクローズをおこなうため、 クローズの処理を connection_close メンバ関数に切り出しておきます。

//@<echo_tcpserver_type クラスを定義します@>=

//@<ring_in_vector クラスを定義します@>

class echo_tcpserver_type {
public:
    enum {STOP, INIT, POLLWAIT, NEXTEVENT, READ};
    enum {BUFFER_SIZE = 256};
    enum {FREE = 0, ACTIVE = 1};

    echo_tcpserver_type (int n, int to) : max_connections (n), timeout (to) {}
    void run (int port, int backlog);

private:
    int max_connections;
    int timeout;
    int listen_port;
    int listen_sock;
    int epoll_fd;
    ring_in_vector<connection_type> connections;

    int on_init (int port, int backlog);
    int on_accept ();
    int on_read (int const id);
    int on_sleep ();
    void connection_close (std::size_t id);
    void on_shutdown ();

    int listen_socket_create (int const port, int const backlog);
    int accept_client (int const listen_sock, std::string& remote_addr);
    int fd_set_nonblock (int fd);
    int fd_add_epoll (int epoll_fd, int fd, uint32_t events, uint32_t u);
    int fd_del_epoll (int epoll_fd, int fd);
};

//@<run メンバ関数を定義します@>
//@<on_init メンバ関数を定義します@>
//@<on_accept メンバ関数を定義します@>
//@<on_read メンバ関数を定義します@>
//@<on_sleep メンバ関数を定義します@>
//@<connection_close メンバ関数を定義します@>
//@<on_shutdown メンバ関数を定義します@>
//@<listen_socket_create メンバ関数を定義します@>
//@<accept_client メンバ関数を定義します@>
//@<fd_set_nonblock メンバ関数を定義します@>
//@<fd_add_epoll メンバ関数を定義します@>
//@<fd_del_epoll メンバ関数を定義します@>

run メンバ関数では epoll_wait の前に on_sleep を呼ぶようにします。 さらに epoll_wait のタイムアウトを 5 秒から 1 秒に短くします。 なお、リングでは ゼロと 1 を番兵に使い、 コネクションの id である添字は 2 以上に必ずなるため、 id をそのままイベントのユーザ・データに登録するように変更しました。

//@<run メンバ関数を定義します@>=
void
echo_tcpserver_type::run (int port, int backlog)
{
    int max_events = max_connections + 1;
    struct epoll_event *events = new struct epoll_event[max_events];
    listen_sock = -1;
    epoll_fd = -1;
    std::size_t conn_id;
    int nevents, i;
    int state = INIT;
    for (;;) {
        if (SIGINT == g_signal_status)
            state = STOP;
        if (INIT == state) 
            state = on_init (port, backlog);
        else if (POLLWAIT == state) {
            on_sleep ();
            nevents = epoll_wait (epoll_fd, events, max_events, 1000);
            if (nevents > 0) {
                i = 0;
                state = NEXTEVENT;
            }
            else if (nevents < 0 && EINTR == errno)
                state = STOP;
            else if (nevents < 0)
                std::perror ("epoll_wait");
        }
        else if (NEXTEVENT == state) {
            if (i >= nevents)
                state = POLLWAIT;
            else if (0 == events[i].data.u32)
                state = on_accept ();
            else {
                conn_id = events[i].data.u32;
                state = READ;
            }
            ++i;
        }
        else if (READ == state)
            state = on_read (conn_id);
        else if (STOP == state) {
            on_shutdown ();
            break;
        }
    }
    delete[] events;
}

on_init メンバ関数でリングを初期化します。 このとき、 リングのサイズを max_connections より 2 つ大きくして、 ゼロ番と 1 番を番兵に使えるようにします。 番兵の添字に使う FREE の値は 0、 ACTIVE の値は 1 です。 初期化直後は全ノードが一つのリングにつながっており、 それから ACTIVE をとりはずしておきます。 なお、 erase したノードはそれ自体が要素数 1 個のリングになっています。

//@<on_init メンバ関数を定義します@>=
int
echo_tcpserver_type::on_init (int port, int backlog)
{
    connections.resize (max_connections + 2);
    connections.erase (ACTIVE);
    listen_port = port;
    if ((listen_sock = listen_socket_create (port, backlog)) < 0)
        ;
    else if (fd_set_nonblock (listen_sock) < 0)
        std::perror ("fcntl+listen_fd");
    else if ((epoll_fd = epoll_create (max_connections + 1)) < 0)
        std::perror ("epoll_create");
    else if (fd_add_epoll (epoll_fd, listen_sock, EPOLLIN, 0) < 0)
        std::perror ("epoll_ctl+listen_sock");
    else
        return POLLWAIT;
    return STOP;
}

on_accept メンバ関数では、 FREE のリングの先頭からノードを削って、 ACTIVE リングの末尾に挿入します。 さらに、 epoll イベントにコネクションの id をそのまま使うように変更します。

//@<on_accept メンバ関数を定義します@>=
int
echo_tcpserver_type::on_accept ()
{
    if (connections.empty (FREE))
        return NEXTEVENT;
    int state = STOP;
    int id = connections[FREE].next;
    std::string remote_addr;
    int conn_sock = accept_client (listen_sock, remote_addr);
    if (conn_sock < 0) {
        if (EAGAIN == errno || EWOULDBLOCK == errno)
            state = NEXTEVENT;
        else if (EINTR != errno)
            std::perror ("accept");
    }
    else if (fd_set_nonblock (conn_sock) < 0)
        std::perror ("fcntl+conn_fd");
    else if (fd_add_epoll (epoll_fd, conn_sock, EPOLLIN|EPOLLET, id) < 0)  // 変更
        std::perror ("epoll_ctl+conn_sock");
    else {
        connections.erase (id);
        connections.insert (ACTIVE, id);
        connections[id].sock = conn_sock;
        connections[id].remote_addr = remote_addr;
        connections[id].last_received = std::time (nullptr);
        state = NEXTEVENT;
    }
    if (NEXTEVENT != state && conn_sock >= 0)
        close (conn_sock);
    return state;
}

on_read メンバ関数では、 last_received を更新し、 fill の戻り値が -1 のときは接続を閉じます。 そうでないときは、 コネクションを ACTIVE リングの末尾へ移動します。

//@<on_read メンバ関数を定義します@>=
int
echo_tcpserver_type::on_read (int const id)
{
    char buffer[BUFFER_SIZE];
    int state = READ;
    connection_type& conn = connections[id];
    int n = read (conn.sock, buffer, sizeof buffer);
    if (n > 0) {
        int r = conn.fill (buffer, n);
        conn.last_received = std::time (nullptr);
        if (r < 0) {
            connection_close (id);
            state = NEXTEVENT;
        }
        else {
            connections.erase (id);
            connections.insert (ACTIVE, id);
            state = READ;
        }
    }
    else if (n < 0 && (EAGAIN == errno || EWOULDBLOCK == errno))
        state = NEXTEVENT;
    else {
        if (n < 0 && EINTR != errno)
            std::perror ("read");
        connection_close (id);
        state = NEXTEVENT;
    }
    return state;
}

新しく追加した on_sleep メンバ関数では、 ACTIVE リングを先頭から末尾へと辿っていき、タイムアウトしているコネクションを閉じます。 connection_close は id 番のコネクションを ACTIVE リングから FREE リングへ移し替えます。 そのため、 id を一つ前に戻しておいてから、 connection_close を呼び出すようにします。

//@<on_sleep メンバ関数を定義します@>=
int
echo_tcpserver_type::on_sleep ()
{
    std::time_t now = std::time (nullptr);
    int timeout_sec = timeout / 1000;
    if (connections.empty (ACTIVE))
        return 0;
    for (std::size_t id = connections[ACTIVE].next; id != ACTIVE; id = connections[id].next) {
        connection_type& conn = connections[id];
        if (conn.last_received + timeout_sec > now)
            break;
        std::size_t gone_id = id;
        id = conn.prev;
        connection_close (gone_id); // side effects prev and next links
    }
    return 0;
}

コネクションを閉じる処理を切り出した connection_close メンバ関数では、 ACTIVE リングからコネクションを取り外して、 FREE リングの末尾へ移します。

//@<connection_close メンバ関数を定義します@>=
void
echo_tcpserver_type::connection_close (std::size_t const id)
{
    connection_type& conn = connections[id];
    if (fd_del_epoll (epoll_fd, conn.sock) < 0)
        std::perror ("epoll_ctl-conn_sock");
    close (conn.sock);
    conn.sock = -1;
    conn.remote_addr.clear ();
    connections.erase (id);
    connections.insert (FREE, id);
}

on_shutdown メンバ関数では、 ACTIVE リングにつながっているリングをクローズするようにします。

//@<on_shutdown メンバ関数を定義します@>=
void
echo_tcpserver_type::on_shutdown ()
{
    for (std::size_t id = connections[ACTIVE].next; id != ACTIVE; id = connections[id].next) {
        connection_type& conn = connections[id];
        if (conn.sock >= 0) {
            if (fd_del_epoll (epoll_fd, conn.sock) < 0)
                std::perror ("shutdown-epoll_ctl-conn_sock");
            close (conn.sock);
        }
    }
    if (listen_sock >= 0) {
        if (fd_del_epoll (epoll_fd, listen_sock) < 0)
            std::perror ("epoll_ctl-listen_sock");
        close (listen_sock);
    }
    if (epoll_fd >= 0)
        close (epoll_fd);
}

これで修正はおしまいです。