Snapshot Isolation のおもちゃ・行持ち MVCC 版

前回のおもちゃトランザクションごとに大域データを丸ごと局所データにコピーして隔離を実現しました。 今度のおもちゃは複数のバージョンをデータに混ぜる Multiversion concurrency control (MVCC) で隔離してみます。 バージョンの混ぜ方は様々で、 今回は行ごとに履歴を保持する方式にします。
グローバル・データをハッシュ表の行を履歴にするため、 行を配列にします。 配列の中にバージョン有効範囲をつけたデータ・レコードを並べます。 バージョン有効範囲は、 そのレコードを生んだトランザクションのタイムスタンプ xmin で始まり、 そのレコードを削除したトランザクションのタイムスタンプ xmax で終わります。 削除されていないレコードでは終わりをゼロとします。

例えば、 空のデータに、 タイムスタンプ 1 のトランザクション t1 が id 1 のレコードと id 3 のものを追加したら次のようになります。 これをコミットすると、 そのままデータとして残り、 ロールバックすると空に戻します。

class Version
  attr_accessor :xmin, :xmax, :data
  def initialize(xmin, xmax, data) @xmin, @xmax, @data = xmin, xmax, data end
end

  # t1 = table.transaction
  # t1.insert(1, 'alice', 100)
  # t1.insert(3, 'carrol', 100)
  # t1.commit
  @data = {
    1 => [Version.new(1, 0, {:id => 1, :name => 'alice', :v => 100})],
    3 => [Version.new(1, 0, {:id => 3, :name => 'carrol', :v => 100})]}

続いて、 t1 のコミット後のタイムスタンプ 2 のトランザクション t2 と、 タイムスタンプ 3 のトランザクション t3 でデータを操作します。 t2 は id 1 のデータを更新し、 id 2 のデータを追加します。 t3 は id 3 のデータを削除します。 両方ともコミットする前は次のように、 それぞれのバージョンが並んで共存します。 さらに、 タイムスタンプ 4 のトランザクション t4 を加えて、 レコードの見え方を調べてみます。 t2 ではタイムスタンプ 1 と 2 のついたデータが見えており、 両方とも見えているレコードではタイムスタンプ 2 を優先します。 t3 ではタイムスタンプ 1 と 3 のついたデータが見えています。 t4 はタイムスタンプ 1 のついたデータだけが見えています。

  # t2 = table.transaction
  # t3 = table.transaction
  # t4 = table.transaction
  # t4 (1 alice 100) (3 carrol 100)
  # t2.update(1, 'alice', 50)
  # t2.insert(2, 'bob', 100)
  # t3.delete(3)
  # t2 (1 alice 50) (2 bob 100) (3 carrol 100)
  # t3 (1 alice 100)
  # t4 (1 alice 100) (3 carrol 100)
  @data = {
    1 => [Version.new(1, 0, {:id => 1, :name => 'alice', :v => 100}),
          Version.new(2, 0, {:id => 1, :name => 'alice', :v => 50})],
    2 => [Version.new(2, 0, {:id => 2, :name => 'bob', :v => 100})],
    3 => [Version.new(1, 0, {:id => 3, :name => 'carrol', :v => 100}),
          Version.new(3, 3, {:id => 3, :name => 'carrol', :v => 100})]}

t2 をコミットすると、 id 1 の t1 が追加したレコードの xmax に 2 が入ります。 t3 をコミットすると、 id 3 の t1 が追加したレコードの xmax に 3 が入ります。 さらに、 t3 が追加した t3 ローカルの削除レコードのバージョンを掃除して取り除きます。 t2 を先にコミットしたとすると、 その後も t3 で見えるのは相変わらず タイムスタンプ 3 と 1 だけです。 t4 で見えるのもタイムスタンプ 1 だけです。 t3 もコミットした後も t4 では相変わらずタイムスタンプ 1 だけが見えています。 一方、 t2 と t3 のコミット後のタイムスタンプ 5 で始まるトランザクション t5 からは、 タイムスタンプ 1、2、3 が見えていて、 1 よりも 2 が、 1 よりも 3 を優先します。

  # t2 = table.transaction
  # t3 = table.transaction
  # t4 = table.transaction
  # t2.update(1, 'alice', 50)
  # t2.insert(2, 'bob', 100)
  # t3.delete(3)
  # t2.commit
  # t3 (1 alice 100)
  # t4 (1 alice 100) (3 carrol 100)
  # t3.commit
  # t4 (1 alice 100) (3 carrol 100)
  # t5 = table.transaction
  # t5 (1 alice 50) (2 bob 100)
  @data = {
    1 => [Version.new(1, 2, {:id => 1, :name => 'alice', :v => 100}),
          Version.new(2, 0, {:id => 1, :name => 'alice', :v => 50})],
    2 => [Version.new(2, 0, {:id => 2, :name => 'bob', :v => 100})],
    3 => [Version.new(1, 3, {:id => 3, :name => 'carrol', :v => 100})]}
  # t4.commit
  # t5.commit

上の例から見当がつくように、 トランザクションごとに見えるレコードのバージョンを決めるには、 スナップショット取得時点より前と後のコミットを区別すれば良いわけです。 そのために、 まず現在有効なスナップショットのタイムスタンプをハッシュ表 @active に記録します。 @active のキーが有効なスナップショットのタイムスタンプです。 @active の値は集合で、 そのスナップショット取得以後のタイムスタンプを順次追加していきます。 スナップショット取得時に、 トランザクションには、 キーと値の組を渡します。 これにより、 各トランザクションはスナップショット取得以後のタイムスタンプのついたバージョンを除外することができるようになります。

class Conflict < RuntimeError; end

class Table
  def initialize()
    @xid = 0
    @active = {}
    @data = {}
  end

  def transaction()
    Transaction.new(self, @data)
  end

  def snapshot()
    @xid += 1
    xmask = Set.new(@active.each_key)
    @active[@xid] = xmask
    @active.each_value {|tx| tx << @xid }
    yield @xid, xmask
    self
  end

  def inactive(xid) @active.delete(xid); self end

#@<Table#get_version_non_repeatable@>
end

トランザクションは、 グローバル・データのハッシュ表を共用しています。 そのため、 挿入・削除・更新はデータのハッシュ表にトランザクション・ローカルなレコードをバージョンとして追加し、 それを対象に内容の更新をおこないます。

class Transaction
  def initialize(table, data)
    @table, @data = table, data
    @xid = @xmask = @change = nil
    restart
  end

  def restart
    @table.snapshot {|xid, xmask| @xid, @xmask, @change = xid, xmask, {} }
    self
  end

#@<Transaction#get_version@>
#@<Transaction#fetch@>
#@<Transaction#each@>
#@<Transaction#insert@>
#@<Transaction#update@>
#@<Transaction#delete@>
#@<Transaction#undo_change@>
#@<Transaction#commit@>
#@<Transaction#patch, undo@>
#@<Transaction#rollback@>
end

トランザクション中で見えるスナップショットは、 ある id の行にある複数のバージョンのうち、 自分自身が追加したバージョンと、 スナップショット取得時点より前からあるバージョンです。 自分自身が追加するとき、 xmin に自分のタイムスタンプ @xid を必ず付ける約束ごとにしておくと、 xmin が @xid のものと、 xmin が @xmask に含まれていないバージョンが見えているはずです。 両方あるときは xmin が @xid のものを優先します。 さらに、 削除マークがついているものを除外します。 もちろん、 削除マークがついていても xmax が @xmask に含まれている他のトランザクションのタイムスタンプなら、 その削除マークを無視します。

#@<Transaction#get_version@>=
  def get_version(id)
    if @data.key?(id)
      @data[id].reverse_each do |x|
        if x.xmin == @xid || ! @xmask.member?(x.xmin)
          return (x.xmax == 0 || (@xmask.member?(x.xmax) && x.xmax != @xid)) ? x : nil
        end
      end
    end
    nil
  end

これを使って、 レコードの読み取りと、 列挙をおこなうことができます。

#@<Transaction#fetch@>=
  def fetch(id)
    x = get_version(id)
    x and x.data
  end

#@<Transaction#each@>=
  def each()
    @data.each_key do |id|
      x = get_version(id)
      x and yield x.data
    end
    self
  end

レコードの挿入は、 スナップショットにレコードがないときに新しく挿入し、 削除済みを再挿入したときは更新に扱いを変えます。

#@<Transaction#insert@>=
  def insert(id, name, v)
    case (@change.key?(id) ? @change[id][0] : :none)
    when :none
      not get_version(id) or raise IndexError
      to = Version.new(@xid, 0, {:id => id, :name => name, :v => v})
      @change[id] = [:insert, nil, to]
      (@data[id] ||= []) << to
    when :insert, :update
      raise IndexError
    when :delete
      from = @change[id][1]
      to_data = {:id => id, :name => name, :v => v}
      if from.data == to_data
        undo_change(id)
      else
        @change[id][0] = :update
        @change[id][2].xmax = 0
        @change[id][2].data = to_data
      end
    end
    self
  end

レコードの更新は、 トランザクション・ローカルな作業レコードがないときは新しいバージョンを追加します。 挿入済みのとき、 挿入するレコードを差し替えます。 変更済みのときも、 変更後のレコードを差し替えます。

#@<Transaction::update@>=
  def update(id, name, v)
    to_data = {:id => id, :name => name, :v => v}
    case (@change.key?(id) ? @change[id][0] : :none)
    when :none
      from = get_version(id) or raise IndexError
      if from.data != to_data
        to = Version.new(@xid, 0, to_data)
        @change[id] = [:update, from, to]
        @data[id] << to
      end
    when :insert
      @change[id][2].data = to_data
    when :update
      if @change[id][1].data == to_data
        undo_change(id)
      else
        @change[id][2].data = to_data
      end
    when :delete
      raise IndexError
    end
    self
  end

レコードの削除は、 トランザクション・ローカルな削除マークのついたバージョンを追加します。 挿入済みのときは挿入を取り消します。 変更済みのときは、 削除に置き換えます。

#@<Transaction::delete@>=
  def delete(id)
    case (@change.key?(id) ? @change[id][0] : :none)
    when :none
      from = get_version(id) or raise IndexError
      to = Version.new(@xid, @xid, from.data)
      @change[id] = [:delete, from, to]
      @data[id] << to
    when :insert
      undo_change(id)
    when :update
      @change[id][0] = :delete
      @change[id][2].xmax = @xid
    when :delete
      raise IndexError
    end
    self
  end

insert、 update、 delete メソッドが使う変更取り消しメソッドは、 @data からローカル作業用のバージョンを除去し、 変更ログからも取り除きます。

#@<Transaction#undo_change@>=
  def undo_change(id)
    release_working_version(id)
    @change.delete(id)
  end

  def release_working_version(id)
    if @data.key?(id)
      @data[id].delete_if {|x| x.xmin == @xid }
      @data.delete(id) if @data[id].empty?
    end
  end

コミットによって、 トランザクションのローカルな作業バージョンから、 グローバルなコミット済みバージョンに扱いが変わります。

#@<Transaction#commit@>=
  def commit()
    log = []
    begin
      patch(log)
    rescue Conflict
      undo_commit(log)
      raise
    ensure
      @table.inactive(@xid)
      @xid = @xmask = @change = nil
    end
    self
  end

挿入・変更・削除メソッドがグローバル・データにバージョンを追加しているので、 パッチ当てはスナップショット取得時点とコミット時点のグローバル・データの比較による楽観的排他制御が処理の中心になります。 途中で衝突を検出すると、 undo_commit でトランザクションが加えた作業バージョンを取り除き、 書き換えてしまっていたバージョンの xmax を元に戻します。

#@<Transaction#patch, undo@>=
  def patch(log)
    @change.each do |id, (fn, from, to)|
      case fn
      when :insert
        not @table.get_version_non_repeatable(id) or raise Conflict
      when :update
        x = @table.get_version_non_repeatable(id) or raise Conflict
        x.xmin == from.xmin or raise Conflict
        from.xmax = @xid
        log << from
      when :delete
        x = @table.get_version_non_repeatable(id) or raise Conflict
        x.xmin == from.xmin or raise Conflict
        from.xmax = @xid
        release_working_version(id)
        log << from
      end
    end
  end

  def undo_commit(log)
    @change.each_key {|id| release_working_version(id) }
    log.each {|from| from.xmax = 0 }
  end

ロールバックでは、 ローカルな作業バージョンをグローバル・データから除去します。 その後、スナップショット取得時点からレコードが変化していないことを確認する楽観的排他制御をおこないます。

#@<Transaction#rollback@>=
  def rollback()
    @change.each_key {|id| release_working_version(id) }
    begin
      @change.each do |id, (fn, from, to)|
        case fn
        when :insert
          not @table.get_version_non_repeatable(id) or raise Conflict
        when :update, :delete
          x = @table.get_version_non_repeatable(id) or raise Conflict
          x.xmin == from.xmin or raise Conflict
        end
      end
    ensure
      @table.inactive(@xid)
      @xid = @xmask = @change = nil
    end
    self
  end

楽観的排他制御のために、 現時点でのコミット済みバージョンをグローバル・データから覗き見します。 これはクラス Table のメソッドです。

#@<Table#get_version_non_repeatable@>=
  def get_version_non_repeatable(id)
    if @data.key?(id)
      @data[id].reverse_each do |x|
        if ! @active.member?(x.xmin)
          return (x.xmax == 0 ? x : nil)
        end
      end
    end
    nil
  end