view lshlib.cpp @ 340:a6edbe97fddf

Added LSH_CORE_ARRAY structure for hashtables instead of linked lists. Maintained Backwards Compatibiliity with indexes build for linked list format. Added tests for indexing and merging. Tested backwards compatibility OK.\n\n The purpose of the LSH_CORE_ARRAY data structure is greater space efficiency and L1/2 cache usage. Essential for multiple indexes with multiple hashtables in RAM
author mas01mc
date Wed, 10 Sep 2008 18:55:16 +0000
parents fe4d5b763086
children fdeb8db97678
line wrap: on
line source
#include "lshlib.h"

//#define LSH_DUMP_CORE_TABLES
//#define USE_U_FUNCTIONS

// Use backward-compatible CORE ARRAY lsh index
#define LSH_CORE_ARRAY  // Set to use arrays for hashtables rather than linked-lists
#define LSH_LIST_HEAD_COUNTERS // Enable counters in hashtable list heads

// Critical path logic
#if defined LSH_CORE_ARRAY && !defined LSH_LIST_HEAD_COUNTERS
#define LSH_LIST_HEAD_COUNTERS
#endif

#define LSH_CORE_ARRAY_BIT (0x80000000) //  LSH_CORE_ARRAY test bit for list head


void err(char*s){cout << s << endl;exit(2);}

Uns32T get_page_logn(){
  int pagesz = (int)sysconf(_SC_PAGESIZE);
  return (Uns32T)log2((double)pagesz);  
}

unsigned align_up(unsigned x, unsigned w){ return ((x) + ((1<<w)-1) & ~((1<<w)-1)); }

void H::error(const char* a, const char* b, const char *sysFunc) {
  cerr << a << ": " << b << endl;
  if (sysFunc) {
    perror(sysFunc);
  }
  exit(1);
}

H::H(){
  // Delay initialization of lsh functions until we know the parameters
}

H::H(Uns32T kk, Uns32T mm, Uns32T dd, Uns32T NN, Uns32T CC, float ww, float rr):
#ifdef USE_U_FUNCTIONS
  use_u_functions(true),
#else
  use_u_functions(false),
#endif
  maxp(0),
  bucketCount(0),
  pointCount(0),
  N(NN),
  C(CC),
  k(kk),
  m(mm),
  L((mm*(mm-1))/2),
  d(dd),
  w(ww),
  radius(rr)
{

  if(m<2){
    m=2;
    L=1; // check value of L
    cout << "warning: setting m=2, L=1" << endl;
  }
  if(use_u_functions && k%2){
    k++; // make sure k is even
    cout << "warning: setting k even" << endl;
  }
  
  // We have the necessary parameters, so construct hashfunction datastructures
  initialize_lsh_functions();
}

void H::initialize_lsh_functions(){
  H::P = UH_PRIME_DEFAULT;
  
  /* FIXME: don't use time(); instead use /dev/random or similar */
  /* FIXME: write out the seed somewhere, so that we can get
     repeatability */
#ifdef MT19937
  init_genrand(time(NULL));
#else
  srand(time(NULL)); // seed random number generator
#endif
  Uns32T i,j, kk;
#ifdef USE_U_FUNCTIONS
  H::A = new float**[ H::m ]; // m x k x d random projectors
  H::b = new float*[ H::m ];  // m x k random biases
#else
  H::A = new float**[ H::L ]; // m x k x d random projectors
  H::b = new float*[ H::L ];  // m x k random biases
#endif
  H::g = new Uns32T*[ H::L ];   //  L x k random projections 
  assert( H::g && H::A && H::b ); // failure
#ifdef USE_U_FUNCTIONS
  // Use m \times u_i functions \in R^{(k/2) \times (d)}
  // Combine to make L=m(m-1)/2 hash functions \in R^{k \times d}
  for( j = 0; j < H::m ; j++ ){ // m functions u_i(v)
    H::A[j] = new float*[ H::k/2 ];  // k/2 x d  2-stable distribution coefficients     
    H::b[j] = new float[ H::k/2 ];   // bias
    assert( H::A[j] && H::b[j] ); // failure
    for( kk = 0; kk < H::k/2 ; kk++ ){
      H::A[j][kk] = new float[ H::d ];
      assert( H::A[j][kk] ); // failure
      for(Uns32T i = 0 ; i < H::d ; i++ )
	H::A[j][kk][i] = H::randn(); // Normal
      H::b[j][kk] = H::ranf()*H::w;        // Uniform
    }
  }
#else
  // Use m \times u_i functions \in R^{k \times (d)}
  // Combine to make L=m(m-1)/2 hash functions \in R^{k \times d}
  for( j = 0; j < H::L ; j++ ){ // m functions u_i(v)
    H::A[j] = new float*[ H::k ];  // k x d  2-stable distribution coefficients     
    H::b[j] = new float[ H::k ];   // bias
    assert( H::A[j] && H::b[j] ); // failure
    for( kk = 0; kk < H::k ; kk++ ){
      H::A[j][kk] = new float[ H::d ];
      assert( H::A[j][kk] ); // failure
      for(Uns32T i = 0 ; i < H::d ; i++ )
	H::A[j][kk][i] = H::randn(); // Normal
      H::b[j][kk] = H::ranf()*H::w;        // Uniform
    }
  }
#endif

  // Storage for LSH hash function output (Uns32T)
  for( j = 0 ; j < H::L ; j++ ){    // L functions g_j(u_a, u_b) a,b \in nchoosek(m,2)
    H::g[j] = new Uns32T[ H::k ];   // k x 32-bit hash values, gj(v)=[x0 x1 ... xk-1] xk \in Z
    assert( H::g[j] );
  }

  // LSH Hash tables
  H::h = new bucket**[ H::L ];  
  assert( H::h );
  for( j = 0 ; j < H::L ; j++ ){
    H::h[j] = new bucket*[ H::N ];
    assert( H::h[j] );
    for( i = 0 ; i < H::N ; i++)
      H::h[j][i] = 0;
  }

  // Standard hash functions
  H::r1 = new Uns32T*[ H::L ];
  H::r2 = new Uns32T*[ H::L ];
  assert( H::r1 && H::r2 ); // failure
  for( j = 0 ; j < H::L ; j++ ){
    H::r1[ j ] = new Uns32T[ H::k ];
    H::r2[ j ] = new Uns32T[ H::k ];
    assert( H::r1[j] && H::r2[j] ); // failure
    for( i = 0; i<H::k; i++){
      H::r1[j][i] = randr();
      H::r2[j][i] = randr();
    }
  }

  // Storage for whole or partial function evaluation depdenting on USE_U_FUNCTIONS
  H::initialize_partial_functions();
}

void H::initialize_partial_functions(){

#ifdef USE_U_FUNCTIONS
  H::uu = vector<vector<Uns32T> >(H::m);
  for( Uns32T aa=0 ; aa < H::m ; aa++ )
    H::uu[aa] = vector<Uns32T>( H::k/2 );
#else
  H::uu = vector<vector<Uns32T> >(H::L);
  for( Uns32T aa=0 ; aa < H::L ; aa++ )
    H::uu[aa] = vector<Uns32T>( H::k );
#endif
}


// Generate z ~ N(0,1)
float H::randn(){
// Box-Muller
  float x1, x2;
  do{
    x1 = ranf();
  } while (x1 == 0); // cannot take log of 0
  x2 = ranf();
  float z;
  z = sqrtf(-2.0 * logf(x1)) * cosf(2.0 * M_PI * x2);
  return z;
}

float H::ranf(){
#ifdef MT19937
  return (float) genrand_real2();
#else
  return (float)( (double)rand() / ((double)(RAND_MAX)+(double)(1)) );
#endif
}

// range is 1..2^29
/* FIXME: that looks like an ... odd range.  Still. */
Uns32T H::randr(){
#ifdef MT19937
  return (Uns32T)((genrand_int32() >> 3) + 1);
#else
  return (Uns32T) ((rand() >> 2) + 1); 
#endif
}

// Destruct hash tables
H::~H(){
  Uns32T i,j,kk;
  bucket** pp;
#ifdef USE_U_FUNCTIONS
  for( j = 0 ; j < H::m ; j++ ){
    for( kk = 0 ; kk < H::k/2 ; kk++ )
      delete[] A[j][kk];
    delete[] A[j];
  }
  delete[] A;    
  for( j = 0 ; j < H::m ; j++ )
    delete[] b[j];
  delete[] b;
#else
  for( j = 0 ; j < H::L ; j++ ){
    for( kk = 0 ; kk < H::k ; kk++ )
      delete[] A[j][kk];
    delete[] A[j];
  }
  delete[] A;    
  for( j = 0 ; j < H::L ; j++ )
    delete[] b[j];
  delete[] b;
#endif

  for( j = 0 ; j < H::L ; j++ )
    delete[] g[j];
  delete[] g;
  for( j=0 ; j < H::L ; j++ ){
      delete[] H::r1[ j ];
      delete[] H::r2[ j ];
      for(i = 0; i< H::N ; i++){
	pp = 0;
#ifdef LSH_CORE_ARRAY
	  if(H::h[ j ][ i ])
	    if(H::h[ j ][ i ]->t2 & LSH_CORE_ARRAY_BIT){	      
	      pp = get_pointer_to_bucket_linked_list(H::h[ j ][ i ]);
	      if(*pp){
		(*pp)->snext=0; // Head of list uses snext as a non-pointer value
		delete *pp;   // Now the destructor can do its work properly
	      }
	      free(H::h[ j ][ i ]->next);
	      H::h[ j ][ i ]->next = 0; // Zero next pointer
	      H::h[ j ][ i ]->snext = 0; // Zero head-of-list pointer as above
	    }
#endif
	delete H::h[ j ][ i ];
      }
      delete[] H::h[ j ];
  }
  delete[] H::r1;
  delete[] H::r2;
  delete[] H::h;
}


// Compute all hash functions for vector v
// #ifdef USE_U_FUNCTIONS use Combination of m \times h_i \in R^{(k/2) \times d}
// to make L \times g_j functions \in Z^k
void H::compute_hash_functions(vector<float>& v){ // v \in R^d
  float iw = 1. / H::w; // hash bucket width
  Uns32T aa, kk;
  if( v.size() != H::d ) 
    error("v.size != H::d","","compute_hash_functions"); // check input vector dimensionality
  double tmp = 0;
  float *pA, *pb;
  Uns32T *pg;
  int dd;
  vector<float>::iterator vi;
  vector<Uns32T>::iterator ui;

#ifdef USE_U_FUNCTIONS
  Uns32T bb;
  // Store m dot products to expand
  for( aa=0; aa < H::m ; aa++ ){
    ui = H::uu[aa].begin();
    for( kk = 0 ; kk < H::k/2 ; kk++ ){
      pb = *( H::b + aa ) + kk;
      pA = * ( * ( H::A + aa ) + kk );
      dd = H::d;
      tmp = 0.;
      vi = v.begin();
      while( dd-- )
	tmp += *pA++ * *vi++;  // project
      tmp += *pb;              // translate
      tmp *= iw;               // scale      
      *ui++ = (Uns32T) floor(tmp);   // floor
    }
  }
  // Binomial combinations of functions u_{a,b} \in Z^{(k/2) \times d}
  Uns32T j;
  for( aa=0, j=0 ; aa < H::m-1 ; aa++ )
    for( bb = aa + 1 ; bb < H::m ; bb++, j++ ){
      pg= *( H::g + j ); // L \times functions g_j(v) \in Z^k      
      // u_1 \in Z^{(k/2) \times d}
      ui = H::uu[aa].begin();
      kk=H::k/2;
      while( kk-- )
	*pg++ = *ui++;      // hash function g_j(v)=[x1 x2 ... x(k/2)]; xk \in Z
      // u_2 \in Z^{(k/2) \times d}
      ui = H::uu[bb].begin();
      kk=H::k/2;
      while( kk--)
	*pg++ = *ui++;      // hash function g_j(v)=[x(k/2+1) x(k/2+2) ... xk]; xk \in Z
    }
#else
  for( aa=0; aa < H::L ; aa++ ){
    ui = H::uu[aa].begin();
    for( kk = 0 ; kk < H::k ; kk++ ){
      pb = *( H::b + aa ) + kk;
      pA = * ( * ( H::A + aa ) + kk );
      dd = H::d;
      tmp = 0.;
      vi = v.begin();
      while( dd-- )
	tmp += *pA++ * *vi++;  // project
      tmp += *pb;              // translate
      tmp *= iw;               // scale      
      *ui++ = (Uns32T) (floor(tmp));   // floor
    }
  }
  // Compute hash functions
  for( aa=0 ; aa < H::L ; aa++ ){
    pg= *( H::g + aa ); // L \times functions g_j(v) \in Z^k      
    // u_1 \in Z^{k \times d}
    ui = H::uu[aa].begin();
    kk=H::k;    
    while( kk-- )
      *pg++ = *ui++;      // hash function g_j(v)=[x1 x2 ... xk]; xk \in Z
  }
#endif
}

// make hash value \in Z
void H::generate_hash_keys(Uns32T*g, Uns32T* r1, Uns32T* r2){
  H::t1 = computeProductModDefaultPrime( g, r1, H::k ) % H::N;
  H::t2 = computeProductModDefaultPrime( g, r2, H::k );
}

#define CR_ASSERT(b){if(!(b)){fprintf(stderr, "ASSERT failed on line %d, file %s.\n", __LINE__, __FILE__); exit(1);}}

// Computes (a.b) mod UH_PRIME_DEFAULT
inline Uns32T H::computeProductModDefaultPrime(Uns32T *a, Uns32T *b, IntT size){
  LongUns64T h = 0;

  for(IntT i = 0; i < size; i++){
    h = h + (LongUns64T)a[i] * (LongUns64T)b[i];
    h = (h & TWO_TO_32_MINUS_1) + 5 * (h >> 32);
    if (h >= UH_PRIME_DEFAULT) {
      h = h - UH_PRIME_DEFAULT;
    }
    CR_ASSERT(h < UH_PRIME_DEFAULT);
  }
  return h;
}

Uns32T H::bucket_insert_point(bucket **pp){
  collisionCount = 0;
#ifdef LSH_LIST_HEAD_COUNTERS
  if(!*pp){
    *pp = new bucket();
    (*pp)->t2 = 0; // Use t2 as a collision counter for the row
    (*pp)->next = new bucket();
  }
  // The list head holds point collision count
  if( (*pp)->t2 & LSH_CORE_ARRAY_BIT )
    bucket_insert_point(get_pointer_to_bucket_linked_list(*pp)); // recurse
  else{
    collisionCount = (*pp)->t2;
    if(collisionCount < H::C){ // Block if row is full
      (*pp)->t2++; // Increment collision counter (numPoints in row)
      pointCount++;
      collisionCount++;
      // Locate the bucket linked list 
      __bucket_insert_point( (*pp)->next ); 
    }
  }
#else // NOT USING LSH_LIST_HEAD_COUNTERS
  if(!*pp)
    *pp = new bucket();
  pointCount++;
  __bucket_insert_point(*pp); // No collision count storage
#endif
  return collisionCount;
}

// insert points into hashtable row collision chain
void H::__bucket_insert_point(bucket* p){
  if(p->t2 == IFLAG){ // initialization flag, is it in the domain of t2?
    p->t2 = H::t2;
    bucketCount++; // Record start of new point-locale collision chain
    p->snext = new sbucket();
    __sbucket_insert_point(p->snext);
    return;
  }

  if(p->t2 == H::t2){
    __sbucket_insert_point(p->snext);
    return;
  }

  if(p->next){
    // Construct list in t2 order
    if(H::t2 < p->next->t2){
      bucket* tmp = new bucket();
      tmp->next = p->next;
      p->next = tmp;
      __bucket_insert_point(tmp);
    }
    else
      __bucket_insert_point(p->next);
  }
  else {
    p->next = new bucket();
    __bucket_insert_point(p->next);
  }
}

// insert points into point-locale collision chain
void H::__sbucket_insert_point(sbucket* p){
  if(p->pointID==IFLAG){
    p->pointID = H::p;
    return;
  }
  
  // Search for pointID
  if(p->snext){    
    __sbucket_insert_point(p->snext);
  }
  else{
    // Make new point collision bucket at end of list
    p->snext = new sbucket();
    __sbucket_insert_point(p->snext);
  }
}

inline bucket** H::get_bucket(int j){
  return  *(h+j);
}

// Find the linked-list pointer at the end of the CORE_ARRAY
bucket** H::get_pointer_to_bucket_linked_list(bucket* rowPtr){
   Uns32T numBuckets = (Uns32T) rowPtr->snext; // Cast pointer to unsigned int
   Uns32T numPoints = rowPtr->t2 & 0x7FFFFFFF; // Value is stored in low 31 bits of t2 field
   bucket** listPtr = reinterpret_cast<bucket**> (reinterpret_cast<unsigned int*>(rowPtr->next)+numPoints+numBuckets+1);
   return listPtr;
}

// Interface to Locality Sensitive Hashing G
G::G(float ww, Uns32T kk,Uns32T mm, Uns32T dd, Uns32T NN, Uns32T CC, float rr):
  H(kk,mm,dd,NN,CC,ww,rr), // constructor to initialize data structures
  indexName(0),
  lshHeader(0),
  calling_instance(0),
  add_point_callback(0)
{

}

// Serialize from file LSH constructor
// Read parameters from database file
// Load the hash functions, close the database
// Optionally load the LSH tables into head-allocated lists in core 
G::G(char* filename, bool lshInCoreFlag):
  H(), // default base-class constructor call delays data-structure initialization 
  indexName(0),
  lshHeader(0),
  calling_instance(0),
  add_point_callback(0)
{
  int dbfid = unserialize_lsh_header(filename);
  indexName = new char[O2_INDEX_MAXSTR];
  strncpy(indexName, filename, O2_INDEX_MAXSTR); // COPY THE CONTENTS TO THE NEW POINTER
  H::initialize_lsh_functions(); // Base-class data-structure initialization
  unserialize_lsh_functions(dbfid); // populate with on-disk hashfunction values

  // Format1 only needs unserializing if specifically requested
  if(!(lshHeader->flags&O2_SERIAL_FILEFORMAT2) && lshInCoreFlag){
    unserialize_lsh_hashtables_format1(dbfid);
  }

  // Format2 always needs unserializing
  if(lshHeader->flags&O2_SERIAL_FILEFORMAT2 && lshInCoreFlag){
    FILE* dbFile = fdopen(dbfid, "rb");
    if(!dbFile)
      error("Cannot open LSH file for reading", filename);
    unserialize_lsh_hashtables_format2(dbFile);
  }
  serial_close(dbfid);
}

G::~G(){
  delete lshHeader;
}

// single point insertion; inserted values are hash value and pointID
Uns32T G::insert_point(vector<float>& v, Uns32T pp){
  Uns32T collisionCount = 0;
  H::p = pp;
  if(H::maxp && pp<=H::maxp)
    error("points must be indexed in strict ascending order", "LSH::insert_point(vector<float>&, Uns32T pointID)");
  H::maxp=pp; // Store highest pointID in database
  H::compute_hash_functions( v );
  for(Uns32T j = 0 ; j < H::L ; j++ ){ // insertion
    H::generate_hash_keys( *( H::g + j ), *( H::r1 + j ), *( H::r2 + j ) ); 
    collisionCount += bucket_insert_point( *(h + j) + t1 );
  }
  return collisionCount;
}


// batch insert for a point set
// inserted values are vector hash value and pointID starting at basePointID
void G::insert_point_set(vector<vector<float> >& vv, Uns32T basePointID){
  for(Uns32T point=0; point<vv.size(); point++)
    insert_point(vv[point], basePointID+point);
}

// point retrieval routine
void G::retrieve_point(vector<float>& v, Uns32T qpos, ReporterCallbackPtr add_point, void* caller){
  calling_instance = caller;
  add_point_callback = add_point;
  H::compute_hash_functions( v );
  for(Uns32T j = 0 ; j < H::L ; j++ ){
    H::generate_hash_keys( *( H::g + j ), *( H::r1 + j ), *( H::r2 + j ) ); 
    if( bucket* bPtr = *(get_bucket(j) + get_t1()) )
#ifdef LSH_LIST_HEAD_COUNTERS
      if(bPtr->t2&LSH_CORE_ARRAY_BIT)
	retrieve_from_core_hashtable_array((Uns32T*)(bPtr->next), qpos);
      else
	bucket_chain_point( bPtr->next, qpos);
#else
    bucket_chain_point( bPtr , qpos);
#endif
  }
}

void G::retrieve_point_set(vector<vector<float> >& vv, ReporterCallbackPtr add_point, void* caller){
  for(Uns32T qpos = 0 ; qpos < vv.size() ; qpos++ )
    retrieve_point(vv[qpos], qpos, add_point, caller);
}

// export lsh tables to table structure on disk
// 
// LSH TABLE STRUCTURE
// ---header 64 bytes ---
// [magic #tables #rows #cols elementSize databaseSize version flags dim #funs 0 0 0 0 0 0]
//
// ---random projections L x k x d float ---
// A[0][0][0] A[0][0][1] ... A[0][0][d-1]
// A[0][1][0] A[0][1][1] ... A[1][1][d-1]
// ...
// A[0][K-1][0] A[0][1][1] ... A[0][k-1][d-1]
// ...
// ...
// A[L-1][0][0] A[M-1][0][1] ... A[L-1][0][d-1]
// A[L-1][1][0] A[M-1][1][1] ... A[L-1][1][d-1]
// ...
// A[L-1][k-1][0] A[M-1][1][1] ... A[L-1][k-1][d-1]
//
// ---bias L x k float ---
// b[0][0] b[0][1] ... b[0][k-1]
// b[1][0] b[1][1] ... b[1][k-1]
// ...
// b[L-1][0] b[L-1][1] ... b[L-1][k-1]
//
// ---random r1 L x k float ---
// r1[0][0] r1[0][1] ... r1[0][k-1]
// r1[1][0] r1[1][1] ... r1[1][k-1]
// ...
// r1[L-1][0] r1[L-1][1] ... r1[L-1][k-1]
//
// ---random r2 L x k float ---
// r2[0][0] r2[0][1] ... r2[0][k-1]
// r2[1][0] r2[1][1] ... r2[1][k-1]
// ...
// r2[L-1][0] r2[L-1][1] ... r2[L-1][k-1]
//
// ******* HASHTABLES FORMAT1 (optimized for LSH_ON_DISK retrieval) *******
// ---hash table 0: N x C x 8 ---
// [t2 pointID][t2 pointID]...[t2 pointID]
// [t2 pointID][t2 pointID]...[t2 pointID]
// ...
// [t2 pointID][t2 pointID]...[t2 pointID]
//
// ---hash table 1: N x C x 8 ---
// [t2 pointID][t2 pointID]...[t2 pointID]
// [t2 pointID][t2 pointID]...[t2 pointID]
// ...
// [t2 pointID][t2 pointID]...[t2 pointID]
// 
// ...
// 
// ---hash table L-1: N x C x 8 ---
// [t2 pointID][t2 pointID]...[t2 pointID]
// [t2 pointID][t2 pointID]...[t2 pointID]
// ...
// [t2 pointID][t2 pointID]...[t2 pointID]
//
// ******* HASHTABLES FORMAT2 (optimized for LSH_IN_CORE retrieval) *******
//
// State machine controlled by regular expression.
// legend:
//
// O2_SERIAL_TOKEN_T1 = 0xFFFFFFFCU
// O2_SERIAL_TOKEN_T2 = 0xFFFFFFFDU
// O2_SERIAL_TOKEN_ENDTABLE = 0xFFFFFFFEU
//
// T1 - T1 hash token 
// t1 - t1 hash key (t1 range 0..2^29-1) 
// T2 - T2 token
// t2 - t2 hash key (range 1..2^32-6)
// p  - point identifier (range 0..2^32-1)
// E  - end hash table token
// {...} required arguments
// [...] optional arguments
// *  - match zero or more occurences
// +  - match one or more occurences
// {...}^L - repeat argument L times
//
// FORMAT2 Regular expression:
// { [T1 t1 [T2 t2 p+]+ ]* E }^L
//

// Serial header constructors
SerialHeader::SerialHeader(){;}
SerialHeader::SerialHeader(float W, Uns32T L, Uns32T N, Uns32T C, Uns32T k, Uns32T d, float r, Uns32T p, Uns32T FMT, Uns32T pc):
  lshMagic(O2_SERIAL_MAGIC),
  binWidth(W),
  numTables(L),
  numRows(N),
  numCols(C),
  elementSize(O2_SERIAL_ELEMENT_SIZE),
  version(O2_SERIAL_VERSION),  
  size(0), // we are deprecating this value
  flags(FMT),
  dataDim(d),
  numFuns(k),
  radius(r),
  maxp(p),
  size_long((unsigned long long)L * align_up((unsigned long long)N * C * O2_SERIAL_ELEMENT_SIZE, get_page_logn())   // hash tables
	    + align_up(O2_SERIAL_HEADER_SIZE + // header + hash functions
		  (unsigned long long)L*k*( sizeof(float)*d+2*sizeof(Uns32T)+sizeof(float)),get_page_logn())),
  pointCount(pc){
  
  if(FMT==O2_SERIAL_FILEFORMAT2)
    size_long = (unsigned long long)align_up(O2_SERIAL_HEADER_SIZE 
	     + (unsigned long long)L*k*(sizeof(float)*d+2+sizeof(Uns32T)
	      +sizeof(float)) + (unsigned long long)pc*16UL,get_page_logn());
} // header

float* G::get_serial_hashfunction_base(char* db){
  if(db&&lshHeader)
    return (float*)(db+O2_SERIAL_HEADER_SIZE); 
  else return NULL;
}

SerialElementT* G::get_serial_hashtable_base(char* db){
  if(db&&lshHeader)
    return (SerialElementT*)(db+get_serial_hashtable_offset());
  else 
    return NULL;
}
  
Uns32T G::get_serial_hashtable_offset(){
  if(lshHeader)
    return align_up(O2_SERIAL_HEADER_SIZE + 
		    L*lshHeader->numFuns*( sizeof(float)*lshHeader->dataDim+2*sizeof(Uns32T)+sizeof(float)),get_page_logn());
  else
    return 0;
}

void G::serialize(char* filename, Uns32T serialFormat){
  int dbfid;
  char* db;
  int dbIsNew=0;

  // Check requested serialFormat
  if(!(serialFormat==O2_SERIAL_FILEFORMAT1 || serialFormat==O2_SERIAL_FILEFORMAT2))
    error("Unrecognized serial file format request: ", "serialize()");
 
  // Test to see if file exists
  if((dbfid = open (filename, O_RDONLY)) < 0)
    // If it doesn't, then create the file (CREATE)
    if(errno == ENOENT){
      // Create the file
      std::cout << "Creating new serialized LSH database:" << filename << "...";
      std::cout.flush();
      serial_create(filename, serialFormat);
      dbIsNew=1;
    }
    else
      // The file can't be opened
      error("Can't open the file", filename, "open");

  // Load the on-disk header into core
  dbfid = serial_open(filename, 1); // open for write
  db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);// get database pointer
  serial_get_header(db);           // read header
  serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap

  // Check compatibility of core and disk data structures
  if( !serial_can_merge(serialFormat) )
    error("Incompatible core and serial LSH, data structure dimensions mismatch.");
  
  // For new LSH databases write the hashfunctions
  if(dbIsNew)
    serialize_lsh_hashfunctions(dbfid);
  // Write the hashtables in the requested format
  if(serialFormat == O2_SERIAL_FILEFORMAT1)
    serialize_lsh_hashtables_format1(dbfid, !dbIsNew);
  else{
    FILE* dbFile = fdopen(dbfid, "r+b");
    if(!dbFile)
      error("Cannot open LSH file for writing",filename);
    serialize_lsh_hashtables_format2(dbFile, !dbIsNew);
    fflush(dbFile);
  }

  if(!dbIsNew) { 
    db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);// get database pointer
    //serial_get_header(db);           // read header
    cout << "maxp = " << H::maxp << endl;
    lshHeader->maxp=H::maxp;
    // Default to FILEFORMAT1
    if(!(lshHeader->flags&O2_SERIAL_FILEFORMAT2))
      lshHeader->flags|=O2_SERIAL_FILEFORMAT1;
    memcpy((char*)db, (char*)lshHeader, sizeof(SerialHeaderT));
    serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap
  }  
    serial_close(dbfid);
}

// Test to see if core structure and requested format is
// compatible with currently opened database
int G::serial_can_merge(Uns32T format){
  SerialHeaderT* that = lshHeader;
  if( (format==O2_SERIAL_FILEFORMAT2 && !that->flags&O2_SERIAL_FILEFORMAT2) 
      || (format!=O2_SERIAL_FILEFORMAT2 && that->flags&O2_SERIAL_FILEFORMAT2)
      || !( this->w == that->binWidth &&
	    this->L == that->numTables &&
	    this->N == that->numRows &&
	    this->k == that->numFuns &&
	    this->d == that->dataDim &&
	    sizeof(SerialElementT) == that->elementSize &&
	    this->radius == that->radius)){
    serial_print_header(format);
    return 0;
  }
  else
    return 1;
}

// Used as an error message for serial_can_merge()
void G::serial_print_header(Uns32T format){
  std::cout << "Fc:" << format << " Fs:" << lshHeader->flags << endl;
  std::cout << "Wc:" << w << " Ls:" << lshHeader->binWidth << endl;
  std::cout << "Lc:" << L << " Ls:" << lshHeader->numTables << endl;
  std::cout << "Nc:" << N << " Ns:" << lshHeader->numRows << endl;
  std::cout << "kc:" << k << " ks:" << lshHeader->numFuns << endl;
  std::cout << "dc:" << d << " ds:" << lshHeader->dataDim << endl;
  std::cout << "sc:" << sizeof(SerialElementT) << " ss:" << lshHeader->elementSize << endl;
  std::cout << "rc:" << this->radius << " rs:" << lshHeader->radius << endl;
}

int G::serialize_lsh_hashfunctions(int fid){
  float* pf;
  Uns32T *pu;
  Uns32T x,y,z;
  
  char* db = serial_mmap(fid, get_serial_hashtable_offset(), 1);// get database pointer  
  pf = get_serial_hashfunction_base(db);

  // HASH FUNCTIONS
  // Write the random projectors A[][][]
#ifdef USE_U_FUNCTIONS
  for( x = 0 ; x < H::m ; x++ )
    for( y = 0 ; y < H::k/2 ; y++ )
#else
      for( x = 0 ; x < H::L ; x++ )
	for( y = 0 ; y < H::k ; y++ )
#endif
	  for( z = 0 ; z < d ; z++ )
	    *pf++ = H::A[x][y][z];
  
  // Write the random biases b[][]
#ifdef USE_U_FUNCTIONS
  for( x = 0 ; x < H::m ; x++ )
    for( y = 0 ; y < H::k/2 ; y++ )
#else
      for( x = 0 ; x < H::L ; x++ )
	for( y = 0 ; y < H::k ; y++ )
#endif
	  *pf++ = H::b[x][y];
  
  pu = (Uns32T*)pf;
  
  // Write the Z projectors r1[][]
  for( x = 0 ; x < H::L ; x++)
    for( y = 0 ; y < H::k ; y++)
      *pu++ = H::r1[x][y];
  
  // Write the Z projectors r2[][]
  for( x = 0 ; x < H::L ; x++)
    for( y = 0; y < H::k ; y++)
      *pu++ = H::r2[x][y];  
  
  serial_munmap(db, get_serial_hashtable_offset());
  return 1;
}

int G::serialize_lsh_hashtables_format1(int fid, int merge){
  SerialElementT *pe, *pt;
  Uns32T x,y;

  if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT1) )
    error("Cannot merge core and serial LSH, data structure dimensions mismatch.");

  Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;  
  Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount;
  // Write the hash tables
  for( x = 0 ; x < H::L ; x++ ){
    std::cout << (merge ? "merging":"writing") << " hash table " << x << " FORMAT1...";
    std::cout.flush();
    // memory map a single hash table for sequential access
    // Align each hash table to page boundary
    char* dbtable = serial_mmap(fid, hashTableSize, 1, 
				align_up(get_serial_hashtable_offset()+x*hashTableSize, get_page_logn()));
#ifdef __CYGWIN__
    // No madvise in CYGWIN
#else
    if(madvise(dbtable, hashTableSize, MADV_SEQUENTIAL)<0)
      error("could not advise hashtable memory","","madvise");
#endif
    maxColCount=0;
    minColCount=O2_SERIAL_MAX_COLS;
    meanColCount=0;
    colCountN=0;
    pt=(SerialElementT*)dbtable;
    for( y = 0 ;  y < H::N ; y++ ){
      // Move disk pointer to beginning of row
      pe=pt+y*lshHeader->numCols;
      
      colCount=0;
      if(bucket* bPtr = h[x][y])
	if(merge)
#ifdef LSH_LIST_HEAD_COUNTERS
	  serial_merge_hashtable_row_format1(pe, bPtr->next, colCount); // skip collision counter bucket
	else
	  serial_write_hashtable_row_format1(pe, bPtr->next, colCount); // skip collision counter bucket
#else
      serial_merge_hashtable_row_format1(pe, bPtr, colCount);
      else
	serial_write_hashtable_row_format1(pe, bPtr, colCount);
#endif
      if(colCount){
	if(colCount<minColCount)
	  minColCount=colCount;
	if(colCount>maxColCount)
	  maxColCount=colCount;
	meanColCount+=colCount;
	colCountN++;
      }
    }
    if(colCountN)
      std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN 
		<< ", min = " << minColCount << ", max = " << maxColCount 
		<< endl;
    serial_munmap(dbtable, hashTableSize);
  }
  
  // We're done writing
  return 1;
}

void G::serial_merge_hashtable_row_format1(SerialElementT* pr, bucket* b, Uns32T& colCount){
  while(b && b->t2!=IFLAG){
    SerialElementT*pe=pr; // reset disk pointer to beginning of row
    serial_merge_element_format1(pe, b->snext, b->t2, colCount);  
    b=b->next;
  }
}

void G::serial_merge_element_format1(SerialElementT* pe, sbucket* sb, Uns32T t2, Uns32T& colCount){
  while(sb){        
    if(colCount==lshHeader->numCols){
      std::cout << "!point-chain full " << endl;
      return;
    }
    Uns32T c=0;
    // Merge collision chains 
    while(c<lshHeader->numCols){
      if( (pe+c)->hashValue==IFLAG){
	(pe+c)->hashValue=t2;
	(pe+c)->pointID=sb->pointID;
	colCount=c+1;
	if(c+1<lshHeader->numCols)
	  (pe+c+1)->hashValue=IFLAG;
	break;
      }
      c++;
    }
    sb=sb->snext;    
  }
  return;
}

void G::serial_write_hashtable_row_format1(SerialElementT*& pe, bucket* b, Uns32T& colCount){
  pe->hashValue=IFLAG;
  while(b && b->t2!=IFLAG){
    serial_write_element_format1(pe, b->snext, b->t2, colCount);
    b=b->next;
  }
}

void G::serial_write_element_format1(SerialElementT*& pe, sbucket* sb, Uns32T t2, Uns32T& colCount){
  while(sb){        
    if(colCount==lshHeader->numCols){
      std::cout << "!point-chain full "	<< endl;
      return;
    }
    pe->hashValue=t2;
    pe->pointID=sb->pointID;
    pe++;
    colCount++;
    sb=sb->snext;  
  }
  pe->hashValue=IFLAG;
  return;
}

int G::serialize_lsh_hashtables_format2(FILE* dbFile, int merge){
  Uns32T x,y;

  if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT2) )
    error("Cannot merge core and serial LSH, data structure dimensions mismatch.");

  // We must pereform FORMAT2 merges in core FORMAT1 (dynamic list structure)
  if(merge)
    unserialize_lsh_hashtables_format2(dbFile, merge);

  Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount, t1;
  if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){
    fclose(dbFile);
    error("fSeek error in serialize_lsh_hashtables_format2");
  }

  // Write the hash tables
  for( x = 0 ; x < H::L ; x++ ){
    std::cout << (merge ? "merging":"writing") << " hash table " << x << " FORMAT2...";
    std::cout.flush();
    maxColCount=0;
    minColCount=O2_SERIAL_MAX_COLS;
    meanColCount=0;
    colCountN=0;
    for( y = 0 ;  y < H::N ; y++ ){
      colCount=0;
      if(bucket* bPtr = h[x][y]){
	// Check for empty row (even though row was allocated)
#ifdef LSH_LIST_HEAD_COUNTERS
	if(bPtr->next->t2==IFLAG){
	  fclose(dbFile);
	  error("b->next->t2==IFLAG","serialize_lsh_hashtables_format2()");
	}
#else
	if(bPtr->t2==IFLAG){
	  fclose(dbFile);
	  error("b->t2==IFLAG","serialize_lsh_hashtables_format2()");
	}
#endif
	t1 = O2_SERIAL_TOKEN_T1;
	WRITE_UNS32(&t1, "[T1]");
	t1 = y;
	WRITE_UNS32(&t1, "[t1]");
#ifdef LSH_CORE_ARRAY
	t1 = count_buckets_and_points_hashtable_row(bPtr);
	WRITE_UNS32(&t1,"[count]"); // write numElements
#endif
#ifdef LSH_LIST_HEAD_COUNTERS
	serial_write_hashtable_row_format2(dbFile, bPtr->next, colCount); // skip collision counter bucket
#else
	serial_write_hashtable_row_format2(dbFile, bPtr, colCount);
#endif
      }
      if(colCount){
	if(colCount<minColCount)
	  minColCount=colCount;
	if(colCount>maxColCount)
	  maxColCount=colCount;
	meanColCount+=colCount;
	colCountN++;
      }
    }
    // Write END of table marker
    t1 = O2_SERIAL_TOKEN_ENDTABLE;
    WRITE_UNS32(&t1,"[end]");
    if(colCountN)
      std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN 
		<< ", min = " << minColCount << ", max = " << maxColCount 
		<< endl;
  }  
  // We're done writing
  return 1;
}

Uns32T G::count_buckets_and_points_hashtable_row(bucket* bPtr){
  Uns32T total_count = 0;
  bucket* p = 0;

  // count points
#ifdef LSH_LIST_HEAD_COUNTERS
  total_count = bPtr->t2; // points already counted    
  p = bPtr->next; 
#else
  total_count = count_points_hashtable_row(bPtr);
  p = bPtr; 
#endif

  // count buckets
  do{
    total_count++;    
  }while((p=p->next));
  
  return total_count;
}

Uns32T G::count_points_hashtable_row(bucket* bPtr){
  Uns32T point_count = 0;
  bucket* p = bPtr;
  sbucket* s = 0;
  while(p){
    s = p->snext;
    while(s){
      point_count++;
      s=s->snext;
    }
    p=p->next;
  }
  return point_count;
}

void G::serial_write_hashtable_row_format2(FILE* dbFile, bucket* b, Uns32T& colCount){
  while(b && b->t2!=IFLAG){
    if(!b->snext){
      fclose(dbFile);
      error("Empty collision chain in serial_write_hashtable_row_format2()");
    }
    t2 = O2_SERIAL_TOKEN_T2;
    if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){
      fclose(dbFile);
      error("write error in serial_write_hashtable_row_format2()");
    }
    t2 = b->t2;
    if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){
      fclose(dbFile);
      error("write error in serial_write_hashtable_row_format2()");
    }
    serial_write_element_format2(dbFile, b->snext, colCount);
    b=b->next;
  }
}

void G::serial_write_element_format2(FILE* dbFile, sbucket* sb, Uns32T& colCount){
  while(sb){
    if(fwrite(&sb->pointID, sizeof(Uns32T), 1, dbFile) != 1){
      fclose(dbFile);
      error("Write error in serial_write_element_format2()");
    }
    colCount++;
    sb=sb->snext;  
  }
}


int G::serial_create(char* filename, Uns32T FMT){
  return serial_create(filename, w, L, N, C, k, d, FMT);
}


int G::serial_create(char* filename, float binWidth, Uns32T numTables, Uns32T numRows, Uns32T numCols, 
		     Uns32T numFuns, Uns32T dim, Uns32T FMT){

  if(numTables > O2_SERIAL_MAX_TABLES || numRows > O2_SERIAL_MAX_ROWS 
     || numCols > O2_SERIAL_MAX_COLS || numFuns > O2_SERIAL_MAX_FUNS
     || dim>O2_SERIAL_MAX_DIM){
    error("LSH parameters out of bounds for serialization");
  }

  int dbfid;
  if ((dbfid = open (filename, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)) < 0)    
    error("Can't create serial file", filename, "open");
  get_lock(dbfid, 1);
  
  // Make header first to get size of serialized database
  lshHeader = new SerialHeaderT(binWidth, numTables, numRows, numCols, numFuns, dim, radius, maxp, FMT, pointCount);  

  cout << "file size: <=" << lshHeader->get_size()/1024UL << "KB" << endl;
  if(lshHeader->get_size()>O2_SERIAL_MAXFILESIZE)
    error("Maximum size of LSH file exceded: > 4000MB");

  // go to the location corresponding to the last byte
  if (lseek (dbfid, lshHeader->get_size() - 1, SEEK_SET) == -1)
    error("lseek error in db file", "", "lseek");
  
  // write a dummy byte at the last location
  if (write (dbfid, "", 1) != 1)
    error("write error", "", "write");
  
  char* db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);
  
  memcpy (db, lshHeader, O2_SERIAL_HEADER_SIZE);
  
  serial_munmap(db, O2_SERIAL_HEADER_SIZE);
  
  close(dbfid);

  std::cout << "done initializing tables." << endl;

  return 1;  
}

char* G::serial_mmap(int dbfid, Uns32T memSize, Uns32T forWrite, off_t offset){
  char* db;
  if(forWrite){
    if ((db = (char*) mmap(0, memSize, PROT_READ | PROT_WRITE,
			   MAP_SHARED, dbfid, offset)) == (caddr_t) -1)
      error("mmap error in request for writable serialized database", "", "mmap");  
  }
  else if ((db = (char*) mmap(0, memSize, PROT_READ, MAP_SHARED, dbfid, offset)) == (caddr_t) -1)
    error("mmap error in read-only serialized database", "", "mmap");  
      
  return db;
}

SerialHeaderT* G::serial_get_header(char* db){
  lshHeader = new SerialHeaderT();
  memcpy((char*)lshHeader, db, sizeof(SerialHeaderT));  

  if(lshHeader->lshMagic!=O2_SERIAL_MAGIC)
    error("Not an LSH database file");

  return lshHeader;
}

void G::serial_munmap(char* db, Uns32T N){
  munmap(db, N);
}

int G::serial_open(char* filename, int writeFlag){
  int dbfid;
  if(writeFlag){
    if ((dbfid = open (filename, O_RDWR)) < 0)    
      error("Can't open serial file for read/write", filename, "open");
    get_lock(dbfid, writeFlag);
  }
  else{
    if ((dbfid = open (filename, O_RDONLY)) < 0)
      error("Can't open serial file for read", filename, "open");
    get_lock(dbfid, 0);
  }

  return dbfid;  
}

void G::serial_close(int dbfid){

  release_lock(dbfid);
  close(dbfid);
}

int G::unserialize_lsh_header(char* filename){

  int dbfid;
  char* db;
  // Test to see if file exists
  if((dbfid = open (filename, O_RDONLY)) < 0)
    error("Can't open the file", filename, "open");
  close(dbfid);
  dbfid = serial_open(filename, 0); // open for read
  db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
  serial_get_header(db);           // read header
  serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap

  // Unserialize header parameters
  H::L = lshHeader->numTables;
  H::m = (Uns32T)( (1.0 + sqrt(1 + 8.0*(int)H::L)) / 2.0);
  H::N = lshHeader->numRows;
  H::C = lshHeader->numCols;
  H::k = lshHeader->numFuns;
  H::d = lshHeader->dataDim;
  H::w = lshHeader->binWidth;
  H::radius = lshHeader->radius;
  H::maxp = lshHeader->maxp;
  H::pointCount = lshHeader->pointCount;

  return dbfid;
}

// unserialize the LSH parameters
// we leave the LSH tree on disk as a flat file
// it is this flat file that we search by memory mapping
void G::unserialize_lsh_functions(int dbfid){
  Uns32T j, kk;
  float* pf;
  Uns32T* pu;

  // Load the hash functions into core
  char* db = serial_mmap(dbfid, get_serial_hashtable_offset(), 0);// get database pointer again  
  
  pf = get_serial_hashfunction_base(db);
  
#ifdef USE_U_FUNCTIONS
  for( j = 0 ; j < H::m ; j++ ){ // L functions gj(v)
    for( kk = 0 ; kk < H::k/2 ; kk++ ){ // Normally distributed hash functions
#else
      for( j = 0 ; j < H::L ; j++ ){ // L functions gj(v)
	for( kk = 0 ; kk < H::k ; kk++ ){ // Normally distributed hash functions
#endif
	  for(Uns32T i = 0 ; i < H::d ; i++ )
	    H::A[j][kk][i] = *pf++; // Normally distributed random vectors
	}
      }
#ifdef USE_U_FUNCTIONS
      for( j = 0 ; j < H::m ; j++ ) // biases b
	for( kk = 0 ; kk < H::k/2 ; kk++ )
#else
	  for( j = 0 ; j < H::L ; j++ ) // biases b
	    for( kk = 0 ; kk < H::k ; kk++ )
#endif
	      H::b[j][kk] = *pf++;
      
      pu = (Uns32T*)pf;
      for( j = 0 ; j < H::L ; j++ ) // Z projectors r1
	for( kk = 0 ; kk < H::k ; kk++ )
	  H::r1[j][kk] = *pu++;
      
      for( j = 0 ; j < H::L ; j++ ) // Z projectors r2
	for( kk = 0 ; kk < H::k ; kk++ )
	  H::r2[j][kk] = *pu++;

      serial_munmap(db, get_serial_hashtable_offset());
}

void G::unserialize_lsh_hashtables_format1(int fid){
  SerialElementT *pe, *pt;
  Uns32T x,y;
  Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
  // Read the hash tables into core
  for( x = 0 ; x < H::L ; x++ ){
    // memory map a single hash table
    // Align each hash table to page boundary
    char* dbtable = serial_mmap(fid, hashTableSize, 0, 
				align_up(get_serial_hashtable_offset()+x*hashTableSize, get_page_logn()));
#ifdef __CYGWIN__
    // No madvise in CYGWIN
#else
    if(madvise(dbtable, hashTableSize, MADV_SEQUENTIAL)<0)
      error("could not advise hashtable memory","","madvise");    
#endif
    pt=(SerialElementT*)dbtable;
    for( y = 0 ; y < H::N ; y++ ){
      // Move disk pointer to beginning of row
      pe=pt+y*lshHeader->numCols;
      unserialize_hashtable_row_format1(pe, h[x]+y);
#ifdef LSH_DUMP_CORE_TABLES
      printf("S[%d,%d]", x, y);
      serial_bucket_dump(pe);
      printf("C[%d,%d]", x, y);
      dump_hashtable_row(h[x][y]);
#endif      
    }
    serial_munmap(dbtable, hashTableSize);
  }  
}

void G::unserialize_hashtable_row_format1(SerialElementT* pe, bucket** b){
  Uns32T colCount = 0;  
  while(colCount!=lshHeader->numCols && pe->hashValue !=IFLAG){
    H::p = pe->pointID; // current point ID      
    t2 = pe->hashValue;
    bucket_insert_point(b);
    pe++;
    colCount++;
  }
}
 
void G::unserialize_lsh_hashtables_format2(FILE* dbFile, bool forMerge){
  Uns32T x=0,y=0;

  // Seek to hashtable base offset
  if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){
    fclose(dbFile);
    error("fSeek error in unserialize_lsh_hashtables_format2");
  }

  // Read the hash tables into core (structure is given in header) 
  while( x < H::L){
    if(fread(&(H::t1), sizeof(Uns32T), 1, dbFile) != 1){
      fclose(dbFile);
      error("Read error","unserialize_lsh_hashtables_format2()");
    }
    if(H::t1==O2_SERIAL_TOKEN_ENDTABLE)
      x++; // End of table
    else
      while(y < H::N){
	// Read a row and move file pointer to beginning of next row or _bittable
	if(!(H::t1==O2_SERIAL_TOKEN_T1)){
	  fclose(dbFile);
	  error("State matchine error T1","unserialize_lsh_hashtables_format2()");
	}
	if(fread(&(H::t1), sizeof(Uns32T), 1, dbFile) != 1){
	  fclose(dbFile);
	  error("Read error: t1","unserialize_lsh_hashtables_format2()");
	}
	y = H::t1;
	if(y>=H::N){
	  fclose(dbFile);
	  error("Unserialized hashtable row pointer out of range","unserialize_lsh_hashtables_format2()");
	}
	Uns32T token = 0;
#ifdef LSH_CORE_ARRAY
	Uns32T numElements;
	if(fread(&numElements, sizeof(Uns32T), 1, dbFile) != 1){
	  fclose(dbFile);
	  error("Read error: numElements","unserialize_lsh_hashtables_format2()");
	}
	
	// BACKWARD COMPATIBILITY: check to see if T2 or END token was read
	if(numElements==O2_SERIAL_TOKEN_T2 || numElements==O2_SERIAL_TOKEN_ENDTABLE ){
	  forMerge=true; // Force use of dynamic linked list core format
	  token = numElements;
	}

	if(forMerge)
	  // Use linked list CORE format
	  token = unserialize_hashtable_row_format2(dbFile, h[x]+y, token);
	else
	  // Use ARRAY CORE format with numElements counter
	  token = unserialize_hashtable_row_to_array(dbFile, h[x]+y, numElements);
#else
	token = unserialize_hashtable_row_format2(dbFile, h[x]+y);	
#endif
	
#ifdef LSH_DUMP_CORE_TABLES
	printf("C[%d,%d]", x, y);
	dump_hashtable_row(h[x][y]);
#endif
	// Check that token is valid
	if( !(token==O2_SERIAL_TOKEN_T1 || token==O2_SERIAL_TOKEN_ENDTABLE) ){
	  fclose(dbFile);
	  error("State machine error end of row/table", "unserialize_lsh_hashtables_format2()");
	}
	// Check for end of table flag
	if(token==O2_SERIAL_TOKEN_ENDTABLE){
	  x++;
	  break;
	}
	// Check for new row flag
	if(token==O2_SERIAL_TOKEN_T1)
	  H::t1 = token;
      }
  }
}
 
Uns32T G::unserialize_hashtable_row_format2(FILE* dbFile, bucket** b, Uns32T token){
  bool pointFound = false;
  
  if(token)
    H::t2 = token;
  else if(fread(&(H::t2), sizeof(Uns32T), 1, dbFile) != 1){
      fclose(dbFile);
      error("Read error T2 token","unserialize_hashtable_row_format2");
    }

  if( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2)){
    fclose(dbFile);
    error("State machine error: expected E or T2");
  }

  while(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1)){
    pointFound=false;
    // Check for T2 token
    if(H::t2!=O2_SERIAL_TOKEN_T2){
      fclose(dbFile);
      error("State machine error T2 token", "unserialize_hashtable_row_format2()");
    }
    // Read t2 value
    if(fread(&(H::t2), sizeof(Uns32T), 1, dbFile) != 1){
      fclose(dbFile);
      error("Read error t2","unserialize_hashtable_row_format2");
    }
    if(fread(&(H::p), sizeof(Uns32T), 1, dbFile) != 1){
      fclose(dbFile);
      error("Read error H::p","unserialize_hashtable_row_format2");
    }
    while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){
      pointFound=true;
      bucket_insert_point(b);
      if(fread(&(H::p), sizeof(Uns32T), 1, dbFile) != 1){
	fclose(dbFile);
	error("Read error H::p","unserialize_hashtable_row_format2");
      }
    }
    if(!pointFound)
      error("State machine error: point", "unserialize_hashtable_row_format2()");    
    H::t2 = H::p; // Copy last found token to t2
  }  
  return H::t2; // holds current token
}

// Unserialize format2 hashtable row to a CORE_ARRAY pointed to
// by the hashtable row pointer: rowPtr
//
// numElements is the total number of t2 buckets plus points
// memory required is numElements+1+sizeof(hashtable ptr)
//
// numElements = numPoints + numBuckets
//
// During inserts (merges) new hashtable entries are appened at rowPtr+numPoints+numBuckets+1
//
// ASSUME: that LSH_LIST_HEAD_COUNTERS is set so that the first hashtable bucket is used to count
// point and bucket entries
//
// We store the values of numPoints and numBuckets in separate fields of the first bucket
// rowPtr->t2 // numPoints
// (Uns32T)(rowPtr->snext) // numBuckets
//
// We cast the rowPtr->next pointer to (Uns32*) malloc(numElements*sizeof(Uns32T) + sizeof(bucket*))
// To get to the fist bucket, we use 
//

#define READ_UNS32T(VAL,TOKENSTR)   if(fread(VAL, sizeof(Uns32T), 1, dbFile) != 1){\
    fclose(dbFile);error("Read error unserialize_hashtable_format2",TOKENSTR);}

#define TEST_TOKEN(TEST, TESTSTR)   if(TEST){fclose(dbFile);error("State machine error: ", TESTSTR);}

#define SKIP_BITS_LEFT_SHIFT_MSB (30)

#define SKIP_BITS_RIGHT_SHIFT_MSB (28)
#define SKIP_BITS_RIGHT_SHIFT_LSB (30)

#define MAX_POINTS_IN_BUCKET_CORE_ARRAY (16)
#define LSH_CORE_ARRAY_END_ROW_TOKEN (0xFFFFFFFD)

// Encode the skip bits. Zero if only one point, MAX 8 (plus first == 9)
#define ENCODE_POINT_SKIP_BITS  TEST_TOKEN(!numPointsThisBucket, "no points found");\
    if(numPointsThisBucket==1){\
             secondPtr=ap++;\
             *secondPtr=0;\
             numPoints++;\
    }\
    if(numPointsThisBucket>1){\
      *firstPtr |=  ( (numPointsThisBucket-1) & 0x3 ) << SKIP_BITS_LEFT_SHIFT_MSB;\
      *secondPtr |= ( ( (numPointsThisBucket-1) & 0xC) >> 2 ) << SKIP_BITS_LEFT_SHIFT_MSB;}

Uns32T G::unserialize_hashtable_row_to_array(FILE* dbFile, bucket** rowPP, Uns32T numElements){
  Uns32T numPointsThisBucket = 0;
  Uns32T numBuckets = 0;
  Uns32T numPoints = 0;
  Uns32T* firstPtr = 0;
  Uns32T* secondPtr = 0;

  // Initialize new row
  if(!*rowPP){
    *rowPP = new bucket();
#ifdef LSH_LIST_HEAD_COUNTERS
    (*rowPP)->t2 = 0; // Use t2 as a collision counter for the row
    (*rowPP)->next = 0;
#endif
  }
  bucket* rowPtr = *rowPP;

  READ_UNS32T(&(H::t2),"t2");
  TEST_TOKEN(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2), "expected E or T2");
  // Because we encode points in 16-point blocks, we sometimes allocate repeated t2 elements
  // So over-allocate by a factor of two and realloc later to actual numElements
  CR_ASSERT(rowPtr->next = (bucket*) malloc((2*numElements+1)*sizeof(Uns32T)+sizeof(bucket**)));
  Uns32T* ap = reinterpret_cast<Uns32T*>(rowPtr->next); // Cast pointer to Uns32T* for array format
  while( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1) ){
    numPointsThisBucket = 0;// reset bucket point counter
    secondPtr = 0; // reset second-point pointer
    TEST_TOKEN(H::t2!=O2_SERIAL_TOKEN_T2, "expected T2");
    READ_UNS32T(&(H::t2), "Read error t2");
    *ap++ = H::t2; // Insert t2 value into array
    numBuckets++;
    READ_UNS32T(&(H::p), "Read error H::p"); 
    while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){      
      if(numPointsThisBucket==MAX_POINTS_IN_BUCKET_CORE_ARRAY){
	ENCODE_POINT_SKIP_BITS;
	*ap++ = H::t2; // Extra element
	numBuckets++;  // record this as a new bucket
	numPointsThisBucket=0; // reset bucket point counter
	secondPtr = 0; // reset second-point pointer
      }
      if( ++numPointsThisBucket == 1 )
	firstPtr = ap;  // store pointer to first point to insert skip bits later on
      else if( numPointsThisBucket == 2 )
	secondPtr = ap;  // store pointer to first point to insert skip bits later on
      numPoints++;
      *ap++ = H::p;
      READ_UNS32T(&(H::p), "Read error H::p"); 
    }
    ENCODE_POINT_SKIP_BITS;    
    H::t2 = H::p; // Copy last found token to t2
  }    
  // Reallocate the row to its actual size
  CR_ASSERT(rowPtr->next = (bucket*) realloc(rowPtr->next, (numBuckets+numPoints+1)*sizeof(Uns32T)+sizeof(bucket**)));
  // Record the sizes at the head of the row
  rowPtr->snext = (sbucket*) numBuckets;
  rowPtr->t2 = numPoints;
  // Place end of row marker
  *ap++ = LSH_CORE_ARRAY_END_ROW_TOKEN;
  // Set the LSH_CORE_ARRAY_BIT to identify data structure for insertion and retrieval
  rowPtr->t2 |= LSH_CORE_ARRAY_BIT;
  // Allocate a new dynamic list head at the end of the array
  bucket** listPtr = reinterpret_cast<bucket**> (ap);
  *listPtr = 0;
  // Return current token
  return H::t2; // return H::t2 which holds current token [E or T1]
}



 // *p is a pointer to the beginning of a hashtable row array
 // The array consists of t2 hash keys and one or more point identifiers p for each hash key
 // Retrieval is performed by generating a hash key query_t2 for query point q
 // We identify the row that t2 is stored in using a secondary hash t1, this row is the entry
 // point for retrieve_from_core_hashtable_array
#define SKIP_BITS (0xC0000000)
void G::retrieve_from_core_hashtable_array(Uns32T* p, Uns32T qpos){
  Uns32T skip;
  Uns32T t2;
  Uns32T p1;
  Uns32T p2;

  CR_ASSERT(p);

  do{
    t2 = *p++;
    if( t2 > H::t2 )
      return;
    p1 = *p++;
    p2 = *p++;
    skip = (( p1 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_LSB) + (( p2 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_MSB);
    if( t2 == H::t2 ){
      add_point_callback(calling_instance, p1 ^ (p1 & SKIP_BITS), qpos, radius);
      if(skip--){
	add_point_callback(calling_instance, p2 ^ (p2 & SKIP_BITS), qpos, radius);
	while(skip-- )
	  add_point_callback(calling_instance, *p++, qpos, radius);
      }
    }
    else 
      if(*p != LSH_CORE_ARRAY_END_ROW_TOKEN)
	p = p + skip;
  }while( *p != LSH_CORE_ARRAY_END_ROW_TOKEN );
}
 
void G::dump_hashtable_row(bucket* p){
  while(p && p->t2!=IFLAG){
    sbucket* sbp = p->snext;
    while(sbp){
      printf("(%0X,%u)", p->t2, sbp->pointID);
      fflush(stdout);
      sbp=sbp->snext;
    }      
    p=p->next;
  }
  printf("\n");
}


// G::serial_retrieve_point( ... )
// retrieves (pointID) from a serialized LSH database
//
// inputs:
// filename - file name of serialized LSH database
//       vv - query point set
//
// outputs:
// inserts retrieved points into add_point() callback method
void G::serial_retrieve_point_set(char* filename, vector<vector<float> >& vv, ReporterCallbackPtr add_point, void* caller)
{
  int dbfid = serial_open(filename, 0); // open for read
  char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
  serial_get_header(dbheader);           // read header
  serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap

  if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){
    serial_close(dbfid);
    error("serial_retrieve_point_set is for SERIAL_FILEFORMAT1 only");
  }

  // size of each hash table
  Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
  calling_instance = caller; // class instance variable used in ...bucket_chain_point()
  add_point_callback = add_point;

  for(Uns32T j=0; j<L; j++){
    // memory map a single hash table for random access
    char* db = serial_mmap(dbfid, hashTableSize, 0, 
			   align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn()));
#ifdef __CYGWIN__
    // No madvise in CYGWIN
#else
    if(madvise(db, hashTableSize, MADV_RANDOM)<0)
      error("could not advise local hashtable memory","","madvise");
#endif
    SerialElementT* pe = (SerialElementT*)db ;
    for(Uns32T qpos=0; qpos<vv.size(); qpos++){
      H::compute_hash_functions(vv[qpos]);
      H::generate_hash_keys(*(g+j),*(r1+j),*(r2+j)); 
      serial_bucket_chain_point(pe+t1*lshHeader->numCols, qpos); // Point to correct row
    }
    serial_munmap(db, hashTableSize); // drop hashtable mmap
  }  
  serial_close(dbfid);    
}

void G::serial_retrieve_point(char* filename, vector<float>& v, Uns32T qpos, ReporterCallbackPtr add_point, void* caller){
  int dbfid = serial_open(filename, 0); // open for read
  char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
  serial_get_header(dbheader);           // read header
  serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap

  if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){
    serial_close(dbfid);
    error("serial_retrieve_point is for SERIAL_FILEFORMAT1 only");
  }

  // size of each hash table
  Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
  calling_instance = caller;
  add_point_callback = add_point;
  H::compute_hash_functions(v);
  for(Uns32T j=0; j<L; j++){
    // memory map a single hash table for random access
    char* db = serial_mmap(dbfid, hashTableSize, 0, 
			   align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn()));
#ifdef __CYGWIN__
    // No madvise in CYGWIN
#else
    if(madvise(db, hashTableSize, MADV_RANDOM)<0)
      error("could not advise local hashtable memory","","madvise");
#endif
    SerialElementT* pe = (SerialElementT*)db ;
    H::generate_hash_keys(*(g+j),*(r1+j),*(r2+j)); 
    serial_bucket_chain_point(pe+t1*lshHeader->numCols, qpos); // Point to correct row
    serial_munmap(db, hashTableSize); // drop hashtable mmap
  }  
  serial_close(dbfid);    
}

void G::serial_dump_tables(char* filename){
  int dbfid = serial_open(filename, 0); // open for read
  char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
  serial_get_header(dbheader);           // read header
  serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap
  Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
  for(Uns32T j=0; j<L; j++){
    // memory map a single hash table for random access
    char* db = serial_mmap(dbfid, hashTableSize, 0, 
			   align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn()));
#ifdef __CYGWIN__
    // No madvise in CYGWIN
#else
    if(madvise(db, hashTableSize, MADV_SEQUENTIAL)<0)
      error("could not advise local hashtable memory","","madvise");
#endif
    SerialElementT* pe = (SerialElementT*)db ;
    printf("*********** TABLE %d ***************\n", j);
    fflush(stdout);
    int count=0;
    do{
      printf("[%d,%d]", j, count++);
      fflush(stdout);
      serial_bucket_dump(pe);
      pe+=lshHeader->numCols;
    }while(pe<(SerialElementT*)db+lshHeader->numRows*lshHeader->numCols);
  }

}

void G::serial_bucket_dump(SerialElementT* pe){
  SerialElementT* pend = pe+lshHeader->numCols;
  while( !(pe->hashValue==IFLAG || pe==pend ) ){
    printf("(%0X,%u)",pe->hashValue,pe->pointID);
    pe++;
  }
  printf("\n");
  fflush(stdout);
}

void G::serial_bucket_chain_point(SerialElementT* pe, Uns32T qpos){
  SerialElementT* pend = pe+lshHeader->numCols;
  while( !(pe->hashValue==IFLAG || pe==pend ) ){
    if(pe->hashValue==t2){ // new match
      add_point_callback(calling_instance, pe->pointID, qpos, radius);
    }
    pe++;
  }
}

void G::bucket_chain_point(bucket* p, Uns32T qpos){
  if(!p || p->t2==IFLAG)
    return;
  if(p->t2==t2){ // match
    sbucket_chain_point(p->snext, qpos); // add to reporter
  }  
  if(p->next){
    bucket_chain_point(p->next, qpos); // recurse
  }
}

void G::sbucket_chain_point(sbucket* p, Uns32T qpos){  
  add_point_callback(calling_instance, p->pointID, qpos, radius);
  if(p->snext){
    sbucket_chain_point(p->snext, qpos);
  }
}

void G::get_lock(int fd, bool exclusive) {
  struct flock lock;
  int status;  
  lock.l_type = exclusive ? F_WRLCK : F_RDLCK;
  lock.l_whence = SEEK_SET;
  lock.l_start = 0;
  lock.l_len = 0; /* "the whole file" */  
 retry:
  do {
    status = fcntl(fd, F_SETLKW, &lock);
  } while (status != 0 && errno == EINTR);  
  if (status) {
    if (errno == EAGAIN) {
      sleep(1);
      goto retry;
    } else {
      error("fcntl lock error", "", "fcntl");
    }
  }
}
 
void G::release_lock(int fd) {
   struct flock lock;
   int status;
   
   lock.l_type = F_UNLCK;
   lock.l_whence = SEEK_SET;
   lock.l_start = 0;
   lock.l_len = 0;
   
   status = fcntl(fd, F_SETLKW, &lock);

  if (status)
    error("fcntl unlock error", "", "fcntl");
}