HashPrevalence

Ruby のための、永続化 Hash オブジェクト HashPrevalence を書いてみました。これは Klaus Wuestefeld 氏が提唱したオブジェクトの永続化方法 Advogato: OBJECT PREVALENCE (2001) の考え方を基にしています。Object Prevalence はA Simple and Efficient Implementation for Small Databases (1987, PDFファイル)の方式を応用し、ログ先行書き込み方式(Write Ahead Logging, WAL)のトランザクション処理機構を組み込んだメモリに全データを保持するやりかたです。この種のライブラリには既に汎用の Madeleine - Ruby Object Prevalence があるものの、HashPrevalence では Hash オブジェクトの一部のメソッドだけを利用することにして、ジャーナリングするコマンドを単純化しています。

  • 永続データを記録するディレクトリを指定して new します。初期化時にディレクトリ中のスナップショット・ファイルから読み取り、さらにジャーナリング・ファイルのコマンドを実行してデータを復元します。
  • 排他制御にはシンプルに flock を使っています。
  • 通常の Hash への要素書き込みと要素削除をしているかのように利用できます。内部では、こうしたデータ更新要求をコマンド発行におきかえて、コマンドをジャーナリングに記録してから、コマンドを実行してデータの更新をおこなっています。
  • トランザクションは transaction メソッドに渡すブロック中でおこないます。トランザクション中は、ジャーナリングもメモリ中に記録します。トランザクションがエラーなしで完了したらコミットとみなして、メモリ中に記録済みのコマンドをジャーナリング・ファイルに書き出します。
  • トランザクション中にエラーが発生するか、rollback をレシーバが受け取ったときは、メモリ中のジャナーリングを元にコマンドを undo してトランザクション前にデータを復帰します。このときは、ジャーナリング・ファイルへの書き出しはおこないません。
  • トランザクション外でのデータ更新メソッドの呼びだし、トランザクション完了時に、時刻をチェックしてスナップショットを作るときがあります。ほぼ1秒間隔でスナップショットを作成します。
  • コマンドとスナップショットは Marshal.dump を使ってシリアライズをおこないます。そのため、永続データの key と value は Marshal.dump 可能なオブジェクトでなければいけません。そうは言っても、key は素直に文字列か数にしておいた方が無難でしょう。value は静的な入れ子データならなんでも格納可能です。

Wiki のモデルのような利用例です。エントリにはハッシュを格納しています。

prevalence = HashPrevalence.new('test')
prevalence.transaction do |entries|
  word = 'FrontPage'
  word_id = 'w;%s' % [word]
  if not entries.key?(word_id)
    rev = 1
    revisions = []
  else
    rev = entries[word_id][:latest] + 1
    revisions = entries[word_id][:revisions].dup
  end
  content_id = 'c;%s;%8d' % [word, rev]
  revisions.push content_id
  entries[word_id] = {:id => word, :latest => rev, :revisions => revisions}
  entries[content_id] = {:id => word, :rev => rev}
end
prevalence.snapshot
prevalence.transaction do |entries|
  word = 'FrontPage'
  word_id = 'w;%s' % [word]
  break if not entries.key?(word_id)
  entries[word_id][:revisions].reverse_each do |content_id|
    content = entries[content_id]
    puts '%-10s %8d' % [content[:id], content[:rev]]
  end
end

HashPrevalence のソースです。

Simple Persistent Hash based on Object Prevalence ― Gist

put_journal でジャーナルへの追記をおこない、each_journal でジャーナル中のコマンドを順にとりだします。take_snapshot が現在のメモリ中のスナップショットを書き出してからジャーナルを消去します。コマンドは、undo/redo に対応するために差分データを保持しています。ジャーナルには時刻をスタンプしていますが、現在の実装では使っていません。

class HashPrevalence
  include Enumerable

  Rollback = Class.new(Exception)
  
  SNAPSHOT_PERIOD = 1 # seconds

  module BASENAME
    LOCK = 'lock'
    JOURNAL = 'journal'
    SNAPSHOT = 'snapshot'
  end

  attr_reader :data

  def initialize(directory)
    if not Dir.exist?(directory)
      Dir.mkdir(directory)
    elsif not FileTest.writable?(directory)
      raise IOError, "cannot write directory '#{directory}'"
    end
    @directory = directory
    lockname = File.join(@directory, BASENAME::LOCK)
    @lock = open(lockname, File::RDWR|File::CREAT, 0600)
    journalname = File.join(@directory, BASENAME::JOURNAL)
    @journal = open(journalname, File::RDWR|File::CREAT|File::SYNC, 0600).binmode
    @journal.sync = true
    @commands = nil
    @timer = Time.now
    recover
  end

  def keys() @data.keys end
  def key?(k) @data.key?(k) end
  def each(&p) @data.each(&p) end
  def [](key) @data[key] end
  def []=(key, value) journaling { store_command(key, value) } end
  def store(key, value) journaling { store_command(key, value) } end
  def delete(key) journaling { delete_command(key) } end

  def rollback() raise Rollback end

  def transaction
    synchronize do
      @commands = []
      begin
        yield self
      rescue Rollback
        @commands.reverse_each {|command| undo_command(command) }
      rescue StandardError => e
        @commands.reverse_each {|command| undo_command(command) }
        raise e
      else
        @commands.each {|command| put_journal(command) }
        schedule_checkpoint
      ensure
        @commands = nil
      end
    end
    self
  end

  def snapshot
    if @commands
      @timer = Time.now - SNAPSHOT_PERIOD * 2
    else
      synchronize { take_snapshot }
      @timer = Time.now
    end
    self
  end

  def dispose
    synchronize do
      if @commands
        @commands.clear
      end
      @journal.rewind
      @journal.truncate(0)
      @journal.fsync
      snapshotname = File.join(@directory, BASENAME::SNAPSHOT)
      File.delete(snapshotname)
      @data.clear
    end
    self
  end

private

  def store_command(key, value)
    if @data.key?(key)
      {:proc => :update, :key => key, :from => @data[key], :to => value}
    else
      {:proc => :insert, :key => key, :to => value}
    end
  end

  def delete_command(key)
    if @data.key?(key)
      {:proc => :delete, :key => key, :from => @data[key]}
    else
      nil
    end
  end

  def redo_command(command)
    case command[:proc]
    when :insert, :update
      @data[command[:key]] = command[:to]
    when :delete
      @data.delete(command[:key])
    end
  end

  def undo_command(command)
    case command[:proc]
    when :delete, :update
      @data[command[:key]] = command[:from]
    when :insert
      @data.delete(command[:key])
    end
  end

  def journaling
    if @commands
      command = yield
      return nil if command.nil?
      @commands.push command
      redo_command(command)    
    else
      value = nil
      synchronize do
        command = yield
        break if command.nil?
        put_journal(command)
        value = redo_command(command)    
        schedule_checkpoint
      end
      value
    end
  end

  def synchronize
    @lock.flock(File::LOCK_EX)
    begin
      yield
    ensure
      @lock.flock(File::LOCK_UN)
    end
    nil
  end

  def put_journal(command)
    content = Marshal.dump(command)
    stump = Time.now
    s = [
      'jnal',
      stump.to_i,
      stump.nsec,
      content.size,
      content,
    ].pack('a4NNNa*')
    @journal.seek(0, IO::SEEK_END)
    @journal.print s
    @journal.fsync
    nil
  end

  def each_journal
    @journal.rewind
    while not @journal.eof?
      csig, csec, cnsec, csize = @journal.read(16).unpack('a4NNN')
      if csig != 'jnal' || csec <= 0 || cnsec < 0 || csize <= 0
        raise IOError, 'Bad journaling file.'
      end
      command = Marshal.load(@journal.read(csize))
      yield csec, cnsec, command
    end
    nil
  end

  def recover
    synchronize do
      snapshotname = File.join(@directory, BASENAME::SNAPSHOT)
      @data = {}
      if File.exist?(snapshotname)
        open(snapshotname, 'rb') do |fd|
          @data = Marshal.load(fd)
        end
      end
      each_journal do |csec, cnsec, command|
        redo_command(command)
      end
    end
    self
  end

  def schedule_checkpoint
    cur = Time.now
    return if cur < @timer + SNAPSHOT_PERIOD
    take_snapshot
    @timer = cur
    self
  end

  def take_snapshot
    snapshotname = File.join(@directory, BASENAME::SNAPSHOT)
    backupname = nil
    if File.exist?(snapshotname)
      backupname = File.join(@directory, BASENAME::SNAPSHOT + '.orig')
      File.rename(snapshotname, backupname)
    end
    begin
      open(snapshotname, 'wb', 0600) do |fd|
        Marshal.dump(@data, fd)
      end
    rescue => e
      if backupname
        File.rename(backupname, snapshotname)
      elsif File.exist?(snapshotname)
        File.delete(snapshotname)
      end
      raise e
    else
      @journal.rewind
      @journal.truncate(0)
      @journal.fsync
    end
    self
  end
end