ストップ&コピー式の Baker のトレッドミル

以前、インクリメンタルな Baker のトレッドミルのようなものを作ってみましたが、リアルタイム要求が緩いときは、インクリメンタル動作をしないストップ&コピー式の方がトータル時間では有利になります。ライトバリアのオーバーヘッドがなくなるためです。

トレッドミルのデータ構造はストップ&コピー式でもインクリメンタル版と同じです。ヒープ・メモリに白色のセルを詰めて、双方向リンクで循環構造にぐるりと一周につなぎます。4 つのポインタを初期化します。トレッドミルは top と bottom のポインタがそれぞれ指している番兵セルによって 2 分され、時計回り方向で bottom と top の間が、コピー中の未走査セルリストになります。ミューテータ動作時は、未走査セルリストは空になり、bottom と top の番兵セルは隣接し、常に scan ポインタは top の番兵セルを指しつづけます。時計回りで top の次から割り当て済みセル・リストが始まり、続いて、free ポインタが指すセルからフリー・リストが始まります。フリー・リストの最後のセルの次が bottom の番兵セルになります。初期状況では、フリー・リストしかないので、top の次のセルを free ポインタが指します。

トレッドミルの状況を次のように表現することにします。先頭は scan または free ポインタが指している位置、> でルートの位置を表します。続いて bottom と top の位置、セルの中は順にセル番号、色、back と forw ポインタ、子セル・リンクのポインタ・リストを並べます。bottom を B、top を T としています。セルは bottom ポインタの番兵から始めて、時計回りに下へ並べて表示することにします。

--- init
     B  #( 0 color=white back=13 forw= 1 link=[])
scan T  #( 1 color=white back= 0 forw= 2 link=[])
free    #( 2 color=white back= 1 forw= 3 link=[])
        #( 3 color=white back= 2 forw= 4 link=[])
        ...
        #(13 color=white back=12 forw= 0 link=[])

セルの色は白色か黒色の 2 色のどちらかを整数値で区別することにします。セル色を直接シンボルにしないのには理由があり、複数のセルの色をいっせいに塗り替えるためです。そこで、ゼロか 1 の整数値のいずれが黒色かを gc_black インスタンス変数で決めることにします。初期状況では gc_black を 1 にしてますので、セルを白色にするため color をゼロにします。

class TreadmillGc
  HeapFullError = Class.new(StandardError)
  Cell = Struct.new(:color, :back, :forw, :link)

  def initialize(n = 12)
    @mem = Array.new(n + 2) {|i| Cell.new(0, i - 1, i + 1, []) }
    @mem[0].back = @mem.size - 1
    @mem[@mem.size - 1].forw = 0

    @gc_bottom = 0
    @gc_top = @mem[@gc_bottom].forw
    @gc_scan = @gc_top
    @gc_free = @mem[@gc_top].forw
    @gc_black = 1
    @root = nil
  end

#@<セルを割り当てる@>
#@<ストップ&コピーする@>

#@<デモを実行する@>
end

TreadmillGc.new(12).demo

セルを割り当てるには、free ポインタを使います。free が指すセルを新しく割り当てられるセルにします。free ポインタを時計回り方向へ 1 つずらして、フリー・リストを縮めます。例えば、free が 9 番目のセルを指していた直後に、9 番目を割り当てた状況はこうなります。なお、番兵セルの色は白色と黒色のどちらにもなりえます。

--- allocated #( 9) cell
     B  #( 0 color=white back=13 forw= 1 link=[])
scan T  #( 1 color=white back= 0 forw= 2 link=[])
       >#( 2 color=black back= 1 forw= 3 link=[3, 4])
        #( 3 color=black back= 2 forw= 4 link=[5, 6])
        ...
        #( 9 color=black back= 8 forw=10 link=[])
free    #(10 color=white back= 9 forw=11 link=[])
        #(11 color=white back=10 forw=12 link=[])
        #(12 color=white back=11 forw=13 link=[])
        #(13 color=white back=12 forw= 0 link=[])

フリー・リストが空のときは、ストップ&コピーをおこなってから、吐き出させたフリー・リストを使って割り当てをおこないます。新しく割り当てたセルを黒色にして、セル自身の free 処理をここでおこない、link リストを空にします。セル自身の free 処理を割り当て時まで遅延するのは、トレッドミルに限らずゴミ集めのオーバーヘッドを減らすためのテクニックの一つです。

#@<セルを割り当てる@>=
  def alloc
    if @gc_free == @gc_bottom
      gc_flip
      if @gc_free == @gc_bottom
        raise HeapFullError, 'heap full'
      end
    end
    x = @gc_free
    @gc_free = @mem[@gc_free].forw
    @mem[x].color = @gc_black
    @mem[x].link.clear          # セル自身の free 処理
    x
  end

ストップ&コピーするには、セルの双方向リンクを付け直すことで、ルートから辿れる生存セルのリストと、フリー・リストへセルを割り振ります。

#@<ストップ&コピーする@>=
  def gc_flip
    # top と bottom を交換する。
    @gc_bottom, @gc_top = @gc_top, @gc_bottom
    @gc_free, @gc_scan = @gc_scan, @gc_free
    # 黒色セルを一斉に白色に塗り替える。
    @gc_black = 1 - @gc_black
    # ルート・セルを黒色に塗って scan リストへ移動する。
    if not @root.nil?
      @mem[@root].color = @gc_black
      @gc_scan = gc_copy(@root, @gc_scan)
    end
    # scan リストが空になるまで、
    while @gc_scan != @gc_top
      # scan リストから一つセルを取り出して
      x = @gc_scan
      @gc_scan = @mem[@gc_scan].back
      # 取り出したセルの link リストが指す子セルに対して
      for y in @mem[x].link
        # 白色のセルを scan リストに移動する。
        next if @mem[y].color == @gc_black
        @mem[y].color = @gc_black
        @gc_scan = gc_copy(y, @gc_scan)
      end
    end
    # 参照されていない残った白色セルのリストをフリー・リストにする。
    @gc_bottom = @mem[@gc_top].back
    @mem[@gc_free].color = 1 - @gc_black # トレース表示向けに bottom セルを白に塗る。 塗らなくても問題はない。
=begin
    if @mem[@gc_top].back != @gc_bottom
      @gc_free = @mem[@gc_bottom].forw
      gc_copy(@gc_bottom, @mem[@gc_top].back)
    end
=end
  end

  # x ポインタのセルを at ポインタの次に移動する。
  def gc_copy(x, at)
    @mem[@mem[x].forw].back = @mem[x].back
    @mem[@mem[x].back].forw = @mem[x].forw
    @mem[x].back = at
    @mem[x].forw = @mem[at].forw
    @mem[@mem[x].forw].back = x
    @mem[@mem[x].back].forw = x
    x
  end

ストップ&コピーする状況ではフリー・リストが空になっています。フリー・リストが空かどうか、free ポインタと bottom ポインタが一致しているかどうかで判定可能です。このとき、すべてのセルが割り当て済みなので、番兵以外はすべて黒色になっています。

--- free list empty
free B  #( 0 color=white back=13 forw= 1 link=[])
scan T  #( 1 color=white back= 0 forw= 2 link=[])
       >#( 2 color=black back= 1 forw= 3 link=[3, 4])
        #( 3 color=black back= 2 forw= 4 link=[5, 6])
        ...
        #(13 color=black back=12 forw= 0 link=[])


最初に top と bottom を交換します。このとき、free と scan も一緒に交換します。同時に、セルの色をいっせいに黒色から白色に塗り替えます。

--- flip B <=> T
free B  #( 1 color=black back= 0 forw= 2 link=[])
       >#( 2 color=white back= 1 forw= 3 link=[3, 4])
        #( 3 color=white back= 2 forw= 4 link=[5, 6])
        ...
        #(13 color=white back=12 forw= 0 link=[])
scan T  #( 0 color=black back=13 forw= 1 link=[])

続いて、root セルを黒色に塗り、scan リストへ移動します。scan リストは scan ポインタが先頭で反時計回りに top ポインタまで伸びます。

--- copy root
free B  #( 1 color=black back= 2 forw= 3 link=[])
        #( 3 color=white back= 1 forw= 4 link=[5, 6])
        ...
        #(13 color=white back=12 forw= 0 link=[])
     T  #( 0 color=black back=13 forw= 2 link=[])
scan   >#( 2 color=black back= 0 forw= 1 link=[3, 4])

この後は、scan ポインタを反時計回りに動かして、動かす前のセルの link リストが参照している子セルの色が白色なら黒色に塗り替えて scan リストに加えます。これを scan リストが空になるまで続けます。scan リストが空かどうかは、scan ポインタが top ポインタに一致するかどうかで判定可能です。scan リストに子セルを加えるとき、top ポインタ側に加えると幅優先、scan ポインタ側に加えると深さ優先になります。深さ優先の方がプロセッサのキャッシュミスが減る傾向があるので、scan ポインタ側に付け加えることにします。

scan リストが空になると、生存しているセルはすべて黒色塗られて、top ポインタの下へ移動しています。top ポインタより上に残ったセルは不要なものですので、これからフリー・リストを作ります。フリー・リストにするには、bottom の次のセルに free ポインタを動かしてから、bottom を番兵セルごと top の番兵セルの前に移動します。

--- scan end
free B  #( 1 color=black back= 2 forw= 9 link=[])
        #( 9 color=white back= 1 forw=10 link=[])
        #(10 color=white back= 9 forw=11 link=[])
        ...
        #(13 color=white back=12 forw= 0 link=[])
scan T  #( 0 color=black back=13 forw= 5 link=[])
        #( 5 color=black back= 0 forw= 6 link=[])
        ...
       >#( 2 color=black back= 4 forw= 1 link=[3, 4])
--- gc end
     B  #( 1 color=black back=13 forw= 0 link=[])
scan T  #( 0 color=black back= 1 forw= 5 link=[])
        #( 5 color=black back= 0 forw= 6 link=[])
        ...
       >#( 2 color=black back= 4 forw= 9 link=[3, 4])
free    #( 9 color=white back= 2 forw=10 link=[])
        #(10 color=white back= 9 forw=11 link=[])
        ...
        #(13 color=white back=12 forw= 1 link=[])

デモはインクリメンタル版と同じです。

#@<デモを実行する@>=
  def run
    @root = a = alloc
    @mem[a].link[0] = b = alloc
    @mem[a].link[1] = c = alloc
    @mem[b].link[0] = d = alloc
    @mem[b].link[1] = e = alloc
    @mem[c].link[0] = f = alloc
    @mem[c].link[1] = g = alloc
    @mem[f].link[0] = a
    20.times do |i|
      alloc
      print_trace("run #{i}")
    end
  rescue HeapFullError => e
    puts '--- heap full'
    dump
  end

  def dump
    x = @gc_bottom
    @mem.each_index do |i|
      puts '%s %s %s#(%2d color=%s back=%2d forw=%2d link=%s)' % [
        x == @gc_scan ? 'scan' : x == @gc_free ? 'free' : '    ',
        x == @gc_bottom ? 'B' : x == @gc_top ? 'T' : ' ',
        x == @root ? '>' : ' ',
        x,
        @mem[x].color == @gc_black ? 'black' : 'white',
        @mem[x].back,
        @mem[x].forw,
        @mem[x].link.nil? ? '' : @mem[x].link.to_s,
      ]
      x = @mem[x].forw
    end
  end