B+ シーケンス木で挿入・分割、 削除・統合をそれぞれ一体化

引き続き、 実用性をめざして、 ページを大きくできるように手を加えていきます。

最初に、 find_index をページ内で要素数注釈が単調増加することを利用して二分探索へ変えます。 なお、 最右要素数よりも大きいときは、 最後の添字を返すことにします。 前までは BplusSequence クラスの手続きにしていましたが、 今度は Page クラスのメソッドにしてます。

class Page
  attr_reader :cell
  attr_accessor :back, :forw

  def initialize
    @cell = []
    @back = @forw = nil
  end

  def find_index(pos)
    # (0 ... @cell.size).find{|i| pos < @cell[i].count } || @cell.size - 1
    b, d = 0, @cell.size
    while b < d
      u = b + (d - b) / 2
      if pos < @cell[u].count
        d = u
      else
        b = u + 1
        break if pos == @cell[u].count
      end
    end
    b < @cell.size ? b : @cell.size - 1
  end

#@<cell 配列へのアダプタ・メソッド@>
end

ついでに、 Page クラスを Cell 配列のアダプタにすることにして、 他の BplusSequence クラスの Page 操作手続きをメソッドとして追加します。

#@<cell 配列へのアダプタ・メソッド@>=
  def leaf?() @cell.empty? or not Page === @cell.first.value end
  def count() (@cell.empty?) ? 0 : @cell.last.count end
  def size() @cell.size end
  def empty?() @cell.empty? end
  def [](r) @cell[r] end
  def []=(r, x) @cell[r] = x end
  def insert(i, x) @cell.insert(i, x); self end
  def delete_at(i) @cell.delete_at(i); self end

  def rotate(pos, src, range)
    @cell[pos, 0] = src.cell[range]
    src.cell[range] = []
  end

これらを使うよう BplusSequence を改めます。 手続きから Page クラスのメソッドへと機械的に書き直していきます。 一点だけ、 前と異なるのは、 前は BplusSequence クラスの page_size 手続きが B+ 木の流儀で子の間に挟まれた要素数注釈の個数を返していたのですが、 今回は Page のセル配列の個数を返すように変更したことです。 B+ 木のアルゴリズムを参考にするときは前の流儀の方が向いているのですが、 デバッグ時にダンプするときに紛らわしくて、 何度も混乱したので、 素直にセル配列の個数を返すようにしました。 この場合、 B+ 木の流儀での HALF_SIZE、 FULL_SIZE が「子の間に挟まれた要素数注釈の個数」であるため、 page.size - 1 < HALF_SIZE のように 1 つ少なくして比較をしないといけません。 これはこれで間違いの元になりそうなので、 underflow? 述語メソッドを将来追加するかもしれません。

class BplusSequence
  ORDER = 5  # at most children
  HALF_SIZE = (ORDER + 1) / 2 - 1
  FULL_SIZE = ORDER - 1

  attr_reader :root, :first, :last

  def initialize
    @root = Page.new
    @first = @last = @root
  end

  def fetch(pos)
    (0 <= pos and pos < @root.count) or raise RangeError
    page, pos, _ = trace(pos)
    page[pos].value
  end

  def store(pos, value)
    (0 <= pos and pos < @root.count) or raise RangeError
    page, pos, _ = trace(pos)
    page[pos].value = value
    self
  end

#@<insert メソッド@>
#@<delete_at メソッド@>
#@<overflow メソッド@>
#@<underflow メソッド@>

  def trace(pos)
    parent = []
    page = @root
    while not page.leaf?
      i = page.find_index(pos)
      parent.push page, i
      pos -= page[i - 1].count if i > 0
      page = page[i].value
    end
    [page, pos, parent]
  end

  def relink(u, x, y, v)
    x.back = u if not x.nil?
    y.forw = v if not y.nil?
  end
end

BplusSequence クラスの insert メソッドのページへの挿入と分割を一体化処理します。 これまでは、 挿入してみてあふれたら分割する書き方をしていました。 その書き方は記述が簡潔になる反面、 余分なメモリ・コピーが発生しがちで、 ページが大きくなるほどオーバーヘッドが増えます。 今度は、 挿入前にページが満杯かどうかをチェックし、 対象ページを分割してから挿入します。 満杯かどうかは、 ページの子の個数が ORDER に達しているかどうかでチェックできます。 ただし、 根に特別扱いが必要なので、 根以外の満杯ページのときに while ループを続けるようにします。 ループの中で、 満杯ページを分割する直前に、 親ページの要素数注釈に 1 を加えるのは前と同じです。 分割するとき、 満杯のページの右側に新しくページを追加するやりかたも前と同じです。 left 変数を満杯のページを参照する Cell オブジェクトへ、 right 変数を分割で新しくできるページを参照する Cell オブジェクトへ束縛します。 これらの束縛を、 前は overflow メソッドでやっていたのを、 insert メソッドでおこないます。 それに対して、 前の insert でおこなっていたセルの挿入は overflow メソッドでおこないます。 分割すると、 left は親ページの Cell オブジェクトへ束縛しているので、 副作用で親ページの満杯だったページが小さくなります。 この時点で、 right セルは親ページへ挿入しません。 次回の while ループの条件判定で、 親ページが満杯かどうかをチェックして、 right セルの挿入前に分割が必要かどうかを判断します。

ページが満杯でないときは、 その後の分割しなくて良くなります。 セルを挿入してから、 根まで戻りながら要素数注釈の書き直しをおこないます。

根が満杯のときは、 木の高さを一つ伸ばします。 満杯の根を left セルに、 分割で生じる新しいページを right セルにそれぞれ入れてから、 overflow メソッドで分割と挿入をおこないます。 分割・挿入の終わった 2 つのセルを、 新しいページに入れてそれを新しい根にします。

#@<insert メソッド@>=
  def insert(pos, value)
    (0 <= pos and pos <= @root.count) or raise RangeError
    page, pos, parent = trace(pos)
    (pos ... page.size).each {|i| page[i].count += 1 }
    cell = Cell.new(value, pos + 1)
    # ページが満杯のとき、 分割・挿入します。 ただし根ではないこと。
    while page.size >= ORDER and not parent.empty?
      page, idx = parent.pop(2)
      (idx ... page.size).each {|i| page[i].count += 1 }
      # この時点での page は while ループに入るときの page の親ページです。
      # while ループに入った時点の page は left.value です。
      left, right = page[idx], Cell.new(Page.new, 0)
      overflow(left, right, pos, cell)
      # 次回に page の idx+1 位置に right セルを挿入します。
      pos, cell = idx + 1, right
    end
    if page.size < ORDER
      # ページに空きがあるので、 挿入してもあふれません。
      page.insert(pos, cell)
      # 根まで戻りながら要素数注釈を更新します。
      while not parent.empty?
        page, idx = parent.pop(2)
        (idx ... page.size).each {|i| page[i].count += 1 }
      end
    elsif parent.empty?
      # 根が満杯のときは、 分割・挿入し、 2 つのセルを持つ新しい根を作って、 木を一段高くします。
      count = (pos < @root.size) ? @root.count : cell.count
      left, right = Cell.new(@root, count), Cell.new(Page.new, 0)
      overflow(left, right, pos, cell)
      @root = Page.new
      @root.insert(0, left)
      @root.insert(1, right)
    end
    self
  end

挿入するとページがあふれるときは、 ページをまず分割します。 分割した後、 左右どちらに挿入をおこなうかによって、 分割の区切りが一要素分ずれるのを考慮します。 overflow メソッドの引数は Page ではなく、 Cell にしています。 副作用ありの書き方をしており、 木に含まれている方を左側のページとし、 木にこれから追加する方を右側のページにしています。 なお、 下の記述には、 right.value ページの方へ挿入するとき、 余計なメモリ・コピーがまだ残ってます。

#@<overflow メソッド@>=
  # ..,(left.value,left.count),(right.value,right.count),...
  # 0    p    r-1  m                  0    r    p    m
  # [a..b|d..e|f..g]                  [a..b|c..d|f..g]
  # 0    p      r    0    m-r+1       0    r           0    p-r    m-r+1
  # [a..b|c|d..e]    [f..g]           [a..b]           [c..d|e|f..g]
  def overflow(left, right, pos, cell)
    #(Cell === left and Cell === right) or ArgumentError
    m = left.value.size
    r = (m + 1) / 2
    if pos < r
      right.value.rotate(0, left.value, r - 1 ... m)
      left.value.insert(pos, cell)
    else
      right.value.rotate(0, left.value, r ... m)
      right.value.insert(pos - r, cell)
    end
    d = left.value[r - 1].count
    (0 ... m - r + 1).each {|i| right.value[i].count -= d }
    right.count = left.count
    left.count -= right.value.count
    if left.value.leaf?
      relink(right.value, left.value.forw, right.value, left.value.forw)
      relink(left.value, right.value, left.value, right.value)
      @last = right.value if right.value.forw.nil?
    end
  end

削除とページの連結も一体化させるのですが、 記述内容は同じで変化はありません。

#@<delete_at メソッド@>=
  def delete_at(pos)
    (0 <= pos and pos < @root.count) or raise RangeError
    page, pos, parent = trace(pos)
    page.delete_at(pos)
    (pos ... page.size).each {|i| page[i].count -= 1 }
    while not parent.empty?
      page, pos = parent.pop(2)
      (pos ... page.size).each {|i| page[i].count -= 1 }
      underflow(page, pos) if page[pos].value.size - 1 < HALF_SIZE
    end
    if not @root.leaf? and @root.size == 1
      @root = @root[0].value
    end
    self
  end

前の underflow メソッドは、 左右のページを一つにまとめてから、 あふれたときはまた分割するという富豪的な記述でした。 今度は、 ページ統合・右削除、 左ページへセルを移動する再配分、 右ページへセルをする再配分、 の 3 通りをまじめに判断して処理します。 m, n は再配分する前のページ長です。 これらから再配分後のページ長さ m1, n1 を求めます。 左右の合計セル数が奇数のとき、 余りを左右のどちらかへ入れることになります。 ここでは左へ余りを含めることにし、 再配分後に左セル数≧右セル数になるように個数を割り振ります。 再配分後に右ページが半分以下にしかならないときは、 左右を左へ統合して、 右セルを削除します。 再配分後、 左が増えるとき、 右が増えるときをそれぞれ処理します。

#@<underflow メソッド@>=
  def underflow(page, pos)
    pos + 1 < page.size or pos -= 1
    left, right = page[pos], page[pos + 1]
    m, n = left.value.size, right.value.size
    m1, n1 = (m + n + 1) / 2, (m + n) / 2
    if n1 - 1 < HALF_SIZE
      underflow_join_left(left, right)
      underflow_delete_right(page, pos)
    elsif m < m1
      underflow_balance_left(left, right, m1 - m)
    else
      underflow_balance_right(left, right, m1)
    end
  end

#@<underflow_join_left メソッド@>
#@<underflow_delete_right メソッド@>
#@<underflow_balance_left メソッド@>
#@<underflow_balance_right メソッド@>

左セルと右セルを左ヘルへ統合します。 要素数注釈のつじつまあわせをしてから、 右ページ内容を左ページの末尾へ追加します。 統合後の左セルの要素数注釈は右セルのものに変わります。

#@<underflow_join_left メソッド@>=
  #   0    m         0    n
  #   [a..b]         [c..d]
  #   0    m    m+n
  #   [a..b|c..d]
  def underflow_join_left(left, right)
    m, n = left.value.size, right.value.size
    d = left.value[m - 1].count
    (0 ... n).each {|i| right.value[i].count += d }
    left.value.rotate(m, right.value, 0 ... n)
    left.count = right.count
  end

右セルを削除します。 まず、 左右のページが葉のとき、 葉のリストからも右ページをとりのぞきます。 このとき、 今の Page オブジェクトの葉判定ロジックでは、 右ページのセルは空になっているので正しく判定ができず、 左ページで判定しなければなりません。 続いて、 親ページ page 中の右セルを削除します。

#@<underflow_delete_right メソッド@>=
  def delete_right_cell(page, pos)
    left, right = page[pos], page[pos + 1]
    if left.value.leaf?
      relink(left.value, right.value.forw, left.value, right.value.forw)
      @last = left.value if right.value.forw.nil?
    end
    page.delete_at(pos + 1)
  end

左ページが増えるとき、 右ページから左ページへセルを移動します。 要素数注釈のつじつまあわせは、 移動対象セルと、 右に残るセルの両方に施します。

#@<underflow_balance_left メソッド@>=
  #   0    m    0    r    n
  #   [a..b]    [c..d|e..f]
  #   0    m    m+r  0    n-r
  #   [a..b|c..d]    [e..f]
  def underflow_balance_left(left, right, r)
    m, n = left.value.size, right.value.size
    left.value.rotate(m, right.value, 0 ... r)
    d = left.value[m + r - 1].count
    (0 ... n - r).each {|i| right.value[i].count -= d }
    d = left.value[m - 1].count
    (m ... m + r).each {|i| left.value[i].count += d }
    left.count = right.count - right.value.count
  end

右ページが増えるとき、 左ページから右ページへセルを移動します。 やはり、 移動対象セルと、 右に残るセルの両方の要素数注釈を書き換えます。

#@<underflow_balance_right メソッド@>=
  #   0    r    m    0    n
  #   [a..b|c..d]    [e..f]
  #   0    r    0    m-r  m+n-r
  #   [a..b]    [c..d|e..f]
  def underflow_balance_right(left, right, r)
    m, n = left.value.size, right.value.size
    right.value.rotate(0, left.value, r ... m)
    d = left.value[r - 1].count
    (0 ... m - r).each {|i| right.value[i].count -= d }
    d = right.value[m - r - 1].count
    (m - r ... m + n - r).each {|i| right.value[i].count += d }
    left.count = right.count - right.value.count
  end