linux epoll を使う echo サーバをラウンド・ロビン方式へ

前回までのサーバは、 ソケットが入力可能になったら、入力が途切れるかクローズされるまで、ずっとそのソケットを読みつづけて、 直後にそのまま読み込んだ内容をソケットに出力し終わるまで処理を続ける書き方になっていました。 このやりかたは、マニュアル・ページ epoll (7) の「落とし穴」の項に説明がある飢餓 (starvation) 状態になるケースに該当します。

`man 7 epoll`

Possible Pitfalls and Ways to Avoid Them

o Starvation (edge-triggered)

If there is a large amount of I/O space, it is possible that by trying to drain it the other files will not get processed causing starvation. (This problem is not specific to epoll.)

The solution is to maintain a ready list and mark the file descriptor as ready in its associated data structure, thereby allowing the application to remember which files need to be processed but still round robin amongst all the ready files. This also supports ignoring subsequent events you receive for file descriptors that are already ready.

解決方法にあるように、 コネクションごとにイベントをフラグとして保持するようにして、 epoll_wait で得たイベントから connection_type オブジェクトのフラグを更新するようにします。 connection_type オブジェクトは、 EPOLLIN か EPOLLOUT の一方のイベントを受け付ける状態間を遷移し続けます。 現在の状態で得たいイベントがあるとき、 同オブジェクトは READY リングに入り、 イベント待ちのときは IDLE リングに入ります。 READY リングに入っているオブジェクトに対して、 ラウンド・ロビンで順に読み込みか書き込みの処理をおこなっていきます。 今回は EPOLLOUT の方を複数のタスクに分割してラウンド・ロビンを回せているかどうかを確かめるために、 fill メンバ関数を変更し、 buffer から body へ一行切り出して、 body の内容をソケットに出力するようにします。

//@<connection_type クラスを定義します@>=
struct connection_type {
    std::size_t prev, next;
    int state;              // つなげてあるリングの種類 FREE、IDLE、READY のいずれか
    uint32_t events;        // struct epoll_event の events メンバからコピーした値
    uint32_t events_mask;   // 現在受付中のイベント: EPOLLIN か EPOLLOUT のいずれか
    int sock;
    std::string remote_addr;
    std::time_t last_time;
    std::string body;
    std::string buffer;
    connection_type (std::size_t b, std::size_t f)
        : prev (b), next (f), state (0),
          events (0), events_mask (0), sock (-1), remote_addr (), last_time (0),
          body (), buffer () {}

    bool ok () const { return ! body.empty () && '\n' == body.back (); }
    bool quit () const { return body == ".\r\n"; }

    void fill (char* p, ssize_t n)
    {
        buffer.append (p, n);
    }

    bool decode ()
    {
        while (! buffer.empty ()) {
            int c = buffer.front ();
            buffer.erase (0, 1);
            if (';' == c) {
                c = '\n';
                body.push_back (';');
                body.push_back ('\r');
            }
            body.push_back (c);
            if ('\n' == c)
                break;
        }
        return ok ();
    }
};

サーバでは、リングの数を一つ増やします。 FREE は未使用の connection_type オブジェクト、 IDLE はイベント待ちのもの、 READY はイベント処理中を表します。 低レベルの epoll_ctl のラッパ・メンバ関数に EPOLL_CTL_MOD をおこなうものを追加しています。

//@<echo_tcpserver_type クラスを定義します@>=

//@<ring_in_vector クラスを定義します@>

class echo_tcpserver_type {
public:
    enum {STOP, INIT, POLLWAIT, RUN};
    enum {BUFFER_SIZE = 256};
    enum {FREE = 0, IDLE = 1, READY = 2};

    echo_tcpserver_type (int n, int to) : max_connections (n), timeout (to) {}
    void run (int port, int backlog);

private:
    int max_connections;
    int timeout;
    int listen_port;
    int listen_sock;
    int epoll_fd;
    ring_in_vector<connection_type> connections;

    int on_init (int port, int backlog);
    int on_pollwait (struct epoll_event *events, int max_events);
    void on_expires ();
    int on_accept ();
    std::size_t on_read (std::size_t const cur_id);
    std::size_t on_write (std::size_t const cur_id);
    std::size_t connection_close (std::size_t id);
    std::size_t connection_move (int k, int id);
    void on_shutdown ();

    int listen_socket_create (int const port, int const backlog);
    int accept_client (int const listen_sock, std::string& remote_addr);
    int fd_set_nonblock (int fd);
    int fd_add_epoll (int epoll_fd, int fd, uint32_t events, uint32_t u);
    int fd_mod_epoll (int epoll_fd, int fd, uint32_t events, uint32_t u);
    int fd_del_epoll (int epoll_fd, int fd);
};

//@<run メンバ関数を定義します@>
//@<on_init メンバ関数を定義します@>
//@<on_pollwait メンバ関数を定義します@>
//@<on_expires メンバ関数を定義します@>
//@<on_accept メンバ関数を定義します@>
//@<on_read メンバ関数を定義します@>
//@<on_write メンバ関数を定義します@>
//@<connection_close メンバ関数を定義します@>
//@<connection_move メンバ関数を定義します@>
//@<on_shutdown メンバ関数を定義します@>
//@<listen_socket_create メンバ関数を定義します@>
//@<accept_client メンバ関数を定義します@>
//@<fd_set_nonblock メンバ関数を定義します@>
//@<fd_add_epoll メンバ関数を定義します@>
//@<fd_mod_epoll メンバ関数を定義します@>
//@<fd_del_epoll メンバ関数を定義します@>

run メンバ関数では、 POLLWAIT と RUN の 2 つを交互に実行します。 POLLWAIT で READY リングが空でなくなると RUN へ移ります。 RUN では、 READY リングを一周分実行して POLLWAIT に戻ります。 RUN のループでは、 読み込みまたは書き込みをおこなったコネクションを READY リングの末尾へ動かしていく副作用があるため、 妙な書き方にしています。

//@<run メンバ関数を定義します@>=
void
echo_tcpserver_type::run (int port, int backlog)
{
    int max_events = max_connections + 1;
    struct epoll_event *events = new struct epoll_event[max_events];
    listen_sock = -1;
    epoll_fd = -1;
    int kont = INIT;
    for (;;) {
        if (SIGINT == g_signal_status)
            kont = STOP;
        if (INIT == kont)
            kont = on_init (port, backlog);
        else if (POLLWAIT == kont)
            kont = on_pollwait (events, max_events);
        else if (RUN == kont) {
            std::size_t id = connections[READY].next;
            std::size_t last_id = connections[READY].prev;
            while (0 == g_signal_status) {
                std::size_t cur_id = id;
                connection_type& conn = connections[id];
                if ((conn.events_mask & EPOLLIN) && (conn.events & EPOLLIN))
                    id = on_read (id);
                if ((conn.events_mask & EPOLLOUT) && (conn.events & EPOLLOUT))
                    id = on_write (id);
                if (cur_id == last_id)
                    break;
                id = connections[id].next;
            }
            kont = POLLWAIT;
        }
        else if (STOP == kont) {
            on_shutdown ();
            break;
        }
    }
    delete[] events;
    events = nullptr;
}

on_init メンバ関数では、 リングを FREE、 IDLE、 READY に 3 分割します。 さらに、 listen に成功した段階で listening のメッセージを出力するように変更しました。

//@<on_init メンバ関数を定義します@>=
int
echo_tcpserver_type::on_init (int port, int backlog)
{
    connections.resize (max_connections + 3);
    connections.erase (IDLE);
    connections.erase (READY);
    listen_port = port;
    int kont = STOP;
    if ((listen_sock = listen_socket_create (port, backlog)) < 0)
        ;
    else if (fd_set_nonblock (listen_sock) < 0)
        std::perror ("fcntl+listen_fd");
    else if ((epoll_fd = epoll_create (max_connections + 1)) < 0)
        std::perror ("epoll_create");
    else if (fd_add_epoll (epoll_fd, listen_sock, EPOLLIN, 0) < 0)
        std::perror ("epoll_ctl+listen_sock");
    else {
        kont = POLLWAIT;
        std::printf ("listening port %d\n", port);
        std::fflush (stdout);
    }
    return kont;
}

on_pollwait メンバ関数では、 最初にタイムアウトしているコネクションを閉じた後に、 epoll_wait でイベント待ちをします。 epoll_wait のタイムアウトは、 READY リングが空のときは 1000 ミリ秒、 空でないときはゼロ秒にしてイベントがないときは直ちに戻るようにしています。 イベントのユーザ・データの id がゼロのときは listening ソケットへの通知なので、 accept を実行します。 id がゼロより大きいときは、 その id の connection_type のイベントがゼロから 1 に変化したことを表します。 struct epoll_event の events メンバにソケットのイベントが格納されているので、 それを connection_type オブジェクトの events へ反映します。 connection_type オブジェクトの events_mask メンバで示すビットがオンになったときに、 connection_type オブジェクトが IDEL リングにつながっているときは、 READY リングへ移します。

//@<on_pollwait メンバ関数を定義します@>=
int
echo_tcpserver_type::on_pollwait (struct epoll_event *events, int max_events)
{
    on_expires ();
    int msec = connections.empty (READY) ? 1000 : 0;
    int nevents = epoll_wait (epoll_fd, events, max_events, msec);
    int kont = POLLWAIT;
    if (nevents < 0) {
        if (EINTR != errno)
            std::perror ("epoll_wait");
        return STOP;
    }
    for (int i = 0; i < nevents; ++i) {
        std::size_t id = events[i].data.u32;
        if (0 == id && ! connections.empty (FREE)) {
            kont = on_accept ();
            if (STOP == kont)
                return kont;
        }
        else if (id > 0) {
            connection_type& conn = connections[id];
            conn.events |= events[i].events & (EPOLLIN|EPOLLOUT);
            if (READY != conn.state && (conn.events_mask & conn.events))
                connection_move (READY, id);
        }
    }
    if (POLLWAIT == kont && ! connections.empty (READY))
        kont = RUN;
    return kont;
}

on_expires メンバ関数で、 IDLE と READY の 2 つのリングからタイムアウトしているコネクションを探して閉じます。

//@<on_expires メンバ関数を定義します@>=
void
echo_tcpserver_type::on_expires ()
{
    std::size_t id;
    std::time_t now = std::time (nullptr);
    int tmsec = timeout / 1000;
    for (id = connections[IDLE].next; IDLE != id; id = connections[id].next) {
        if (connections[id].last_time + tmsec > now)
            break;
        id = connection_close (id);
    }
    for (id = connections[READY].next; READY != id; id = connections[id].next) {
        if (connections[id].last_time + tmsec > now)
            break;
        id = connection_close (id);
    }
}

on_accept メンバ関数で、 新しいコネクションを作ります。 コネクションの初期化をおこない、 EPOLLIN から処理を始めるため、 コネクションの events_mask メンバをそれにセットします。

//@<on_accept メンバ関数を定義します@>=
int
echo_tcpserver_type::on_accept ()
{
    int kont = STOP;
    std::string remote_addr;
    int sock = accept_client (listen_sock, remote_addr);
    if (sock >= 0) {
        std::size_t id = connections[FREE].next;
        if (fd_set_nonblock (sock) < 0)
            perror ("fcntl+conn_sock");
        else if (fd_add_epoll (epoll_fd, sock, EPOLLIN|EPOLLET, id) < 0)
            perror ("epoll_ctl+conn_sock");
        else {
            connection_move (IDLE, id);
            connections[id].events = 0;
            connections[id].events_mask = EPOLLIN;
            connections[id].sock = sock;
            connections[id].remote_addr = remote_addr;
            connections[id].last_time = std::time (nullptr);
            kont = POLLWAIT;
        }
    }
    if (POLLWAIT != kont && sock >= 0)
        close (sock);
    return kont;
}

on_read メンバ関数では、 ソケットから read した内容を fill メンバ関数でコネクションの buffer メンバに追加します。 コネクションの decode メンバ関数を読んで、 body メンバに一行切り出させます。 切り出した行がピリオド 1 個のときは、 quit メンバ関数が真を返すので、 コネクションを閉じます。 切り出し途中のときは、 足りない分を再読み込みするため、 read を繰り返します。 切り出しに成功したら、 その行を EPOLLOUT イベントでソケットに出力します。 EAGAIN のときは、 コネクションの events の EPOLLIN ビットをクリアし、 IDLE リングに移動します。 エラーのときは、 ソケットを閉じます。

//@<on_read メンバ関数を定義します@>=
std::size_t
echo_tcpserver_type::on_read (std::size_t const cur_id)
{
    char buffer[BUFFER_SIZE];
    std::size_t id = cur_id;
    int state = FREE;
    connection_type& conn = connections[cur_id];
    ssize_t n = read (conn.sock, buffer, sizeof buffer);
    if (n > 0) {
        conn.last_time = std::time (nullptr);
        conn.fill (buffer, n);
        conn.decode ();
        if (conn.quit ())
            ;
        else if (! conn.ok ())
            state = READY;
        else if (fd_mod_epoll (epoll_fd, conn.sock, EPOLLOUT|EPOLLET, id) < 0)
            perror ("epoll_ctl!conn_sock");
        else {
            conn.events_mask = EPOLLOUT;
            state = (conn.events & EPOLLOUT) ? READY : IDLE;
        }
    }
    else if (n < 0 && (EAGAIN == errno || EWOULDBLOCK == errno)) {
        conn.events &= ~EPOLLIN;
        state = IDLE;
    }
    else if (n < 0 && EINTR != errno)
        perror ("read");
    if (FREE == state)
        return connection_close (cur_id);
    return connection_move (state, cur_id);
}

on_write メンバ関数も似た処理をします。 出力に成功したとき、 body は空になるので、 そのときは、 もしもあれば buffer から body に一行切り出します。 切り出しできないときは EPOLLIN に切り替えます。 EAGAIN は on_read と同じです。 出力中にクライアントがソケットを閉じたときは、 EPIPE になるため、 エラーのときにソケットを閉じておきます。

//@<on_write メンバ関数を定義します@>=
std::size_t
echo_tcpserver_type::on_write (std::size_t const cur_id)
{
    std::size_t id = cur_id;
    int state = FREE;
    connection_type& conn = connections[cur_id];
    ssize_t n = write (conn.sock, conn.body.c_str (), conn.body.size ());
    if (n > 0) {
        conn.last_time = std::time (nullptr);
        conn.body.erase (0, n);
        if (! conn.body.empty ())
            state = READY;
        else {
            conn.decode ();
            if (conn.quit ())
                ;
            else if (conn.ok ())
                state = READY;
            else if (fd_mod_epoll (epoll_fd, conn.sock, EPOLLIN|EPOLLET, id) < 0)
                perror ("epoll_ctl!conn_sock");
            else {
                conn.events_mask = EPOLLIN;
                state = (conn.events & EPOLLIN) ? READY : IDLE;
            }
        }
    }
    else if (n < 0 && (EAGAIN == errno || EWOULDBLOCK == errno)) {
        conn.events &= ~EPOLLOUT;
        state = IDLE;
    }
    else if (EINTR != errno && EPIPE != errno)
        perror ("write");
    if (FREE == state)
        return connection_close (cur_id);
    return connection_move (state, cur_id);
}

connection_close メンバ関数は、 前のものから少し変更し、 クローズしたコネクションの一つ前の IDLE か READY のコネクションを返します。

//@<connection_close メンバ関数を定義します@>=
std::size_t
echo_tcpserver_type::connection_close (std::size_t id)
{
    if (fd_del_epoll (epoll_fd, connections[id].sock) < 0)
        perror ("epoll_ctl-conn_sock");
    close (connections[id].sock);
    connections[id].sock = -1;
    connections[id].events_mask = 0;
    return connection_move (FREE, id);
}

新しく追加した connection_move メンバ関数は、 リング末尾へコネクションを移動します。

//@<connection_move メンバ関数を定義します@>=
std::size_t
echo_tcpserver_type::connection_move (int head, int id)
{
    if (connections[head].prev == id)
        return id;
    std::size_t prev_id = connections[id].prev;
    connections.erase (id);
    connections.insert (head, id);
    connections[id].state = head;
    return prev_id;
}

on_shutdown メンバ関数は、 accept したコネクションのソケットを閉じて、 epoll_fd と listen_sock も閉じます。

//@<on_shutdown メンバ関数を定義します@>=
void
echo_tcpserver_type::on_shutdown ()
{
    std::printf (" ..");
    std::fflush (stdout);
    for (std::size_t id = 3; id < max_connections + 3; ++id) {
        if (connections[id].sock < 0)
            continue;
        if (fd_del_epoll (epoll_fd, connections[id].sock) < 0)
            std::perror ("epoll_ctl-conn_sock");
        close (connections[id].sock);
        connections[id].sock = -1;
    }
    if (listen_sock >= 0) {
        if (fd_del_epoll (epoll_fd, listen_sock) < 0)
            std::perror ("epoll_ctl-listen_sock");
        close (listen_sock);
        listen_sock = -1;
    }
    if (epoll_fd >= 0) {
        close (epoll_fd);
        epoll_fd = -1;
    }
    std::printf (" shutdown\n");
}

fd_mod_epoll メンバ関数は、 EPOLL_CTL_MOD を使い、 どのイベントを監視するかの設定を変更します。

//@<fd_mod_epoll メンバ関数を定義します@>=
int
echo_tcpserver_type::fd_mod_epoll (int epoll_fd, int fd, uint32_t events, uint32_t u)
{
    struct epoll_event ev = {0};
    ev.events = events;
    ev.data.u32 = u;
    return epoll_ctl (epoll_fd, EPOLL_CTL_MOD, fd, &ev);
}