b+ trees

Table of Contents

1 Intro to B+ Trees

1.1 TODO what is a b+ tree?

1.2 TODO what problem does a b+ tree overcome?

2 Object Model and Public Interface

2.1 unit bp;

{ B+ Trees for pascal.
  Interface section of unit =bp=: the key/value types, the public
  IBPlus interface and the node, iterator and tree classes. }
{$DEFINE DELPHIMODE} {$i xpc.inc} {$H+}
unit bp;
interface uses xpc, sysutils;

  // Sentinel stored in every unused key slot. Nodes test against it to
  // detect a free slot, so maxint itself cannot be used as a real key.
  const empty = maxint;

  type
    TKey   = integer;   // keys are plain integers
    TVal   = TObject;   // values are arbitrary object references
    // operations shared by the tree and its nodes
    IBPlus = interface
      <<IBPlusMethods>>
    end;
    TTree  = class; // forward reference for TIter
    <<TNode>>
    <<TIter>>
    <<TTree>>

2.2 Public IBPlus interface

// Store =val= under =key=.
procedure put( key : TKey; val : TVal );
// The remaining operations are not implemented yet, so they stay
// commented out of the interface:
//procedure del( key : TKey );
//function get( key : TKey ): TVal;
//function has( key : Tkey ): boolean;

2.3 type TNode

// A single node of the tree. The same class serves as leaf node and as
// inner node; the =isleaf= flag tells the two apart.
TNode = class ( TInterfacedObject, IBPlus )
 public
  // build an empty leaf with size+1 key/value slots
  constructor CreateLeaf( size : integer );
  // same layout as a leaf, but flagged as an inner node
  constructor CreateInner( size : integer );
  <<IBPlusMethods>>
  // debugging dump of this node and everything stored below it
  function tostring: string; override;
 private
  isleaf      : boolean;       // true for leaves, false for inner nodes
  _parent   : TNode;           // nil while this node is the root
  _next     : TNode;           // next leaf in the chain (set when a leaf splits)
  keys : array of TKey;        // key slots; unused slots hold =empty=
  vals : array of TObject;     // leaf: stored values; inner node: child TNodes
  // walk down to the leaf where =key= belongs
  function locate( key : TKey ): TNode;
  // open a slot for =key= in this node and return its index
  function inskey( key : TKey ): cardinal;
  // true once the last key slot is occupied
  function full : boolean;
  // move the upper half of the slots into a new sibling node
  procedure split;
  // split the node if it is full
  procedure rebalance;
  // the key in slot 0
  function firstkey : TKey;
  // true when the node has no parent
  function isroot : boolean;
  // register =kid= under =key= in this (inner) node
  procedure addchild( key : TKey; kid : TNode );
  // accessor for =_next=
  function next : TNode;
  // index where =key= would be inserted in this node
  function findslot( key : TKey ): word;
  // follow the parent links up to the root
  function findroot : TNode;
  // negation of =isleaf=
  function isinner : boolean;
end;

2.4 type TTree

// Container class: owns the root node and forwards operations to it.
TTree = class
 public
  // branchfactor is handed to TNode.CreateLeaf as the size of the first node
  constructor Create( branchfactor : integer = 64 );
  <<IBPlusMethods>>
  // enables =for x in tree do ...=
  function GetEnumerator: TIter;
  // debugging dump, delegated to the root node
  function tostring: string; override;
 private
  root : TNode;  // current root; re-resolved after every put
end;

2.5 type TIter

// Enumerator over the values of a TTree; it advances slot by slot and
// follows the =next= links from node to node.
TIter = class
private
  tree : TTree; // only useful to support Reset
  node : TNode; // node currently being visited; nil once exhausted
  step : integer;  // index/offset within the node
public
  // bind the iterator to =aTree= and position it before the first value
  constructor Create( aTree : TTree );
  // value stored in the current slot
  function GetCurrent : TVal;
  // advance one slot; false when there is nothing left
  function MoveNext : Boolean;
  // go back to the position before the first value
  procedure Reset;
  property Current:TVal read GetCurrent;
end;

3 Implementation (roughly following wikipedia: B+ Trees)

3.1 TODO [6/8] Node class

3.1.1 DONE constructor

{ Build an empty leaf node. Both arrays get size+1 slots; every key
  slot is marked =empty= and every value slot is cleared to nil. }
constructor TNode.CreateLeaf( size : integer );
  var slot : integer;
begin
  SetLength( keys, size + 1 );
  SetLength( vals, size + 1 ); // one slot more than =size=
  for slot := low( keys ) to high( keys ) do
  begin
    vals[slot] := nil;
    keys[slot] := empty;
  end;
  _parent := nil;
  _next   := nil;
  isleaf  := true;
end;

{ Build an empty inner node: set it up exactly like a leaf, then clear
  the leaf flag. }
constructor TNode.CreateInner( size : integer );
begin
  CreateLeaf( size );
  isleaf := false;
end;

3.1.2 DONE [1/1] location

  1. DONE TNode.locate
    { Find the slot where =key= belongs in this node: the index of the
      first key that is greater than =key=, or 0 when =key= is smaller
      than the first key. Both =locate= and =inskey= rely on this one
      routine, so they always agree on where a key lives; if they
      disagreed, the linked list at the bottom of the tree would break.

      The scan runs left to right (lowest key to highest) because in
      inner nodes each key is the first key of the child it points to.

      Example:

          tree: [ 5:[ 5 _ _ _ ] 8:[ 8 12 _ _ ] 50:[ 50 _ _ _ ] _ ]
          goal: find slot for 24

      Here 24 is both "after 8" and "before 50". Both bounds have to be
      checked, and the /leftmost/ side of the range is chosen. (Had the
      child keys been the highest value of each child instead of the
      lowest, the rightmost child would be the one to pick.) }

    function TNode.FindSlot( key : TKey ) : word;
    begin
      result := 0;
      if key < keys[ 0 ] then exit;
      // key is at least keys[0]: start at slot 1 and stop on the last
      // slot or on the first key that is greater than =key=
      result := 1;
      while (result <> high(keys)) and (key >= keys[ result ]) do
        inc(result)
    end;
    
    { Walk down from this node to the leaf in which =key= belongs.
      The search always produces a leaf: it reports where the key
      /should/ be, whether or not it is /actually/ stored there. }
    function TNode.locate( key : TKey ) : TNode;
    var idx : integer;
    begin
      result := self;
      while not result.isleaf do
      begin
        idx := result.findslot( key );
        // findslot answers "insert before this slot"; the child that
        // covers the key sits one slot to the left (slot 0 stays 0)
        if idx > 0 then dec( idx );
        result := result.vals[idx] as TNode;
      end
    end;
    

3.1.3 DONE [9/9] insertion

  1. DONE TNode.put
    { Store =val= under =key=. Inner nodes hand the request to the leaf
      that covers the key; a leaf opens a slot, stores the value there
      and splits itself if that filled it up. }
    procedure TNode.put( key : TKey; val : TVal );
      var slot : cardinal;
    begin
      if isinner then
        locate( key ).put( key, val )
      else
        begin
          slot := inskey( key );
          vals[slot] := val;
          rebalance;
        end
    end;
    
  2. DONE TNode.inskey
    { Insert =key= into this node's key array: find its slot, shift the
      later keys/values one position to the right, write the key and
      return the index of the slot. The matching =vals= entry is left
      nil for the caller to fill in. }
    function TNode.inskey( key : TKey ) : cardinal;
     var i, slot : integer;
    begin
      slot := findslot( key );
    
      // if first value is about to change, update the parent key:
      // (this has to happen before the shift below, while keys[0] still
      // holds the old first key that the parent knows this node by)
      if (slot = 0) and not isroot then
      begin
        i := 0;
        // NOTE(review): this search has no upper bound on i; it relies on
        // the parent holding a key equal to keys[0] - confirm the invariant
        while _parent.keys[i] <> keys[0] do inc(i);
        _parent.keys[i] := key;
      end;
    
      // shift everything from =slot= onwards one position to the right;
      // the entry in the last slot is overwritten
      for i := high(keys) downto slot+1 do
      begin
        vals[i] := vals[i-1];
        keys[i] := keys[i-1];
      end;
    
      { finally, fill the hole we just made, and return its location }
      keys[ slot ] := key;
      vals[ slot ] := nil;
      result := slot
    end;
    
  3. DONE TNode.rebalance
    { Restore the node after an insertion: a full node is split,
      anything else is left alone. }
    procedure TNode.rebalance;
    begin
      if not full then exit;
      split
    end;
    
  4. DONE TNode.full
    { A node counts as full as soon as its last key slot is in use. }
    function TNode.full : boolean;
      var last : integer;
    begin
      last := high( keys );
      result := not ( keys[last] = empty );
    end;
    
  5. DONE TNode.split
    { Split this node in two: create a sibling of the same kind, move
      the upper half of the slots into it (copyhalf) and register the
      sibling with the parent. When the node being split is the root,
      a new inner root is created first and this node becomes its
      first child. For leaves the sibling is also linked into the
      =_next= chain directly after this node. }
    procedure TNode.split;
      var newnode: TNode;
      <<copyhalf>>
    begin
      // CreateLeaf/CreateInner allocate size+1 slots, so the =size= this
      // node was built with is high(keys). Passing length(keys) instead
      // would make every new node one slot larger than the node it was
      // split from.
      if isleaf then
        begin
          newnode := TNode.CreateLeaf( high( keys ));
          newnode._next := _next;
          _next := newnode;
        end
      else newnode := TNode.CreateInner( high( keys ));
      copyhalf;
      if isroot then begin
        _parent := TNode.CreateInner( high( keys ));
        _parent.addchild( self.firstkey, self )
      end;
      // addchild may in turn split the parent
      _parent.addchild( newnode.firstkey, newnode )
    end;
    
  6. DONE copyhalf (nested in TNode.split)
    { Nested helper of TNode.split: move the upper half of this node's
      slots to the front of =newnode= and blank the slots they came
      from. When inner nodes are split, the moved children are
      re-parented to =newnode=. }
    procedure copyhalf;
      var src, dst, mid : integer;
    begin
      mid := length(keys) div 2;
      dst := 0;
      for src := mid to high(keys) do
      begin
        newnode.keys[dst] := keys[src];
        newnode.vals[dst] := vals[src];
        if newnode.isinner and assigned( vals[src] ) then
          (vals[src] as TNode)._parent := newnode;
        keys[src] := empty;
        vals[src] := nil;
        inc( dst );
      end;
    end;
    
  7. DONE TNode.isroot
    { The root is the one node without a parent. }
    function TNode.isroot : boolean;
    begin
      result := ( _parent = nil );
    end;
    
  8. DONE TNode.isinner
    { Every node that is not a leaf is an inner node. }
    function TNode.isinner : boolean;
    begin
      if isleaf
        then result := false
        else result := true
    end;
    
  9. DONE firstkey
    { The key held in the first slot of this node. }
    function TNode.firstkey : TKey;
    begin
      result := keys[ low( keys ) ]
    end;
    
  10. DONE addchild
    { Register =kid= under =key= in this inner node, make this node the
      kid's parent, and split if the insertion filled the node. }
    procedure TNode.addchild( key : TKey; kid : TNode );
      var slot : cardinal;
    begin
      assert( not isleaf );
      slot := inskey( key );
      vals[slot] := kid;
      kid._parent := self;
      rebalance;
    end;
    

3.1.4 DONE iteration

{ The node that follows this one in the leaf chain (nil at the end). }
function TNode.next : TNode;
begin
  result := self._next;
end;

3.1.5 DONE finding the new root of the tree

{ Climb the parent links until a node without a parent is reached. }
function TNode.findroot : TNode;
begin
  result := self;
  while assigned( result._parent ) do
    result := result._parent
end;

3.1.6 DONE debugging with .tostring

{ Debug dump of the whole tree: simply the dump of its root node. }
function TTree.tostring: string;
begin
  result := root.ToString;
end;

{ Current indentation of the dump; grows by two spaces per level. }
var gIndent : string = '';

{ Debug dump of a node. The first line lists the key slots ([ ] for a
  leaf, curly braces for an inner node; ' < ' marks an empty first slot
  and ' - ' any other empty slot). Leaves also show the first key of
  the next leaf. Every assigned value follows on its own indented line. }
function TNode.ToString : string;
  var buf : string; slot : integer;
begin

  // opening bracket
  if isleaf then buf := '[' else buf := '{';

  // one cell per key slot
  for slot := low(keys) to high(keys) do
    if keys[slot] <> empty then
      buf := buf + ' ' + IntToStr(keys[slot]) + ' '
    else if slot = 0 then
      buf := buf + ' < '
    else
      buf := buf + ' - ';

  // link to the next leaf, then the closing bracket
  if isleaf then
    begin
      if assigned(_next) then
        buf := buf + ' -> ' + IntToStr(_next.firstkey);
      buf := buf + ' ]';
    end
  else buf := buf + ' }';

  // values / children, one indentation level deeper
  gIndent := gIndent + '  ';
  for slot := low(keys) to high(keys) do
    if vals[slot] <> nil then
      buf := buf + lineending + gIndent + vals[slot].ToString;
  setlength(gIndent, length(gIndent) - 2);

  result := buf;
end;

3.1.7 TODO deletion

{ TODO: deletion is not implemented yet; this is an empty stub. The
  matching =del= declaration in the IBPlus method list is still
  commented out. }
procedure TNode.del( key : TKey );
begin
end;

3.1.8 TODO bulk-loading

{ TODO: bulk-loading is not implemented yet; this is an empty stub.
  NOTE(review): neither this constructor nor the TKeyValPair type is
  declared anywhere in the visible source - confirm before enabling. }
constructor TNode.bulk( pairs : TKeyValPair );
begin
end;

3.2 TODO [0/1] The container class (TTree)

3.2.1 TODO [0/0] constructor

{ A new tree consists of a single empty leaf, which is also its root.
  =branchfactor= is passed on as the size of that node. }
constructor TTree.Create( branchfactor : integer = 64 );
begin
  self.root := TNode.CreateLeaf( branchfactor );
end;

3.2.2 put

{ Store =val= under =key=. The insertion may split the root and give it
  a parent, so afterwards the root reference is moved up the parent
  links until it points at the top node again. }
procedure TTree.put( key : TKey; val : TVal );
begin
  root.put( key, val );
  while assigned( root._parent ) do
    root := root._parent;
end;

3.2.3 enumerator

{ Hand out a fresh iterator over this tree (used by =for ... in=). }
function TTree.GetEnumerator : TIter;
begin
  result := TIter.Create( Self );
end;

3.3 TODO [4/4] TIter : iterator for the Tree class

3.3.1 TEXT Walking a B+-Tree

The leaf nodes of a B+ tree are linked together to form a chain, which makes it easy to visit every entry in key order: walk the slots of one leaf, then follow its link to the next leaf.

3.3.2 DONE constructor

{ Bind the iterator to =aTree= and position it before the first value. }
constructor TIter.Create( aTree : TTree );
begin
  tree := aTree;
  Reset;
end;

3.3.3 DONE reset

{ Rewind the iterator. Locating -maxint leads to the first (leftmost)
  leaf; =step= starts at -1 so that the first MoveNext lands on slot 0. }
procedure TIter.Reset;
begin
  step := -1;
  node := tree.root.locate( -maxint );
end;

3.3.4 DONE getcurrent

{ Value stored in the slot the iterator currently points at. Only
  meaningful after MoveNext has returned true.
  The result type is spelled TVal, exactly as in the class declaration,
  so the two headers keep matching if the TVal alias is ever changed. }
function TIter.GetCurrent : TVal;
begin
  result := node.vals[step];
end;

3.3.5 DONE movenext

{ Advance to the next slot. When the current node is used up (index
  past the last slot, or an empty key reached) the iterator hops to the
  next node in the chain and restarts at slot 0. Returns false once
  there is no node left. }
function TIter.MoveNext : Boolean;
  var exhausted : boolean;
begin
  if not assigned(node) then
    result := false
  else begin
    inc(step);
    exhausted := (step > high(node.keys)) or (node.keys[step] = empty);
    if exhausted then
    begin
      step := 0;
      node := node.next;
    end;
    result := assigned(node);
  end
end;

3.4 OUTPUT: bp.pas

{ NOTE : this file is generated from ../ref/bplus.org , so...
  --->> DON'T EDIT THIS FILE! <<--- }
<<interface>>
implementation
  <<imp>>
end.

4 Usage: An Indexed Triple Store

4.1 declarations

{$i xpc.inc}
program bpdemo;
uses bp, strutils, sysutils;

  const kMax = 255; // 65535;  { change if you want to test speed }
  type
    // One (subject, relation, object) triple; each part is an integer id.
    TTriple = class
      sub, rel, obj : integer;
      // store the three ids
      constructor Create( SubId, RelId, ObjId : integer );
      // write the ToString form to standard output
      procedure Print;
      // '(sub, rel, obj)' with every id left-padded to width 4
      function tostring: string; override;
      // function reversed : IEnumerator;
    end;

  { Build a triple from its subject, relation and object ids. }
  constructor TTriple.Create( SubId, RelId, ObjId : integer );
  begin
    self.sub := SubId;
    self.rel := RelId;
    self.obj := ObjId;
  end;


  var building : boolean = true;
  { Render the triple as '(sub, rel, obj)', each id left-padded with
    spaces to a width of 4. }
  function TTriple.ToString : string;

    { one id, padded to four characters }
    function cell( id : integer ) : string;
    begin
      result := PadLeft( IntToStr( id ), 4 );
    end;

  begin
    result := '(' + cell( sub ) + ', ' + cell( rel ) + ', ' + cell( obj ) + ')';
  end;

  { Write the triple's ToString form on a line of its own. }
  procedure TTriple.print;
  begin
    writeln( ToString )
  end;

  <<main>>
end.

4.2

4.2.1 populate

  var
    subs, rels, objs : bp.TTree;
    trip             : TObject;
    i, j, k, tmp     : cardinal;
    nums             : array [0..2, 0..kMax] of word;
begin

randomize;

{ create three indices for a triplestore }
subs := bp.TTree.create(16); // just to make the trace interesting
rels := bp.TTree.create;
objs := bp.TTree.create;

{ generate the numbers 0..kMax in three columns }
for j := 0 to 2 do for i := 0 to kMax do nums[j][i] := i;

{ shuffle the columns independently, using a Fisher-Yates shuffle:
  walking down from the last position, swap each element with a
  randomly chosen element at or below it. Swapping only random
  /adjacent/ pairs a few times per element would leave every column
  almost sorted, so the three indices would receive nearly identical,
  nearly ascending input. }
for j := 0 to 2 do for i := kMax downto 1 do
begin
  k := random( longint(i) + 1 );  // 0 <= k <= i
  tmp := nums[j][k];
  nums[j][k] := nums[j][i];
  nums[j][i] := tmp;
end;

{ initial index: }
writeln('initial index:');
writeln(subs.tostring);

{ generate and index the random triples }
for i := 0 to kMax do begin
  trip := TTriple.create(nums[0][i], nums[1][i], nums[2][i]);
  with TTriple(trip) do begin
    { for debugging, show one of the indices being built step by step }
    writeln;
    writeln('adding key:', sub:2 ); //, '-> ', rel:2, ', ',  obj:2 );

    { the same triple object is indexed once per column }
    subs.put( sub, trip );
    rels.put( rel, trip );
    objs.put( obj, trip );

    writeln(subs.tostring);
  end;
end;

4.2.2 print forward

 { the indices are complete from here on.
   NOTE(review): =building= is never read in the visible source }
 building := false;
{ print them in order by each index }
// each for..in loop walks one tree through its enumerator, i.e. in
// ascending key order of that index
writeln('--subs--');
for trip in subs do TTriple(trip).print;
writeln('--rels--');
for trip in rels do TTriple(trip).print;
writeln('--objs--');
for trip in objs do TTriple(trip).print;

4.2.3 TODO print backward

{ and reversed }
{ TODO: TTree does not offer a =reversed= enumerator yet, so this chunk
  cannot be compiled until one is added. }
writeln('--subs desc--');
for trip in subs.reversed do TTriple(trip).print;
writeln('--objs desc--');
for trip in objs.reversed do TTriple(trip).print;  // cast to TTriple: no type named Triple is declared
writeln('--rels desc--');
for trip in rels.reversed do TTriple(trip).print;

5 end