N10K 掲示板の更新にトランザクションを導入

昨日の続きです。掲示板の投稿をデータ・ファイルとインデックス・ファイルをワンセットで扱うアプリケーションのため、両方の内容の整合性が崩れるとうまく動かなくなってしまいます。データ・ファイルからインデックス・ファイルを再編成することはできるものの、念のために、ファイル・サブシステムにオート・フラッシュをかけて、その上でトランザクションを導入することにしました。トランザクションといっても単純なもので、更新・削除のときにまずいことが生じるとロールバックするようにしただけです。

(2014-02-28: バージョン 0.07 までリファクタリングを進めて、その結果をこの項目にも反映しました。)

https://gist.github.com/tociyuki/9189068 n10k.pl バージョン 0.07

追記では、ロールバックに必要になるのはデータ・ファイルとインデックス・ファイルの追記前のサイズなので、それを最初にジャーナリング・ファイルに書き込みます。ジャーナリング・ファイルは、スクリプトがなんらかの異常で停止したとき、次回の処理前にロールバックをおこなうために使われます。通常は、例外処理でロールバックを処理するので、不要になったジャーナリング・ファイルも削除します。

use IO::File;

sub put {
    my($self, $object) = @_;
    my $already = -e $self->data_path;
    my($dh, $xh) = $self->open_data(LOCK_EX, '>>', '>>');
    $already or $dh->print("#BOARD1\n");
    if (! $self->is_full) {
        my $org_dsize = $dh->tell;
        my $org_xsize = $xh->tell;
        $self->journal_begin_work("put $org_dsize $org_xsize\n");
        if (! eval{
            $object = $object->new({%{$object}, 'rowid' => $org_dsize});
            my $s = encode_utf8($object->marshal_dump);
            my $w = $INTWIDTH - 1;
            $dh->printf("%${w}d\n%s", length $s, $s);
            $dh->flush; $dh->sync;
            $xh->printf("%${w}d\n", $org_dsize);
            $xh->flush; $xh->sync;
            1;
        }) { # rollback
            chomp(my $err = $@);
            warn "put: rollback: $err\n";
            $dh->truncate($org_dsize);
            $xh->truncate($org_xsize);
        }
    }
    $self->close_data($dh, $xh);
    $self->journal_commit;
    return $object;
}

open_data はデータファイルとインデックスファイルをオープンして flock をかけてファイルハンドルを返します。 close_data はそれら 2 つをクローズします。journal_begin_work はこれからおこなう処理をロールバックするための情報をジャーナルに書き込みます。journal_commit は、処理が正常に完了したことで不要になったジャーナルファイルを削除します。

削除は追記に比べて、ロールバックでエントリを元に戻さなければならない面倒さがあるため、楽をするために、データ・ファイルのレコードのフォーマットをロールバック向けに変更しました。レコードの先頭に書き込んでいるレコード・サイズの値を削除済みマークのときは負にして、通常の追記ではこれまで通り正にして、データファイルに記入します。これにより、ロールバックするとき、レコード・サイズの符号をプラスに書き直すだけで済みます。インデックス・ファイルを元に戻しやすくするために、インデックス・ファイルの内容を書き換えるのを止めて、一時ファイルへ削除対象以外をコピーして、データファイルに削除マークをつけた後に元のインデックス・ファイルと入れ替えます。これで、削除のジャーナリング・ファイルに必要な情報はエントリ・レコードのデータ・ファイル上のバイト位置だけに減りました。デバッグのために、さらにインデックス・ファイルで削除する箇所のオフセットも一緒に記録しています。

sub del {
    my($self, $rowid) = @_;
    return if ! -e $self->data_path;
    my($dh, $xh) = $self->open_data(LOCK_EX, '+<', '<');
    my($offset, $addr) = $self->search_index($xh, $rowid);
    if ($addr == $rowid) {
        $self->journal_begin_work("del $rowid $offset\n");
        my $new_index = $self->index_path . '.tmp';
        if (eval{
            my $xsize = (stat $xh)[7];
            my $xaddr = $offset * $INTWIDTH;
            my $oh = IO::File->new($new_index, '>')
                or die "del: cannot create new index file: $!\n";
            $oh->binmode;
            $self->copy_slice($xh, $oh, 0, $xaddr);
            $self->copy_slice($xh, $oh, $xaddr + $INTWIDTH, $xsize);
            $oh->flush; $oh->sync;
            $oh->close;
            $self->mark_record_size($dh, $rowid, -1);
            1;
        }) { # commit
            $xh->close;
            rename $new_index, $self->index_path;
        }
        else { # rollback
            chomp(my $err = $@);
            warn "del: rollback: $err\n";
            -e $new_index and unlink $new_index;
            $self->mark_record_size($dh, $rowid, +1);
        }
    }
    $self->close_data($dh, $xh);
    $self->journal_commit;
    return;
}

異常終了から復帰するためにロールバックをおこなう recover は、ジャーナル・ファイルからロールバックに必要な事項を読み出して、上記 2 種のそれぞれのロールバックをおこなってから、ジャーナル・ファイルを削除します。

sub recover {
    my($self) = @_;
    return if ! -e $self->data_path;
    my($dh, $xh) = $self->open_data(LOCK_EX, '+<', '+<');
    my $statement = $self->journal_restore;
    if ($statement) {
        my($op, $org_dsize, $org_xsize) = split /\s+/msx, $statement;
        if ($op eq 'put') {
            $dh->truncate($org_dsize);
            $xh->truncate($org_xsize);
        }
        elsif ($op eq 'del') {
            my $new_index = $self->index_path . '.tmp';
            -e $new_index and unlink $new_index;
            $self->mark_record_size($dh, $org_dsize, +1);
        }
    }
    $self->close_data($dh, $xh);
    $self->journal_commit;
    return $self;
}

recover は、常に実行する必要があるので、ルータの under 定義の中で実行するようにします。

under sub {
    $board->recover;
    return 1;
};