diff lshlib.cpp @ 340:a6edbe97fddf

Added LSH_CORE_ARRAY structure for hashtables instead of linked lists. Maintained backwards compatibility with indexes built for the linked-list format. Added tests for indexing and merging. Tested backwards compatibility OK.
The purpose of the LSH_CORE_ARRAY data structure is greater space efficiency and better L1/L2 cache usage; this is essential when holding multiple indexes with multiple hashtables in RAM.
author mas01mc
date Wed, 10 Sep 2008 18:55:16 +0000
parents fe4d5b763086
children fdeb8db97678
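
The sketch below is illustrative only and is not code from this changeset: it shows how a packed CORE_ARRAY row and its two-plus-two skip bits can be decoded, using the 0xC0000000 mask and 0xFFFFFFFD end-of-row token that the patch defines. The names u32, SKIP_MASK, END_ROW and scan_row are hypothetical stand-ins for Uns32T, SKIP_BITS, LSH_CORE_ARRAY_END_ROW_TOKEN and retrieve_from_core_hashtable_array.

#include <cstdint>
#include <cstdio>
#include <vector>

typedef uint32_t u32;
static const u32 SKIP_MASK = 0xC0000000u; // top two bits of a point slot
static const u32 END_ROW   = 0xFFFFFFFDu; // end-of-row marker

// Scan one packed row and print every point stored under hash key queryT2.
// Per-bucket layout: [t2][p1][p2]...[pn]; a single-point bucket stores a zero
// filler in its second slot; (n-1) is split across the top two bits of p1
// (low half) and p2 (high half). Buckets appear in ascending t2 order.
void scan_row(const u32* p, u32 queryT2) {
  while (*p != END_ROW) {
    u32 t2 = *p++;
    if (t2 > queryT2) return;     // past the query key: no match in this row
    u32 p1 = *p++;
    u32 p2 = *p++;
    u32 skip = ((p1 & SKIP_MASK) >> 30) + ((p2 & SKIP_MASK) >> 28); // == n - 1
    if (t2 == queryT2) {
      std::printf("point %u\n", p1 & ~SKIP_MASK);
      if (skip) {
        std::printf("point %u\n", p2 & ~SKIP_MASK);
        for (u32 i = 1; i < skip; ++i)  // remaining n - 2 points, stored unmasked
          std::printf("point %u\n", *p++);
      }
    } else if (skip > 1) {
      p += skip - 1;              // step over the n - 2 slots not yet consumed
    }
  }
}

int main() {
  // Row with bucket t2=5 holding points {10,11,12} and bucket t2=9 holding {42}.
  std::vector<u32> row = {
    5, 10 | (2u << 30), 11, 12,   // n=3, so (n-1)=2 sits in p1's top bits
    9, 42, 0,                     // single point: zero filler in the second slot
    END_ROW
  };
  scan_row(row.data(), 5);        // prints points 10, 11 and 12
  return 0;
}

In the real structure the row head's t2 carries LSH_CORE_ARRAY_BIT plus the point count, its snext field stores the bucket count, and a bucket* slot after the end-of-row token heads an overflow linked list for post-load inserts.
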
--- a/lshlib.cpp	Fri Sep 05 20:01:58 2008 +0000
+++ b/lshlib.cpp	Wed Sep 10 18:55:16 2008 +0000
@@ -1,8 +1,19 @@
 #include "lshlib.h"
 
-//#define __LSH_DUMP_CORE_TABLES__
+//#define LSH_DUMP_CORE_TABLES
 //#define USE_U_FUNCTIONS
-#define LSH_BLOCK_FULL_ROWS
+
+// Use backward-compatible CORE ARRAY lsh index
+#define LSH_CORE_ARRAY  // Set to use arrays for hashtables rather than linked-lists
+#define LSH_LIST_HEAD_COUNTERS // Enable counters in hashtable list heads
+
+// Critical path logic
+#if defined LSH_CORE_ARRAY && !defined LSH_LIST_HEAD_COUNTERS
+#define LSH_LIST_HEAD_COUNTERS
+#endif
+
+#define LSH_CORE_ARRAY_BIT (0x80000000) //  LSH_CORE_ARRAY test bit for list head
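+// When LSH_CORE_ARRAY_BIT is set in a row head's t2 field the row is stored as a packed
+// array; the low 31 bits of t2 then hold the row's point count and the row head's snext
+// field holds its bucket count (see get_pointer_to_bucket_linked_list())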
+
 
 void err(char*s){cout << s << endl;exit(2);}
 
@@ -193,6 +204,7 @@
 // Destruct hash tables
 H::~H(){
   Uns32T i,j,kk;
+  bucket** pp;
 #ifdef USE_U_FUNCTIONS
   for( j = 0 ; j < H::m ; j++ ){
     for( kk = 0 ; kk < H::k/2 ; kk++ )
@@ -221,8 +233,23 @@
   for( j=0 ; j < H::L ; j++ ){
       delete[] H::r1[ j ];
       delete[] H::r2[ j ];
-      for(i = 0; i< H::N ; i++)
+      for(i = 0; i< H::N ; i++){
+	pp = 0;
+#ifdef LSH_CORE_ARRAY
+	  if(H::h[ j ][ i ])
+	    if(H::h[ j ][ i ]->t2 & LSH_CORE_ARRAY_BIT){	      
+	      pp = get_pointer_to_bucket_linked_list(H::h[ j ][ i ]);
+	      if(*pp){
+		(*pp)->snext=0; // Head of list uses snext as a non-pointer value
+		delete *pp;   // Now the destructor can do its work properly
+	      }
+	      free(H::h[ j ][ i ]->next);
+	      H::h[ j ][ i ]->next = 0; // Zero next pointer
+	      H::h[ j ][ i ]->snext = 0; // Zero head-of-list pointer as above
+	    }
+#endif
 	delete H::h[ j ][ i ];
+      }
       delete[] H::h[ j ];
   }
   delete[] H::r1;
@@ -333,28 +360,35 @@
 
 Uns32T H::bucket_insert_point(bucket **pp){
   collisionCount = 0;
+#ifdef LSH_LIST_HEAD_COUNTERS
   if(!*pp){
     *pp = new bucket();
-#ifdef LSH_BLOCK_FULL_ROWS
     (*pp)->t2 = 0; // Use t2 as a collision counter for the row
     (*pp)->next = new bucket();
-#endif
   }
-#ifdef LSH_BLOCK_FULL_ROWS
-  collisionCount = (*pp)->t2;
-  if(collisionCount < H::C){ // Block if row is full
-    (*pp)->t2++; // Increment collision counter
-    pointCount++;
-    collisionCount++;
-    __bucket_insert_point((*pp)->next); // First bucket holds collision count
+  // The list head holds point collision count
+  if( (*pp)->t2 & LSH_CORE_ARRAY_BIT )
+    bucket_insert_point(get_pointer_to_bucket_linked_list(*pp)); // recurse
+  else{
+    collisionCount = (*pp)->t2;
+    if(collisionCount < H::C){ // Block if row is full
+      (*pp)->t2++; // Increment collision counter (numPoints in row)
+      pointCount++;
+      collisionCount++;
+      // Locate the bucket linked list 
+      __bucket_insert_point( (*pp)->next ); 
+    }
   }
-#else
+#else // NOT USING LSH_LIST_HEAD_COUNTERS
+  if(!*pp)
+    *pp = new bucket();
   pointCount++;
   __bucket_insert_point(*pp); // No collision count storage
 #endif
   return collisionCount;
 }
 
+// insert points into hashtable row collision chain
 void H::__bucket_insert_point(bucket* p){
   if(p->t2 == IFLAG){ // initialization flag, is it in the domain of t2?
     p->t2 = H::t2;
@@ -370,30 +404,37 @@
   }
 
   if(p->next){
-    __bucket_insert_point(p->next);
+    // Construct list in t2 order
+    if(H::t2 < p->next->t2){
+      bucket* tmp = new bucket();
+      tmp->next = p->next;
+      p->next = tmp;
+      __bucket_insert_point(tmp);
+    }
+    else
+      __bucket_insert_point(p->next);
   }
-
-  else{
+  else {
     p->next = new bucket();
     __bucket_insert_point(p->next);
   }
-
 }
 
+// insert points into point-locale collision chain
 void H::__sbucket_insert_point(sbucket* p){
   if(p->pointID==IFLAG){
     p->pointID = H::p;
     return;
   }
-
+  
   // Search for pointID
-  if(p->snext){
+  if(p->snext){    
     __sbucket_insert_point(p->snext);
   }
   else{
-  // Make new point collision bucket at end of list
-  p->snext = new sbucket();
-  __sbucket_insert_point(p->snext);
+    // Make new point collision bucket at end of list
+    p->snext = new sbucket();
+    __sbucket_insert_point(p->snext);
   }
 }
 
@@ -401,6 +442,14 @@
   return  *(h+j);
 }
 
+// Find the linked-list pointer at the end of the CORE_ARRAY
+bucket** H::get_pointer_to_bucket_linked_list(bucket* rowPtr){
+   Uns32T numBuckets = (Uns32T) rowPtr->snext; // Cast pointer to unsigned int
+   Uns32T numPoints = rowPtr->t2 & 0x7FFFFFFF; // Value is stored in low 31 bits of t2 field
+   bucket** listPtr = reinterpret_cast<bucket**> (reinterpret_cast<unsigned int*>(rowPtr->next)+numPoints+numBuckets+1);
+   return listPtr;
+}
+
 // Interface to Locality Sensitive Hashing G
 G::G(float ww, Uns32T kk,Uns32T mm, Uns32T dd, Uns32T NN, Uns32T CC, float rr):
   H(kk,mm,dd,NN,CC,ww,rr), // constructor to initialize data structures
@@ -479,8 +528,11 @@
   for(Uns32T j = 0 ; j < H::L ; j++ ){
     H::generate_hash_keys( *( H::g + j ), *( H::r1 + j ), *( H::r2 + j ) ); 
     if( bucket* bPtr = *(get_bucket(j) + get_t1()) )
-#ifdef LSH_BLOCK_FULL_ROWS
-      bucket_chain_point( bPtr->next, qpos);
+#ifdef LSH_LIST_HEAD_COUNTERS
+      if(bPtr->t2&LSH_CORE_ARRAY_BIT)
+	retrieve_from_core_hashtable_array((Uns32T*)(bPtr->next), qpos);
+      else
+	bucket_chain_point( bPtr->next, qpos);
 #else
     bucket_chain_point( bPtr , qpos);
 #endif
@@ -796,7 +848,7 @@
       colCount=0;
       if(bucket* bPtr = h[x][y])
 	if(merge)
-#ifdef LSH_BLOCK_FULL_ROWS
+#ifdef LSH_LIST_HEAD_COUNTERS
 	  serial_merge_hashtable_row_format1(pe, bPtr->next, colCount); // skip collision counter bucket
 	else
 	  serial_write_hashtable_row_format1(pe, bPtr->next, colCount); // skip collision counter bucket
@@ -887,9 +939,9 @@
   if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT2) )
     error("Cannot merge core and serial LSH, data structure dimensions mismatch.");
 
-  // We must pereform FORMAT2 merges in core
+  // We must perform FORMAT2 merges in core using FORMAT1 (the dynamic linked-list structure)
   if(merge)
-    unserialize_lsh_hashtables_format2(dbFile);
+    unserialize_lsh_hashtables_format2(dbFile, merge);
 
   Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount, t1;
   if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){
@@ -909,7 +961,7 @@
       colCount=0;
       if(bucket* bPtr = h[x][y]){
 	// Check for empty row (even though row was allocated)
-#ifdef LSH_BLOCK_FULL_ROWS
+#ifdef LSH_LIST_HEAD_COUNTERS
 	if(bPtr->next->t2==IFLAG){
 	  fclose(dbFile);
 	  error("b->next->t2==IFLAG","serialize_lsh_hashtables_format2()");
@@ -921,19 +973,17 @@
 	}
 #endif
 	t1 = O2_SERIAL_TOKEN_T1;
-	if( fwrite(&t1, sizeof(Uns32T), 1, dbFile) != 1 ){
-	  fclose(dbFile);
-	  error("write error in serial_write_hashtable_format2() [T1]");
-	}
+	WRITE_UNS32(&t1, "[T1]");
 	t1 = y;
-	if( fwrite(&t1, sizeof(Uns32T), 1, dbFile) != 1 ){
-	  fclose(dbFile);
-	  error("write error in serial_write_hashtable_format2() [t1]");
-	}
-#ifdef LSH_BLOCK_FULL_ROWS
+	WRITE_UNS32(&t1, "[t1]");
+#ifdef LSH_CORE_ARRAY
+	t1 = count_buckets_and_points_hashtable_row(bPtr);
+	WRITE_UNS32(&t1,"[count]"); // write numElements
+#endif
+#ifdef LSH_LIST_HEAD_COUNTERS
 	serial_write_hashtable_row_format2(dbFile, bPtr->next, colCount); // skip collision counter bucket
 #else
-      serial_write_hashtable_row_format2(dbFile, bPtr, colCount);
+	serial_write_hashtable_row_format2(dbFile, bPtr, colCount);
 #endif
       }
       if(colCount){
@@ -947,21 +997,52 @@
     }
     // Write END of table marker
     t1 = O2_SERIAL_TOKEN_ENDTABLE;
-    if( fwrite(&t1, sizeof(Uns32T), 1, dbFile ) != 1 ){
-      fclose(dbFile);
-      error("write error in serial_write_hashtable_format2() [end]");
-    }
-
+    WRITE_UNS32(&t1,"[end]");
     if(colCountN)
       std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN 
 		<< ", min = " << minColCount << ", max = " << maxColCount 
 		<< endl;
-  }
-  
+  }  
   // We're done writing
   return 1;
 }
 
+Uns32T G::count_buckets_and_points_hashtable_row(bucket* bPtr){
+  Uns32T total_count = 0;
+  bucket* p = 0;
+
+  // count points
+#ifdef LSH_LIST_HEAD_COUNTERS
+  total_count = bPtr->t2; // points already counted    
+  p = bPtr->next; 
+#else
+  total_count = count_points_hashtable_row(bPtr);
+  p = bPtr; 
+#endif
+
+  // count buckets
+  do{
+    total_count++;    
+  }while((p=p->next));
+  
+  return total_count;
+}
+
+Uns32T G::count_points_hashtable_row(bucket* bPtr){
+  Uns32T point_count = 0;
+  bucket* p = bPtr;
+  sbucket* s = 0;
+  while(p){
+    s = p->snext;
+    while(s){
+      point_count++;
+      s=s->snext;
+    }
+    p=p->next;
+  }
+  return point_count;
+}
+
 void G::serial_write_hashtable_row_format2(FILE* dbFile, bucket* b, Uns32T& colCount){
   while(b && b->t2!=IFLAG){
     if(!b->snext){
@@ -1185,7 +1266,7 @@
       // Move disk pointer to beginning of row
       pe=pt+y*lshHeader->numCols;
       unserialize_hashtable_row_format1(pe, h[x]+y);
-#ifdef __LSH_DUMP_CORE_TABLES__
+#ifdef LSH_DUMP_CORE_TABLES
       printf("S[%d,%d]", x, y);
       serial_bucket_dump(pe);
       printf("C[%d,%d]", x, y);
@@ -1207,7 +1288,7 @@
   }
 }
  
-void G::unserialize_lsh_hashtables_format2(FILE* dbFile){
+void G::unserialize_lsh_hashtables_format2(FILE* dbFile, bool forMerge){
   Uns32T x=0,y=0;
 
   // Seek to hashtable base offset
@@ -1240,9 +1321,31 @@
 	  fclose(dbFile);
 	  error("Unserialized hashtable row pointer out of range","unserialize_lsh_hashtables_format2()");
 	}
-	Uns32T token = unserialize_hashtable_row_format2(dbFile, h[x]+y);
+	Uns32T token = 0;
+#ifdef LSH_CORE_ARRAY
+	Uns32T numElements;
+	if(fread(&numElements, sizeof(Uns32T), 1, dbFile) != 1){
+	  fclose(dbFile);
+	  error("Read error: numElements","unserialize_lsh_hashtables_format2()");
+	}
 	
-#ifdef __LSH_DUMP_CORE_TABLES__
+	// BACKWARD COMPATIBILITY: check to see if T2 or END token was read
+	if(numElements==O2_SERIAL_TOKEN_T2 || numElements==O2_SERIAL_TOKEN_ENDTABLE ){
+	  forMerge=true; // Force use of dynamic linked list core format
+	  token = numElements;
+	}
+
+	if(forMerge)
+	  // Use linked list CORE format
+	  token = unserialize_hashtable_row_format2(dbFile, h[x]+y, token);
+	else
+	  // Use ARRAY CORE format with numElements counter
+	  token = unserialize_hashtable_row_to_array(dbFile, h[x]+y, numElements);
+#else
+	token = unserialize_hashtable_row_format2(dbFile, h[x]+y);	
+#endif
+	
+#ifdef LSH_DUMP_CORE_TABLES
 	printf("C[%d,%d]", x, y);
 	dump_hashtable_row(h[x][y]);
 #endif
@@ -1263,16 +1366,21 @@
   }
 }
  
-Uns32T G::unserialize_hashtable_row_format2(FILE* dbFile, bucket** b){
+Uns32T G::unserialize_hashtable_row_format2(FILE* dbFile, bucket** b, Uns32T token){
   bool pointFound = false;
-  if(fread(&(H::t2), sizeof(Uns32T), 1, dbFile) != 1){
-    fclose(dbFile);
-    error("Read error T2 token","unserialize_hashtable_row_format2");
-  }
+  
+  if(token)
+    H::t2 = token;
+  else if(fread(&(H::t2), sizeof(Uns32T), 1, dbFile) != 1){
+      fclose(dbFile);
+      error("Read error T2 token","unserialize_hashtable_row_format2");
+    }
+
   if( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2)){
     fclose(dbFile);
     error("State machine error: expected E or T2");
   }
+
   while(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1)){
     pointFound=false;
     // Check for T2 token
@@ -1304,6 +1412,154 @@
   return H::t2; // holds current token
 }
 
+// Unserialize format2 hashtable row to a CORE_ARRAY pointed to
+// by the hashtable row pointer: rowPtr
+//
+// numElements is the total number of t2 buckets plus points
+// memory required is (numElements + 1) Uns32T slots plus one trailing bucket* slot
+// (fillers and repeated t2 keys can add a few more slots)
+//
+// numElements = numPoints + numBuckets
+//
+// During inserts (merges) new hashtable entries are appended to the linked list stored at offset numPoints+numBuckets+1 from rowPtr->next
+//
+// ASSUME: that LSH_LIST_HEAD_COUNTERS is set so that the first hashtable bucket is used to count
+// point and bucket entries
+//
+// We store the values of numPoints and numBuckets in separate fields of the first bucket
+// rowPtr->t2 // numPoints
+// (Uns32T)(rowPtr->snext) // numBuckets
+//
+// rowPtr->next stores the malloc()ed array ((numElements+1)*sizeof(Uns32T) + sizeof(bucket*) bytes), accessed as Uns32T*
+// To get to the appended linked-list head, offset rowPtr->next by numPoints+numBuckets+1 Uns32T
+// elements (see get_pointer_to_bucket_linked_list())
+//
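+// Illustrative row layout (one Uns32T per cell), e.g. a bucket with key A holding points
+// a1..a3 followed by a bucket with key B holding the single point b1:
+//
+//   [ A | a1 | a2 | a3 | B | b1 | 0 | LSH_CORE_ARRAY_END_ROW_TOKEN | bucket* list head ]
+//
+// (the final slot is pointer-sized and heads the overflow linked list used for merges)
+// The top two (skip) bits of the first and second point slots of each bucket encode
+// (numPointsThisBucket - 1); a single-point bucket stores a zero filler in its second slot
+//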
+
+#define READ_UNS32T(VAL,TOKENSTR)   if(fread(VAL, sizeof(Uns32T), 1, dbFile) != 1){\
+    fclose(dbFile);error("Read error unserialize_hashtable_format2",TOKENSTR);}
+
+#define TEST_TOKEN(TEST, TESTSTR)   if(TEST){fclose(dbFile);error("State machine error: ", TESTSTR);}
+
+#define SKIP_BITS_LEFT_SHIFT_MSB (30)
+
+#define SKIP_BITS_RIGHT_SHIFT_MSB (28)
+#define SKIP_BITS_RIGHT_SHIFT_LSB (30)
+
+#define MAX_POINTS_IN_BUCKET_CORE_ARRAY (16)
+#define LSH_CORE_ARRAY_END_ROW_TOKEN (0xFFFFFFFD)
+
+// Encode the skip bits. Zero if only one point, MAX 15 (plus the first point == 16)
+#define ENCODE_POINT_SKIP_BITS  TEST_TOKEN(!numPointsThisBucket, "no points found");\
+    if(numPointsThisBucket==1){\
+             secondPtr=ap++;\
+             *secondPtr=0;\
+             numPoints++;\
+    }\
+    if(numPointsThisBucket>1){\
+      *firstPtr |=  ( (numPointsThisBucket-1) & 0x3 ) << SKIP_BITS_LEFT_SHIFT_MSB;\
+      *secondPtr |= ( ( (numPointsThisBucket-1) & 0xC) >> 2 ) << SKIP_BITS_LEFT_SHIFT_MSB;}
+
+Uns32T G::unserialize_hashtable_row_to_array(FILE* dbFile, bucket** rowPP, Uns32T numElements){
+  Uns32T numPointsThisBucket = 0;
+  Uns32T numBuckets = 0;
+  Uns32T numPoints = 0;
+  Uns32T* firstPtr = 0;
+  Uns32T* secondPtr = 0;
+
+  // Initialize new row
+  if(!*rowPP){
+    *rowPP = new bucket();
+#ifdef LSH_LIST_HEAD_COUNTERS
+    (*rowPP)->t2 = 0; // Use t2 as a collision counter for the row
+    (*rowPP)->next = 0;
+#endif
+  }
+  bucket* rowPtr = *rowPP;
+
+  READ_UNS32T(&(H::t2),"t2");
+  TEST_TOKEN(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2), "expected E or T2");
+  // Because we encode points in 16-point blocks, we sometimes allocate repeated t2 elements
+  // So over-allocate by a factor of two and realloc later to actual numElements
+  CR_ASSERT(rowPtr->next = (bucket*) malloc((2*numElements+1)*sizeof(Uns32T)+sizeof(bucket**)));
+  Uns32T* ap = reinterpret_cast<Uns32T*>(rowPtr->next); // Cast pointer to Uns32T* for array format
+  while( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1) ){
+    numPointsThisBucket = 0;// reset bucket point counter
+    secondPtr = 0; // reset second-point pointer
+    TEST_TOKEN(H::t2!=O2_SERIAL_TOKEN_T2, "expected T2");
+    READ_UNS32T(&(H::t2), "Read error t2");
+    *ap++ = H::t2; // Insert t2 value into array
+    numBuckets++;
+    READ_UNS32T(&(H::p), "Read error H::p"); 
+    while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){      
+      if(numPointsThisBucket==MAX_POINTS_IN_BUCKET_CORE_ARRAY){
+	ENCODE_POINT_SKIP_BITS;
+	*ap++ = H::t2; // Extra element
+	numBuckets++;  // record this as a new bucket
+	numPointsThisBucket=0; // reset bucket point counter
+	secondPtr = 0; // reset second-point pointer
+      }
+      if( ++numPointsThisBucket == 1 )
+	firstPtr = ap;  // store pointer to first point to insert skip bits later on
+      else if( numPointsThisBucket == 2 )
+	secondPtr = ap;  // store pointer to second point to insert skip bits later on
+      numPoints++;
+      *ap++ = H::p;
+      READ_UNS32T(&(H::p), "Read error H::p"); 
+    }
+    ENCODE_POINT_SKIP_BITS;    
+    H::t2 = H::p; // Copy last found token to t2
+  }    
+  // Reallocate the row to its actual size
+  CR_ASSERT(rowPtr->next = (bucket*) realloc(rowPtr->next, (numBuckets+numPoints+1)*sizeof(Uns32T)+sizeof(bucket**)));
+  // realloc() may move the block, so recompute the append pointer from the (possibly new) base
+  ap = reinterpret_cast<Uns32T*>(rowPtr->next) + numBuckets + numPoints;
+  // Record the sizes at the head of the row
+  rowPtr->snext = (sbucket*) numBuckets;
+  rowPtr->t2 = numPoints;
+  // Place end of row marker
+  *ap++ = LSH_CORE_ARRAY_END_ROW_TOKEN;
+  // Set the LSH_CORE_ARRAY_BIT to identify data structure for insertion and retrieval
+  rowPtr->t2 |= LSH_CORE_ARRAY_BIT;
+  // Allocate a new dynamic list head at the end of the array
+  bucket** listPtr = reinterpret_cast<bucket**> (ap);
+  *listPtr = 0;
+  // Return current token
+  return H::t2; // return H::t2 which holds current token [E or T1]
+}
+
+
+
+ // *p is a pointer to the beginning of a hashtable row array
+ // The array consists of t2 hash keys and one or more point identifiers p for each hash key
+ // Retrieval is performed by generating a hash key query_t2 for query point q
+ // We identify the row that t2 is stored in using a secondary hash t1; this row is the entry
+ // point for retrieve_from_core_hashtable_array
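+ // Each bucket occupies at least three slots: its t2 key and two point slots (a zero
+ // filler fills the second slot when a bucket holds a single point). The decoded skip
+ // value equals (points in the bucket - 1): its low two bits come from the first point
+ // slot's high bits and its high two bits from the second point slot's high bits.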
+#define SKIP_BITS (0xC0000000)
+void G::retrieve_from_core_hashtable_array(Uns32T* p, Uns32T qpos){
+  Uns32T skip;
+  Uns32T t2;
+  Uns32T p1;
+  Uns32T p2;
+
+  CR_ASSERT(p);
+
+  do{
+    t2 = *p++;
+    if( t2 > H::t2 )
+      return;
+    p1 = *p++;
+    p2 = *p++;
+    skip = (( p1 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_LSB) + (( p2 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_MSB);
+    if( t2 == H::t2 ){
+      add_point_callback(calling_instance, p1 ^ (p1 & SKIP_BITS), qpos, radius);
+      if(skip--){
+	add_point_callback(calling_instance, p2 ^ (p2 & SKIP_BITS), qpos, radius);
+	while(skip-- )
+	  add_point_callback(calling_instance, *p++, qpos, radius);
+      }
+    }
+    else 
+      if(*p != LSH_CORE_ARRAY_END_ROW_TOKEN)
+	p = p + (skip ? skip - 1 : 0); // two point slots were already consumed above
+  }while( *p != LSH_CORE_ARRAY_END_ROW_TOKEN );
+}
+ 
 void G::dump_hashtable_row(bucket* p){
   while(p && p->t2!=IFLAG){
     sbucket* sbp = p->snext;