Mercurial > hg > audiodb
view lshlib.cpp @ 340:a6edbe97fddf
Added LSH_CORE_ARRAY structure for hashtables instead of linked lists. Maintained backwards compatibility with indexes built for the linked-list format. Added tests for indexing and merging. Tested backwards compatibility OK. The purpose of the LSH_CORE_ARRAY data structure is greater space efficiency and better L1/L2 cache usage; essential for holding multiple indexes with multiple hashtables in RAM.
author | mas01mc |
---|---|
date | Wed, 10 Sep 2008 18:55:16 +0000 |
parents | fe4d5b763086 |
children | fdeb8db97678 |
line wrap: on
line source
#include "lshlib.h"

//#define LSH_DUMP_CORE_TABLES
//#define USE_U_FUNCTIONS

// Use backward-compatible CORE ARRAY lsh index
#define LSH_CORE_ARRAY         // Set to use arrays for hashtables rather than linked-lists
#define LSH_LIST_HEAD_COUNTERS // Enable counters in hashtable list heads

// Critical path logic: CORE ARRAY requires list-head counters
#if defined LSH_CORE_ARRAY && !defined LSH_LIST_HEAD_COUNTERS
#define LSH_LIST_HEAD_COUNTERS
#endif

#define LSH_CORE_ARRAY_BIT (0x80000000) // LSH_CORE_ARRAY test bit for list head

// Print a message and abort with status 2
void err(char*s){
  cout << s << endl;
  exit(2);
}

// log2 of the system page size, used to page-align serialized tables
Uns32T get_page_logn(){
  int pagesz = (int)sysconf(_SC_PAGESIZE);
  return (Uns32T)log2((double)pagesz);
}

// Round x up to the next multiple of 2^w
unsigned align_up(unsigned x, unsigned w){
  return ((x) + ((1<<w)-1) & ~((1<<w)-1));
}

// Report an error (optionally with perror for the failing syscall) and exit
void H::error(const char* a, const char* b, const char *sysFunc) {
  cerr << a << ": " << b << endl;
  if (sysFunc) {
    perror(sysFunc);
  }
  exit(1);
}

H::H(){
  // Delay initialization of lsh functions until we know the parameters
}

// Construct LSH hash structures from explicit parameters:
// k hash functions per table, m partial functions (L = m(m-1)/2 tables),
// d input dimensions, N rows per table, C max collisions per row,
// w bucket width, r query radius.
H::H(Uns32T kk, Uns32T mm, Uns32T dd, Uns32T NN, Uns32T CC, float ww, float rr):
#ifdef USE_U_FUNCTIONS
  use_u_functions(true),
#else
  use_u_functions(false),
#endif
  maxp(0),
  bucketCount(0),
  pointCount(0),
  N(NN),
  C(CC),
  k(kk),
  m(mm),
  L((mm*(mm-1))/2),
  d(dd),
  w(ww),
  radius(rr)
{
  if(m<2){
    m=2;
    L=1; // check value of L
    cout << "warning: setting m=2, L=1" << endl;
  }
  if(use_u_functions && k%2){
    k++; // make sure k is even
    cout << "warning: setting k even" << endl;
  }
  // We have the necessary parameters, so construct hashfunction datastructures
  initialize_lsh_functions();
}

// Allocate and randomly initialize projectors, biases, hash tables and the
// secondary (standard) hash functions r1/r2.
void H::initialize_lsh_functions(){
  H::P = UH_PRIME_DEFAULT;
  /* FIXME: don't use time(); instead use /dev/random or similar */
  /* FIXME: write out the seed somewhere, so that we can get repeatability */
#ifdef MT19937
  init_genrand(time(NULL));
#else
  srand(time(NULL)); // seed random number generator
#endif
  Uns32T i,j, kk;
#ifdef USE_U_FUNCTIONS
  H::A = new float**[ H::m ]; // m x k x d random projectors
  H::b = new float*[ H::m ];  // m x k random biases
#else
  H::A = new float**[ H::L ]; // m x k x d random projectors
  H::b = new float*[ H::L ];  // m x k random biases
#endif
  H::g = new Uns32T*[ H::L ]; // L x k random projections
  assert( H::g && H::A && H::b ); // failure
#ifdef USE_U_FUNCTIONS
  // Use m \times u_i functions \in R^{(k/2) \times (d)}
  // Combine to make L=m(m-1)/2 hash functions \in R^{k \times d}
  for( j = 0; j < H::m ; j++ ){ // m functions u_i(v)
    H::A[j] = new float*[ H::k/2 ]; // k/2 x d 2-stable distribution coefficients
    H::b[j] = new float[ H::k/2 ];  // bias
    assert( H::A[j] && H::b[j] );   // failure
    for( kk = 0; kk < H::k/2 ; kk++ ){
      H::A[j][kk] = new float[ H::d ];
      assert( H::A[j][kk] ); // failure
      for(Uns32T i = 0 ; i < H::d ; i++ )
        H::A[j][kk][i] = H::randn();  // Normal
      H::b[j][kk] = H::ranf()*H::w;   // Uniform
    }
  }
#else
  // Use m \times u_i functions \in R^{k \times (d)}
  // Combine to make L=m(m-1)/2 hash functions \in R^{k \times d}
  for( j = 0; j < H::L ; j++ ){ // m functions u_i(v)
    H::A[j] = new float*[ H::k ]; // k x d 2-stable distribution coefficients
    H::b[j] = new float[ H::k ];  // bias
    assert( H::A[j] && H::b[j] ); // failure
    for( kk = 0; kk < H::k ; kk++ ){
      H::A[j][kk] = new float[ H::d ];
      assert( H::A[j][kk] ); // failure
      for(Uns32T i = 0 ; i < H::d ; i++ )
        H::A[j][kk][i] = H::randn();  // Normal
      H::b[j][kk] = H::ranf()*H::w;   // Uniform
    }
  }
#endif
  // Storage for LSH hash function output (Uns32T)
  for( j = 0 ; j < H::L ; j++ ){ // L functions g_j(u_a, u_b) a,b \in nchoosek(m,2)
    H::g[j] = new Uns32T[ H::k ]; // k x 32-bit hash values, gj(v)=[x0 x1 ... xk-1] xk \in Z
    assert( H::g[j] );
  }
  // LSH Hash tables
  H::h = new bucket**[ H::L ];
  assert( H::h );
  for( j = 0 ; j < H::L ; j++ ){
    H::h[j] = new bucket*[ H::N ];
    assert( H::h[j] );
    for( i = 0 ; i < H::N ; i++)
      H::h[j][i] = 0;
  }
  // Standard hash functions
  H::r1 = new Uns32T*[ H::L ];
  H::r2 = new Uns32T*[ H::L ];
  assert( H::r1 && H::r2 ); // failure
  for( j = 0 ; j < H::L ; j++ ){
    H::r1[ j ] = new Uns32T[ H::k ];
    H::r2[ j ] = new Uns32T[ H::k ];
    assert( H::r1[j] && H::r2[j] ); // failure
    for( i = 0; i<H::k; i++){
      H::r1[j][i] = randr();
      H::r2[j][i] = randr();
    }
  }
  // Storage for whole or partial function evaluation depending on USE_U_FUNCTIONS
  H::initialize_partial_functions();
}

// Allocate the per-table scratch vectors that hold partial hash evaluations
void H::initialize_partial_functions(){
#ifdef USE_U_FUNCTIONS
  H::uu = vector<vector<Uns32T> >(H::m);
  for( Uns32T aa=0 ; aa < H::m ; aa++ )
    H::uu[aa] = vector<Uns32T>( H::k/2 );
#else
  H::uu = vector<vector<Uns32T> >(H::L);
  for( Uns32T aa=0 ; aa < H::L ; aa++ )
    H::uu[aa] = vector<Uns32T>( H::k );
#endif
}

// Generate z ~ N(0,1) via the Box-Muller transform
float H::randn(){
  float x1, x2;
  do{
    x1 = ranf();
  } while (x1 == 0); // cannot take log of 0
  x2 = ranf();
  float z;
  z = sqrtf(-2.0 * logf(x1)) * cosf(2.0 * M_PI * x2);
  return z;
}

// Uniform float in [0,1)
float H::ranf(){
#ifdef MT19937
  return (float) genrand_real2();
#else
  return (float)( (double)rand() / ((double)(RAND_MAX)+(double)(1)) );
#endif
}

// range is 1..2^29
/* FIXME: that looks like an ... odd range. Still.
*/ Uns32T H::randr(){ #ifdef MT19937 return (Uns32T)((genrand_int32() >> 3) + 1); #else return (Uns32T) ((rand() >> 2) + 1); #endif } // Destruct hash tables H::~H(){ Uns32T i,j,kk; bucket** pp; #ifdef USE_U_FUNCTIONS for( j = 0 ; j < H::m ; j++ ){ for( kk = 0 ; kk < H::k/2 ; kk++ ) delete[] A[j][kk]; delete[] A[j]; } delete[] A; for( j = 0 ; j < H::m ; j++ ) delete[] b[j]; delete[] b; #else for( j = 0 ; j < H::L ; j++ ){ for( kk = 0 ; kk < H::k ; kk++ ) delete[] A[j][kk]; delete[] A[j]; } delete[] A; for( j = 0 ; j < H::L ; j++ ) delete[] b[j]; delete[] b; #endif for( j = 0 ; j < H::L ; j++ ) delete[] g[j]; delete[] g; for( j=0 ; j < H::L ; j++ ){ delete[] H::r1[ j ]; delete[] H::r2[ j ]; for(i = 0; i< H::N ; i++){ pp = 0; #ifdef LSH_CORE_ARRAY if(H::h[ j ][ i ]) if(H::h[ j ][ i ]->t2 & LSH_CORE_ARRAY_BIT){ pp = get_pointer_to_bucket_linked_list(H::h[ j ][ i ]); if(*pp){ (*pp)->snext=0; // Head of list uses snext as a non-pointer value delete *pp; // Now the destructor can do its work properly } free(H::h[ j ][ i ]->next); H::h[ j ][ i ]->next = 0; // Zero next pointer H::h[ j ][ i ]->snext = 0; // Zero head-of-list pointer as above } #endif delete H::h[ j ][ i ]; } delete[] H::h[ j ]; } delete[] H::r1; delete[] H::r2; delete[] H::h; } // Compute all hash functions for vector v // #ifdef USE_U_FUNCTIONS use Combination of m \times h_i \in R^{(k/2) \times d} // to make L \times g_j functions \in Z^k void H::compute_hash_functions(vector<float>& v){ // v \in R^d float iw = 1. 
/ H::w; // hash bucket width Uns32T aa, kk; if( v.size() != H::d ) error("v.size != H::d","","compute_hash_functions"); // check input vector dimensionality double tmp = 0; float *pA, *pb; Uns32T *pg; int dd; vector<float>::iterator vi; vector<Uns32T>::iterator ui; #ifdef USE_U_FUNCTIONS Uns32T bb; // Store m dot products to expand for( aa=0; aa < H::m ; aa++ ){ ui = H::uu[aa].begin(); for( kk = 0 ; kk < H::k/2 ; kk++ ){ pb = *( H::b + aa ) + kk; pA = * ( * ( H::A + aa ) + kk ); dd = H::d; tmp = 0.; vi = v.begin(); while( dd-- ) tmp += *pA++ * *vi++; // project tmp += *pb; // translate tmp *= iw; // scale *ui++ = (Uns32T) floor(tmp); // floor } } // Binomial combinations of functions u_{a,b} \in Z^{(k/2) \times d} Uns32T j; for( aa=0, j=0 ; aa < H::m-1 ; aa++ ) for( bb = aa + 1 ; bb < H::m ; bb++, j++ ){ pg= *( H::g + j ); // L \times functions g_j(v) \in Z^k // u_1 \in Z^{(k/2) \times d} ui = H::uu[aa].begin(); kk=H::k/2; while( kk-- ) *pg++ = *ui++; // hash function g_j(v)=[x1 x2 ... x(k/2)]; xk \in Z // u_2 \in Z^{(k/2) \times d} ui = H::uu[bb].begin(); kk=H::k/2; while( kk--) *pg++ = *ui++; // hash function g_j(v)=[x(k/2+1) x(k/2+2) ... xk]; xk \in Z } #else for( aa=0; aa < H::L ; aa++ ){ ui = H::uu[aa].begin(); for( kk = 0 ; kk < H::k ; kk++ ){ pb = *( H::b + aa ) + kk; pA = * ( * ( H::A + aa ) + kk ); dd = H::d; tmp = 0.; vi = v.begin(); while( dd-- ) tmp += *pA++ * *vi++; // project tmp += *pb; // translate tmp *= iw; // scale *ui++ = (Uns32T) (floor(tmp)); // floor } } // Compute hash functions for( aa=0 ; aa < H::L ; aa++ ){ pg= *( H::g + aa ); // L \times functions g_j(v) \in Z^k // u_1 \in Z^{k \times d} ui = H::uu[aa].begin(); kk=H::k; while( kk-- ) *pg++ = *ui++; // hash function g_j(v)=[x1 x2 ... 
xk]; xk \in Z } #endif } // make hash value \in Z void H::generate_hash_keys(Uns32T*g, Uns32T* r1, Uns32T* r2){ H::t1 = computeProductModDefaultPrime( g, r1, H::k ) % H::N; H::t2 = computeProductModDefaultPrime( g, r2, H::k ); } #define CR_ASSERT(b){if(!(b)){fprintf(stderr, "ASSERT failed on line %d, file %s.\n", __LINE__, __FILE__); exit(1);}} // Computes (a.b) mod UH_PRIME_DEFAULT inline Uns32T H::computeProductModDefaultPrime(Uns32T *a, Uns32T *b, IntT size){ LongUns64T h = 0; for(IntT i = 0; i < size; i++){ h = h + (LongUns64T)a[i] * (LongUns64T)b[i]; h = (h & TWO_TO_32_MINUS_1) + 5 * (h >> 32); if (h >= UH_PRIME_DEFAULT) { h = h - UH_PRIME_DEFAULT; } CR_ASSERT(h < UH_PRIME_DEFAULT); } return h; } Uns32T H::bucket_insert_point(bucket **pp){ collisionCount = 0; #ifdef LSH_LIST_HEAD_COUNTERS if(!*pp){ *pp = new bucket(); (*pp)->t2 = 0; // Use t2 as a collision counter for the row (*pp)->next = new bucket(); } // The list head holds point collision count if( (*pp)->t2 & LSH_CORE_ARRAY_BIT ) bucket_insert_point(get_pointer_to_bucket_linked_list(*pp)); // recurse else{ collisionCount = (*pp)->t2; if(collisionCount < H::C){ // Block if row is full (*pp)->t2++; // Increment collision counter (numPoints in row) pointCount++; collisionCount++; // Locate the bucket linked list __bucket_insert_point( (*pp)->next ); } } #else // NOT USING LSH_LIST_HEAD_COUNTERS if(!*pp) *pp = new bucket(); pointCount++; __bucket_insert_point(*pp); // No collision count storage #endif return collisionCount; } // insert points into hashtable row collision chain void H::__bucket_insert_point(bucket* p){ if(p->t2 == IFLAG){ // initialization flag, is it in the domain of t2? 
p->t2 = H::t2; bucketCount++; // Record start of new point-locale collision chain p->snext = new sbucket(); __sbucket_insert_point(p->snext); return; } if(p->t2 == H::t2){ __sbucket_insert_point(p->snext); return; } if(p->next){ // Construct list in t2 order if(H::t2 < p->next->t2){ bucket* tmp = new bucket(); tmp->next = p->next; p->next = tmp; __bucket_insert_point(tmp); } else __bucket_insert_point(p->next); } else { p->next = new bucket(); __bucket_insert_point(p->next); } } // insert points into point-locale collision chain void H::__sbucket_insert_point(sbucket* p){ if(p->pointID==IFLAG){ p->pointID = H::p; return; } // Search for pointID if(p->snext){ __sbucket_insert_point(p->snext); } else{ // Make new point collision bucket at end of list p->snext = new sbucket(); __sbucket_insert_point(p->snext); } } inline bucket** H::get_bucket(int j){ return *(h+j); } // Find the linked-list pointer at the end of the CORE_ARRAY bucket** H::get_pointer_to_bucket_linked_list(bucket* rowPtr){ Uns32T numBuckets = (Uns32T) rowPtr->snext; // Cast pointer to unsigned int Uns32T numPoints = rowPtr->t2 & 0x7FFFFFFF; // Value is stored in low 31 bits of t2 field bucket** listPtr = reinterpret_cast<bucket**> (reinterpret_cast<unsigned int*>(rowPtr->next)+numPoints+numBuckets+1); return listPtr; } // Interface to Locality Sensitive Hashing G G::G(float ww, Uns32T kk,Uns32T mm, Uns32T dd, Uns32T NN, Uns32T CC, float rr): H(kk,mm,dd,NN,CC,ww,rr), // constructor to initialize data structures indexName(0), lshHeader(0), calling_instance(0), add_point_callback(0) { } // Serialize from file LSH constructor // Read parameters from database file // Load the hash functions, close the database // Optionally load the LSH tables into head-allocated lists in core G::G(char* filename, bool lshInCoreFlag): H(), // default base-class constructor call delays data-structure initialization indexName(0), lshHeader(0), calling_instance(0), add_point_callback(0) { int dbfid = 
unserialize_lsh_header(filename); indexName = new char[O2_INDEX_MAXSTR]; strncpy(indexName, filename, O2_INDEX_MAXSTR); // COPY THE CONTENTS TO THE NEW POINTER H::initialize_lsh_functions(); // Base-class data-structure initialization unserialize_lsh_functions(dbfid); // populate with on-disk hashfunction values // Format1 only needs unserializing if specifically requested if(!(lshHeader->flags&O2_SERIAL_FILEFORMAT2) && lshInCoreFlag){ unserialize_lsh_hashtables_format1(dbfid); } // Format2 always needs unserializing if(lshHeader->flags&O2_SERIAL_FILEFORMAT2 && lshInCoreFlag){ FILE* dbFile = fdopen(dbfid, "rb"); if(!dbFile) error("Cannot open LSH file for reading", filename); unserialize_lsh_hashtables_format2(dbFile); } serial_close(dbfid); } G::~G(){ delete lshHeader; } // single point insertion; inserted values are hash value and pointID Uns32T G::insert_point(vector<float>& v, Uns32T pp){ Uns32T collisionCount = 0; H::p = pp; if(H::maxp && pp<=H::maxp) error("points must be indexed in strict ascending order", "LSH::insert_point(vector<float>&, Uns32T pointID)"); H::maxp=pp; // Store highest pointID in database H::compute_hash_functions( v ); for(Uns32T j = 0 ; j < H::L ; j++ ){ // insertion H::generate_hash_keys( *( H::g + j ), *( H::r1 + j ), *( H::r2 + j ) ); collisionCount += bucket_insert_point( *(h + j) + t1 ); } return collisionCount; } // batch insert for a point set // inserted values are vector hash value and pointID starting at basePointID void G::insert_point_set(vector<vector<float> >& vv, Uns32T basePointID){ for(Uns32T point=0; point<vv.size(); point++) insert_point(vv[point], basePointID+point); } // point retrieval routine void G::retrieve_point(vector<float>& v, Uns32T qpos, ReporterCallbackPtr add_point, void* caller){ calling_instance = caller; add_point_callback = add_point; H::compute_hash_functions( v ); for(Uns32T j = 0 ; j < H::L ; j++ ){ H::generate_hash_keys( *( H::g + j ), *( H::r1 + j ), *( H::r2 + j ) ); if( bucket* bPtr = 
*(get_bucket(j) + get_t1()) ) #ifdef LSH_LIST_HEAD_COUNTERS if(bPtr->t2&LSH_CORE_ARRAY_BIT) retrieve_from_core_hashtable_array((Uns32T*)(bPtr->next), qpos); else bucket_chain_point( bPtr->next, qpos); #else bucket_chain_point( bPtr , qpos); #endif } } void G::retrieve_point_set(vector<vector<float> >& vv, ReporterCallbackPtr add_point, void* caller){ for(Uns32T qpos = 0 ; qpos < vv.size() ; qpos++ ) retrieve_point(vv[qpos], qpos, add_point, caller); } // export lsh tables to table structure on disk // // LSH TABLE STRUCTURE // ---header 64 bytes --- // [magic #tables #rows #cols elementSize databaseSize version flags dim #funs 0 0 0 0 0 0] // // ---random projections L x k x d float --- // A[0][0][0] A[0][0][1] ... A[0][0][d-1] // A[0][1][0] A[0][1][1] ... A[1][1][d-1] // ... // A[0][K-1][0] A[0][1][1] ... A[0][k-1][d-1] // ... // ... // A[L-1][0][0] A[M-1][0][1] ... A[L-1][0][d-1] // A[L-1][1][0] A[M-1][1][1] ... A[L-1][1][d-1] // ... // A[L-1][k-1][0] A[M-1][1][1] ... A[L-1][k-1][d-1] // // ---bias L x k float --- // b[0][0] b[0][1] ... b[0][k-1] // b[1][0] b[1][1] ... b[1][k-1] // ... // b[L-1][0] b[L-1][1] ... b[L-1][k-1] // // ---random r1 L x k float --- // r1[0][0] r1[0][1] ... r1[0][k-1] // r1[1][0] r1[1][1] ... r1[1][k-1] // ... // r1[L-1][0] r1[L-1][1] ... r1[L-1][k-1] // // ---random r2 L x k float --- // r2[0][0] r2[0][1] ... r2[0][k-1] // r2[1][0] r2[1][1] ... r2[1][k-1] // ... // r2[L-1][0] r2[L-1][1] ... r2[L-1][k-1] // // ******* HASHTABLES FORMAT1 (optimized for LSH_ON_DISK retrieval) ******* // ---hash table 0: N x C x 8 --- // [t2 pointID][t2 pointID]...[t2 pointID] // [t2 pointID][t2 pointID]...[t2 pointID] // ... // [t2 pointID][t2 pointID]...[t2 pointID] // // ---hash table 1: N x C x 8 --- // [t2 pointID][t2 pointID]...[t2 pointID] // [t2 pointID][t2 pointID]...[t2 pointID] // ... // [t2 pointID][t2 pointID]...[t2 pointID] // // ... 
// // ---hash table L-1: N x C x 8 --- // [t2 pointID][t2 pointID]...[t2 pointID] // [t2 pointID][t2 pointID]...[t2 pointID] // ... // [t2 pointID][t2 pointID]...[t2 pointID] // // ******* HASHTABLES FORMAT2 (optimized for LSH_IN_CORE retrieval) ******* // // State machine controlled by regular expression. // legend: // // O2_SERIAL_TOKEN_T1 = 0xFFFFFFFCU // O2_SERIAL_TOKEN_T2 = 0xFFFFFFFDU // O2_SERIAL_TOKEN_ENDTABLE = 0xFFFFFFFEU // // T1 - T1 hash token // t1 - t1 hash key (t1 range 0..2^29-1) // T2 - T2 token // t2 - t2 hash key (range 1..2^32-6) // p - point identifier (range 0..2^32-1) // E - end hash table token // {...} required arguments // [...] optional arguments // * - match zero or more occurences // + - match one or more occurences // {...}^L - repeat argument L times // // FORMAT2 Regular expression: // { [T1 t1 [T2 t2 p+]+ ]* E }^L // // Serial header constructors SerialHeader::SerialHeader(){;} SerialHeader::SerialHeader(float W, Uns32T L, Uns32T N, Uns32T C, Uns32T k, Uns32T d, float r, Uns32T p, Uns32T FMT, Uns32T pc): lshMagic(O2_SERIAL_MAGIC), binWidth(W), numTables(L), numRows(N), numCols(C), elementSize(O2_SERIAL_ELEMENT_SIZE), version(O2_SERIAL_VERSION), size(0), // we are deprecating this value flags(FMT), dataDim(d), numFuns(k), radius(r), maxp(p), size_long((unsigned long long)L * align_up((unsigned long long)N * C * O2_SERIAL_ELEMENT_SIZE, get_page_logn()) // hash tables + align_up(O2_SERIAL_HEADER_SIZE + // header + hash functions (unsigned long long)L*k*( sizeof(float)*d+2*sizeof(Uns32T)+sizeof(float)),get_page_logn())), pointCount(pc){ if(FMT==O2_SERIAL_FILEFORMAT2) size_long = (unsigned long long)align_up(O2_SERIAL_HEADER_SIZE + (unsigned long long)L*k*(sizeof(float)*d+2+sizeof(Uns32T) +sizeof(float)) + (unsigned long long)pc*16UL,get_page_logn()); } // header float* G::get_serial_hashfunction_base(char* db){ if(db&&lshHeader) return (float*)(db+O2_SERIAL_HEADER_SIZE); else return NULL; } SerialElementT* 
G::get_serial_hashtable_base(char* db){ if(db&&lshHeader) return (SerialElementT*)(db+get_serial_hashtable_offset()); else return NULL; } Uns32T G::get_serial_hashtable_offset(){ if(lshHeader) return align_up(O2_SERIAL_HEADER_SIZE + L*lshHeader->numFuns*( sizeof(float)*lshHeader->dataDim+2*sizeof(Uns32T)+sizeof(float)),get_page_logn()); else return 0; } void G::serialize(char* filename, Uns32T serialFormat){ int dbfid; char* db; int dbIsNew=0; // Check requested serialFormat if(!(serialFormat==O2_SERIAL_FILEFORMAT1 || serialFormat==O2_SERIAL_FILEFORMAT2)) error("Unrecognized serial file format request: ", "serialize()"); // Test to see if file exists if((dbfid = open (filename, O_RDONLY)) < 0) // If it doesn't, then create the file (CREATE) if(errno == ENOENT){ // Create the file std::cout << "Creating new serialized LSH database:" << filename << "..."; std::cout.flush(); serial_create(filename, serialFormat); dbIsNew=1; } else // The file can't be opened error("Can't open the file", filename, "open"); // Load the on-disk header into core dbfid = serial_open(filename, 1); // open for write db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);// get database pointer serial_get_header(db); // read header serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap // Check compatibility of core and disk data structures if( !serial_can_merge(serialFormat) ) error("Incompatible core and serial LSH, data structure dimensions mismatch."); // For new LSH databases write the hashfunctions if(dbIsNew) serialize_lsh_hashfunctions(dbfid); // Write the hashtables in the requested format if(serialFormat == O2_SERIAL_FILEFORMAT1) serialize_lsh_hashtables_format1(dbfid, !dbIsNew); else{ FILE* dbFile = fdopen(dbfid, "r+b"); if(!dbFile) error("Cannot open LSH file for writing",filename); serialize_lsh_hashtables_format2(dbFile, !dbIsNew); fflush(dbFile); } if(!dbIsNew) { db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);// get database pointer //serial_get_header(db); // read header cout << 
"maxp = " << H::maxp << endl; lshHeader->maxp=H::maxp; // Default to FILEFORMAT1 if(!(lshHeader->flags&O2_SERIAL_FILEFORMAT2)) lshHeader->flags|=O2_SERIAL_FILEFORMAT1; memcpy((char*)db, (char*)lshHeader, sizeof(SerialHeaderT)); serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap } serial_close(dbfid); } // Test to see if core structure and requested format is // compatible with currently opened database int G::serial_can_merge(Uns32T format){ SerialHeaderT* that = lshHeader; if( (format==O2_SERIAL_FILEFORMAT2 && !that->flags&O2_SERIAL_FILEFORMAT2) || (format!=O2_SERIAL_FILEFORMAT2 && that->flags&O2_SERIAL_FILEFORMAT2) || !( this->w == that->binWidth && this->L == that->numTables && this->N == that->numRows && this->k == that->numFuns && this->d == that->dataDim && sizeof(SerialElementT) == that->elementSize && this->radius == that->radius)){ serial_print_header(format); return 0; } else return 1; } // Used as an error message for serial_can_merge() void G::serial_print_header(Uns32T format){ std::cout << "Fc:" << format << " Fs:" << lshHeader->flags << endl; std::cout << "Wc:" << w << " Ls:" << lshHeader->binWidth << endl; std::cout << "Lc:" << L << " Ls:" << lshHeader->numTables << endl; std::cout << "Nc:" << N << " Ns:" << lshHeader->numRows << endl; std::cout << "kc:" << k << " ks:" << lshHeader->numFuns << endl; std::cout << "dc:" << d << " ds:" << lshHeader->dataDim << endl; std::cout << "sc:" << sizeof(SerialElementT) << " ss:" << lshHeader->elementSize << endl; std::cout << "rc:" << this->radius << " rs:" << lshHeader->radius << endl; } int G::serialize_lsh_hashfunctions(int fid){ float* pf; Uns32T *pu; Uns32T x,y,z; char* db = serial_mmap(fid, get_serial_hashtable_offset(), 1);// get database pointer pf = get_serial_hashfunction_base(db); // HASH FUNCTIONS // Write the random projectors A[][][] #ifdef USE_U_FUNCTIONS for( x = 0 ; x < H::m ; x++ ) for( y = 0 ; y < H::k/2 ; y++ ) #else for( x = 0 ; x < H::L ; x++ ) for( y = 0 ; y < H::k ; y++ ) #endif for( 
z = 0 ; z < d ; z++ ) *pf++ = H::A[x][y][z]; // Write the random biases b[][] #ifdef USE_U_FUNCTIONS for( x = 0 ; x < H::m ; x++ ) for( y = 0 ; y < H::k/2 ; y++ ) #else for( x = 0 ; x < H::L ; x++ ) for( y = 0 ; y < H::k ; y++ ) #endif *pf++ = H::b[x][y]; pu = (Uns32T*)pf; // Write the Z projectors r1[][] for( x = 0 ; x < H::L ; x++) for( y = 0 ; y < H::k ; y++) *pu++ = H::r1[x][y]; // Write the Z projectors r2[][] for( x = 0 ; x < H::L ; x++) for( y = 0; y < H::k ; y++) *pu++ = H::r2[x][y]; serial_munmap(db, get_serial_hashtable_offset()); return 1; } int G::serialize_lsh_hashtables_format1(int fid, int merge){ SerialElementT *pe, *pt; Uns32T x,y; if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT1) ) error("Cannot merge core and serial LSH, data structure dimensions mismatch."); Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount; // Write the hash tables for( x = 0 ; x < H::L ; x++ ){ std::cout << (merge ? 
"merging":"writing") << " hash table " << x << " FORMAT1..."; std::cout.flush(); // memory map a single hash table for sequential access // Align each hash table to page boundary char* dbtable = serial_mmap(fid, hashTableSize, 1, align_up(get_serial_hashtable_offset()+x*hashTableSize, get_page_logn())); #ifdef __CYGWIN__ // No madvise in CYGWIN #else if(madvise(dbtable, hashTableSize, MADV_SEQUENTIAL)<0) error("could not advise hashtable memory","","madvise"); #endif maxColCount=0; minColCount=O2_SERIAL_MAX_COLS; meanColCount=0; colCountN=0; pt=(SerialElementT*)dbtable; for( y = 0 ; y < H::N ; y++ ){ // Move disk pointer to beginning of row pe=pt+y*lshHeader->numCols; colCount=0; if(bucket* bPtr = h[x][y]) if(merge) #ifdef LSH_LIST_HEAD_COUNTERS serial_merge_hashtable_row_format1(pe, bPtr->next, colCount); // skip collision counter bucket else serial_write_hashtable_row_format1(pe, bPtr->next, colCount); // skip collision counter bucket #else serial_merge_hashtable_row_format1(pe, bPtr, colCount); else serial_write_hashtable_row_format1(pe, bPtr, colCount); #endif if(colCount){ if(colCount<minColCount) minColCount=colCount; if(colCount>maxColCount) maxColCount=colCount; meanColCount+=colCount; colCountN++; } } if(colCountN) std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN << ", min = " << minColCount << ", max = " << maxColCount << endl; serial_munmap(dbtable, hashTableSize); } // We're done writing return 1; } void G::serial_merge_hashtable_row_format1(SerialElementT* pr, bucket* b, Uns32T& colCount){ while(b && b->t2!=IFLAG){ SerialElementT*pe=pr; // reset disk pointer to beginning of row serial_merge_element_format1(pe, b->snext, b->t2, colCount); b=b->next; } } void G::serial_merge_element_format1(SerialElementT* pe, sbucket* sb, Uns32T t2, Uns32T& colCount){ while(sb){ if(colCount==lshHeader->numCols){ std::cout << "!point-chain full " << endl; return; } Uns32T c=0; // Merge collision chains 
while(c<lshHeader->numCols){ if( (pe+c)->hashValue==IFLAG){ (pe+c)->hashValue=t2; (pe+c)->pointID=sb->pointID; colCount=c+1; if(c+1<lshHeader->numCols) (pe+c+1)->hashValue=IFLAG; break; } c++; } sb=sb->snext; } return; } void G::serial_write_hashtable_row_format1(SerialElementT*& pe, bucket* b, Uns32T& colCount){ pe->hashValue=IFLAG; while(b && b->t2!=IFLAG){ serial_write_element_format1(pe, b->snext, b->t2, colCount); b=b->next; } } void G::serial_write_element_format1(SerialElementT*& pe, sbucket* sb, Uns32T t2, Uns32T& colCount){ while(sb){ if(colCount==lshHeader->numCols){ std::cout << "!point-chain full " << endl; return; } pe->hashValue=t2; pe->pointID=sb->pointID; pe++; colCount++; sb=sb->snext; } pe->hashValue=IFLAG; return; } int G::serialize_lsh_hashtables_format2(FILE* dbFile, int merge){ Uns32T x,y; if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT2) ) error("Cannot merge core and serial LSH, data structure dimensions mismatch."); // We must pereform FORMAT2 merges in core FORMAT1 (dynamic list structure) if(merge) unserialize_lsh_hashtables_format2(dbFile, merge); Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount, t1; if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){ fclose(dbFile); error("fSeek error in serialize_lsh_hashtables_format2"); } // Write the hash tables for( x = 0 ; x < H::L ; x++ ){ std::cout << (merge ? 
"merging":"writing") << " hash table " << x << " FORMAT2..."; std::cout.flush(); maxColCount=0; minColCount=O2_SERIAL_MAX_COLS; meanColCount=0; colCountN=0; for( y = 0 ; y < H::N ; y++ ){ colCount=0; if(bucket* bPtr = h[x][y]){ // Check for empty row (even though row was allocated) #ifdef LSH_LIST_HEAD_COUNTERS if(bPtr->next->t2==IFLAG){ fclose(dbFile); error("b->next->t2==IFLAG","serialize_lsh_hashtables_format2()"); } #else if(bPtr->t2==IFLAG){ fclose(dbFile); error("b->t2==IFLAG","serialize_lsh_hashtables_format2()"); } #endif t1 = O2_SERIAL_TOKEN_T1; WRITE_UNS32(&t1, "[T1]"); t1 = y; WRITE_UNS32(&t1, "[t1]"); #ifdef LSH_CORE_ARRAY t1 = count_buckets_and_points_hashtable_row(bPtr); WRITE_UNS32(&t1,"[count]"); // write numElements #endif #ifdef LSH_LIST_HEAD_COUNTERS serial_write_hashtable_row_format2(dbFile, bPtr->next, colCount); // skip collision counter bucket #else serial_write_hashtable_row_format2(dbFile, bPtr, colCount); #endif } if(colCount){ if(colCount<minColCount) minColCount=colCount; if(colCount>maxColCount) maxColCount=colCount; meanColCount+=colCount; colCountN++; } } // Write END of table marker t1 = O2_SERIAL_TOKEN_ENDTABLE; WRITE_UNS32(&t1,"[end]"); if(colCountN) std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN << ", min = " << minColCount << ", max = " << maxColCount << endl; } // We're done writing return 1; } Uns32T G::count_buckets_and_points_hashtable_row(bucket* bPtr){ Uns32T total_count = 0; bucket* p = 0; // count points #ifdef LSH_LIST_HEAD_COUNTERS total_count = bPtr->t2; // points already counted p = bPtr->next; #else total_count = count_points_hashtable_row(bPtr); p = bPtr; #endif // count buckets do{ total_count++; }while((p=p->next)); return total_count; } Uns32T G::count_points_hashtable_row(bucket* bPtr){ Uns32T point_count = 0; bucket* p = bPtr; sbucket* s = 0; while(p){ s = p->snext; while(s){ point_count++; s=s->snext; } p=p->next; } return point_count; } void 
G::serial_write_hashtable_row_format2(FILE* dbFile, bucket* b, Uns32T& colCount){ while(b && b->t2!=IFLAG){ if(!b->snext){ fclose(dbFile); error("Empty collision chain in serial_write_hashtable_row_format2()"); } t2 = O2_SERIAL_TOKEN_T2; if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){ fclose(dbFile); error("write error in serial_write_hashtable_row_format2()"); } t2 = b->t2; if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){ fclose(dbFile); error("write error in serial_write_hashtable_row_format2()"); } serial_write_element_format2(dbFile, b->snext, colCount); b=b->next; } } void G::serial_write_element_format2(FILE* dbFile, sbucket* sb, Uns32T& colCount){ while(sb){ if(fwrite(&sb->pointID, sizeof(Uns32T), 1, dbFile) != 1){ fclose(dbFile); error("Write error in serial_write_element_format2()"); } colCount++; sb=sb->snext; } } int G::serial_create(char* filename, Uns32T FMT){ return serial_create(filename, w, L, N, C, k, d, FMT); } int G::serial_create(char* filename, float binWidth, Uns32T numTables, Uns32T numRows, Uns32T numCols, Uns32T numFuns, Uns32T dim, Uns32T FMT){ if(numTables > O2_SERIAL_MAX_TABLES || numRows > O2_SERIAL_MAX_ROWS || numCols > O2_SERIAL_MAX_COLS || numFuns > O2_SERIAL_MAX_FUNS || dim>O2_SERIAL_MAX_DIM){ error("LSH parameters out of bounds for serialization"); } int dbfid; if ((dbfid = open (filename, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)) < 0) error("Can't create serial file", filename, "open"); get_lock(dbfid, 1); // Make header first to get size of serialized database lshHeader = new SerialHeaderT(binWidth, numTables, numRows, numCols, numFuns, dim, radius, maxp, FMT, pointCount); cout << "file size: <=" << lshHeader->get_size()/1024UL << "KB" << endl; if(lshHeader->get_size()>O2_SERIAL_MAXFILESIZE) error("Maximum size of LSH file exceded: > 4000MB"); // go to the location corresponding to the last byte if (lseek (dbfid, lshHeader->get_size() - 1, SEEK_SET) == -1) error("lseek error in db file", "", 
"lseek"); // write a dummy byte at the last location if (write (dbfid, "", 1) != 1) error("write error", "", "write"); char* db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1); memcpy (db, lshHeader, O2_SERIAL_HEADER_SIZE); serial_munmap(db, O2_SERIAL_HEADER_SIZE); close(dbfid); std::cout << "done initializing tables." << endl; return 1; } char* G::serial_mmap(int dbfid, Uns32T memSize, Uns32T forWrite, off_t offset){ char* db; if(forWrite){ if ((db = (char*) mmap(0, memSize, PROT_READ | PROT_WRITE, MAP_SHARED, dbfid, offset)) == (caddr_t) -1) error("mmap error in request for writable serialized database", "", "mmap"); } else if ((db = (char*) mmap(0, memSize, PROT_READ, MAP_SHARED, dbfid, offset)) == (caddr_t) -1) error("mmap error in read-only serialized database", "", "mmap"); return db; } SerialHeaderT* G::serial_get_header(char* db){ lshHeader = new SerialHeaderT(); memcpy((char*)lshHeader, db, sizeof(SerialHeaderT)); if(lshHeader->lshMagic!=O2_SERIAL_MAGIC) error("Not an LSH database file"); return lshHeader; } void G::serial_munmap(char* db, Uns32T N){ munmap(db, N); } int G::serial_open(char* filename, int writeFlag){ int dbfid; if(writeFlag){ if ((dbfid = open (filename, O_RDWR)) < 0) error("Can't open serial file for read/write", filename, "open"); get_lock(dbfid, writeFlag); } else{ if ((dbfid = open (filename, O_RDONLY)) < 0) error("Can't open serial file for read", filename, "open"); get_lock(dbfid, 0); } return dbfid; } void G::serial_close(int dbfid){ release_lock(dbfid); close(dbfid); } int G::unserialize_lsh_header(char* filename){ int dbfid; char* db; // Test to see if file exists if((dbfid = open (filename, O_RDONLY)) < 0) error("Can't open the file", filename, "open"); close(dbfid); dbfid = serial_open(filename, 0); // open for read db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer serial_get_header(db); // read header serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap // Unserialize header parameters H::L = 
lshHeader->numTables; H::m = (Uns32T)( (1.0 + sqrt(1 + 8.0*(int)H::L)) / 2.0); H::N = lshHeader->numRows; H::C = lshHeader->numCols; H::k = lshHeader->numFuns; H::d = lshHeader->dataDim; H::w = lshHeader->binWidth; H::radius = lshHeader->radius; H::maxp = lshHeader->maxp; H::pointCount = lshHeader->pointCount; return dbfid; } // unserialize the LSH parameters // we leave the LSH tree on disk as a flat file // it is this flat file that we search by memory mapping void G::unserialize_lsh_functions(int dbfid){ Uns32T j, kk; float* pf; Uns32T* pu; // Load the hash functions into core char* db = serial_mmap(dbfid, get_serial_hashtable_offset(), 0);// get database pointer again pf = get_serial_hashfunction_base(db); #ifdef USE_U_FUNCTIONS for( j = 0 ; j < H::m ; j++ ){ // L functions gj(v) for( kk = 0 ; kk < H::k/2 ; kk++ ){ // Normally distributed hash functions #else for( j = 0 ; j < H::L ; j++ ){ // L functions gj(v) for( kk = 0 ; kk < H::k ; kk++ ){ // Normally distributed hash functions #endif for(Uns32T i = 0 ; i < H::d ; i++ ) H::A[j][kk][i] = *pf++; // Normally distributed random vectors } } #ifdef USE_U_FUNCTIONS for( j = 0 ; j < H::m ; j++ ) // biases b for( kk = 0 ; kk < H::k/2 ; kk++ ) #else for( j = 0 ; j < H::L ; j++ ) // biases b for( kk = 0 ; kk < H::k ; kk++ ) #endif H::b[j][kk] = *pf++; pu = (Uns32T*)pf; for( j = 0 ; j < H::L ; j++ ) // Z projectors r1 for( kk = 0 ; kk < H::k ; kk++ ) H::r1[j][kk] = *pu++; for( j = 0 ; j < H::L ; j++ ) // Z projectors r2 for( kk = 0 ; kk < H::k ; kk++ ) H::r2[j][kk] = *pu++; serial_munmap(db, get_serial_hashtable_offset()); } void G::unserialize_lsh_hashtables_format1(int fid){ SerialElementT *pe, *pt; Uns32T x,y; Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; // Read the hash tables into core for( x = 0 ; x < H::L ; x++ ){ // memory map a single hash table // Align each hash table to page boundary char* dbtable = serial_mmap(fid, hashTableSize, 0, 
align_up(get_serial_hashtable_offset()+x*hashTableSize, get_page_logn())); #ifdef __CYGWIN__ // No madvise in CYGWIN #else if(madvise(dbtable, hashTableSize, MADV_SEQUENTIAL)<0) error("could not advise hashtable memory","","madvise"); #endif pt=(SerialElementT*)dbtable; for( y = 0 ; y < H::N ; y++ ){ // Move disk pointer to beginning of row pe=pt+y*lshHeader->numCols; unserialize_hashtable_row_format1(pe, h[x]+y); #ifdef LSH_DUMP_CORE_TABLES printf("S[%d,%d]", x, y); serial_bucket_dump(pe); printf("C[%d,%d]", x, y); dump_hashtable_row(h[x][y]); #endif } serial_munmap(dbtable, hashTableSize); } } void G::unserialize_hashtable_row_format1(SerialElementT* pe, bucket** b){ Uns32T colCount = 0; while(colCount!=lshHeader->numCols && pe->hashValue !=IFLAG){ H::p = pe->pointID; // current point ID t2 = pe->hashValue; bucket_insert_point(b); pe++; colCount++; } } void G::unserialize_lsh_hashtables_format2(FILE* dbFile, bool forMerge){ Uns32T x=0,y=0; // Seek to hashtable base offset if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){ fclose(dbFile); error("fSeek error in unserialize_lsh_hashtables_format2"); } // Read the hash tables into core (structure is given in header) while( x < H::L){ if(fread(&(H::t1), sizeof(Uns32T), 1, dbFile) != 1){ fclose(dbFile); error("Read error","unserialize_lsh_hashtables_format2()"); } if(H::t1==O2_SERIAL_TOKEN_ENDTABLE) x++; // End of table else while(y < H::N){ // Read a row and move file pointer to beginning of next row or _bittable if(!(H::t1==O2_SERIAL_TOKEN_T1)){ fclose(dbFile); error("State matchine error T1","unserialize_lsh_hashtables_format2()"); } if(fread(&(H::t1), sizeof(Uns32T), 1, dbFile) != 1){ fclose(dbFile); error("Read error: t1","unserialize_lsh_hashtables_format2()"); } y = H::t1; if(y>=H::N){ fclose(dbFile); error("Unserialized hashtable row pointer out of range","unserialize_lsh_hashtables_format2()"); } Uns32T token = 0; #ifdef LSH_CORE_ARRAY Uns32T numElements; if(fread(&numElements, sizeof(Uns32T), 1, 
dbFile) != 1){ fclose(dbFile); error("Read error: numElements","unserialize_lsh_hashtables_format2()"); } // BACKWARD COMPATIBILITY: check to see if T2 or END token was read if(numElements==O2_SERIAL_TOKEN_T2 || numElements==O2_SERIAL_TOKEN_ENDTABLE ){ forMerge=true; // Force use of dynamic linked list core format token = numElements; } if(forMerge) // Use linked list CORE format token = unserialize_hashtable_row_format2(dbFile, h[x]+y, token); else // Use ARRAY CORE format with numElements counter token = unserialize_hashtable_row_to_array(dbFile, h[x]+y, numElements); #else token = unserialize_hashtable_row_format2(dbFile, h[x]+y); #endif #ifdef LSH_DUMP_CORE_TABLES printf("C[%d,%d]", x, y); dump_hashtable_row(h[x][y]); #endif // Check that token is valid if( !(token==O2_SERIAL_TOKEN_T1 || token==O2_SERIAL_TOKEN_ENDTABLE) ){ fclose(dbFile); error("State machine error end of row/table", "unserialize_lsh_hashtables_format2()"); } // Check for end of table flag if(token==O2_SERIAL_TOKEN_ENDTABLE){ x++; break; } // Check for new row flag if(token==O2_SERIAL_TOKEN_T1) H::t1 = token; } } } Uns32T G::unserialize_hashtable_row_format2(FILE* dbFile, bucket** b, Uns32T token){ bool pointFound = false; if(token) H::t2 = token; else if(fread(&(H::t2), sizeof(Uns32T), 1, dbFile) != 1){ fclose(dbFile); error("Read error T2 token","unserialize_hashtable_row_format2"); } if( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2)){ fclose(dbFile); error("State machine error: expected E or T2"); } while(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1)){ pointFound=false; // Check for T2 token if(H::t2!=O2_SERIAL_TOKEN_T2){ fclose(dbFile); error("State machine error T2 token", "unserialize_hashtable_row_format2()"); } // Read t2 value if(fread(&(H::t2), sizeof(Uns32T), 1, dbFile) != 1){ fclose(dbFile); error("Read error t2","unserialize_hashtable_row_format2"); } if(fread(&(H::p), sizeof(Uns32T), 1, dbFile) != 1){ fclose(dbFile); error("Read error 
H::p","unserialize_hashtable_row_format2"); } while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){ pointFound=true; bucket_insert_point(b); if(fread(&(H::p), sizeof(Uns32T), 1, dbFile) != 1){ fclose(dbFile); error("Read error H::p","unserialize_hashtable_row_format2"); } } if(!pointFound) error("State machine error: point", "unserialize_hashtable_row_format2()"); H::t2 = H::p; // Copy last found token to t2 } return H::t2; // holds current token } // Unserialize format2 hashtable row to a CORE_ARRAY pointed to // by the hashtable row pointer: rowPtr // // numElements is the total number of t2 buckets plus points // memory required is numElements+1+sizeof(hashtable ptr) // // numElements = numPoints + numBuckets // // During inserts (merges) new hashtable entries are appened at rowPtr+numPoints+numBuckets+1 // // ASSUME: that LSH_LIST_HEAD_COUNTERS is set so that the first hashtable bucket is used to count // point and bucket entries // // We store the values of numPoints and numBuckets in separate fields of the first bucket // rowPtr->t2 // numPoints // (Uns32T)(rowPtr->snext) // numBuckets // // We cast the rowPtr->next pointer to (Uns32*) malloc(numElements*sizeof(Uns32T) + sizeof(bucket*)) // To get to the fist bucket, we use // #define READ_UNS32T(VAL,TOKENSTR) if(fread(VAL, sizeof(Uns32T), 1, dbFile) != 1){\ fclose(dbFile);error("Read error unserialize_hashtable_format2",TOKENSTR);} #define TEST_TOKEN(TEST, TESTSTR) if(TEST){fclose(dbFile);error("State machine error: ", TESTSTR);} #define SKIP_BITS_LEFT_SHIFT_MSB (30) #define SKIP_BITS_RIGHT_SHIFT_MSB (28) #define SKIP_BITS_RIGHT_SHIFT_LSB (30) #define MAX_POINTS_IN_BUCKET_CORE_ARRAY (16) #define LSH_CORE_ARRAY_END_ROW_TOKEN (0xFFFFFFFD) // Encode the skip bits. 
Zero if only one point, MAX 8 (plus first == 9) #define ENCODE_POINT_SKIP_BITS TEST_TOKEN(!numPointsThisBucket, "no points found");\ if(numPointsThisBucket==1){\ secondPtr=ap++;\ *secondPtr=0;\ numPoints++;\ }\ if(numPointsThisBucket>1){\ *firstPtr |= ( (numPointsThisBucket-1) & 0x3 ) << SKIP_BITS_LEFT_SHIFT_MSB;\ *secondPtr |= ( ( (numPointsThisBucket-1) & 0xC) >> 2 ) << SKIP_BITS_LEFT_SHIFT_MSB;} Uns32T G::unserialize_hashtable_row_to_array(FILE* dbFile, bucket** rowPP, Uns32T numElements){ Uns32T numPointsThisBucket = 0; Uns32T numBuckets = 0; Uns32T numPoints = 0; Uns32T* firstPtr = 0; Uns32T* secondPtr = 0; // Initialize new row if(!*rowPP){ *rowPP = new bucket(); #ifdef LSH_LIST_HEAD_COUNTERS (*rowPP)->t2 = 0; // Use t2 as a collision counter for the row (*rowPP)->next = 0; #endif } bucket* rowPtr = *rowPP; READ_UNS32T(&(H::t2),"t2"); TEST_TOKEN(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2), "expected E or T2"); // Because we encode points in 16-point blocks, we sometimes allocate repeated t2 elements // So over-allocate by a factor of two and realloc later to actual numElements CR_ASSERT(rowPtr->next = (bucket*) malloc((2*numElements+1)*sizeof(Uns32T)+sizeof(bucket**))); Uns32T* ap = reinterpret_cast<Uns32T*>(rowPtr->next); // Cast pointer to Uns32T* for array format while( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1) ){ numPointsThisBucket = 0;// reset bucket point counter secondPtr = 0; // reset second-point pointer TEST_TOKEN(H::t2!=O2_SERIAL_TOKEN_T2, "expected T2"); READ_UNS32T(&(H::t2), "Read error t2"); *ap++ = H::t2; // Insert t2 value into array numBuckets++; READ_UNS32T(&(H::p), "Read error H::p"); while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){ if(numPointsThisBucket==MAX_POINTS_IN_BUCKET_CORE_ARRAY){ ENCODE_POINT_SKIP_BITS; *ap++ = H::t2; // Extra element numBuckets++; // record this as a new bucket numPointsThisBucket=0; // reset bucket point counter 
secondPtr = 0; // reset second-point pointer } if( ++numPointsThisBucket == 1 ) firstPtr = ap; // store pointer to first point to insert skip bits later on else if( numPointsThisBucket == 2 ) secondPtr = ap; // store pointer to first point to insert skip bits later on numPoints++; *ap++ = H::p; READ_UNS32T(&(H::p), "Read error H::p"); } ENCODE_POINT_SKIP_BITS; H::t2 = H::p; // Copy last found token to t2 } // Reallocate the row to its actual size CR_ASSERT(rowPtr->next = (bucket*) realloc(rowPtr->next, (numBuckets+numPoints+1)*sizeof(Uns32T)+sizeof(bucket**))); // Record the sizes at the head of the row rowPtr->snext = (sbucket*) numBuckets; rowPtr->t2 = numPoints; // Place end of row marker *ap++ = LSH_CORE_ARRAY_END_ROW_TOKEN; // Set the LSH_CORE_ARRAY_BIT to identify data structure for insertion and retrieval rowPtr->t2 |= LSH_CORE_ARRAY_BIT; // Allocate a new dynamic list head at the end of the array bucket** listPtr = reinterpret_cast<bucket**> (ap); *listPtr = 0; // Return current token return H::t2; // return H::t2 which holds current token [E or T1] } // *p is a pointer to the beginning of a hashtable row array // The array consists of t2 hash keys and one or more point identifiers p for each hash key // Retrieval is performed by generating a hash key query_t2 for query point q // We identify the row that t2 is stored in using a secondary hash t1, this row is the entry // point for retrieve_from_core_hashtable_array #define SKIP_BITS (0xC0000000) void G::retrieve_from_core_hashtable_array(Uns32T* p, Uns32T qpos){ Uns32T skip; Uns32T t2; Uns32T p1; Uns32T p2; CR_ASSERT(p); do{ t2 = *p++; if( t2 > H::t2 ) return; p1 = *p++; p2 = *p++; skip = (( p1 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_LSB) + (( p2 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_MSB); if( t2 == H::t2 ){ add_point_callback(calling_instance, p1 ^ (p1 & SKIP_BITS), qpos, radius); if(skip--){ add_point_callback(calling_instance, p2 ^ (p2 & SKIP_BITS), qpos, radius); while(skip-- ) 
add_point_callback(calling_instance, *p++, qpos, radius); } } else if(*p != LSH_CORE_ARRAY_END_ROW_TOKEN) p = p + skip; }while( *p != LSH_CORE_ARRAY_END_ROW_TOKEN ); } void G::dump_hashtable_row(bucket* p){ while(p && p->t2!=IFLAG){ sbucket* sbp = p->snext; while(sbp){ printf("(%0X,%u)", p->t2, sbp->pointID); fflush(stdout); sbp=sbp->snext; } p=p->next; } printf("\n"); } // G::serial_retrieve_point( ... ) // retrieves (pointID) from a serialized LSH database // // inputs: // filename - file name of serialized LSH database // vv - query point set // // outputs: // inserts retrieved points into add_point() callback method void G::serial_retrieve_point_set(char* filename, vector<vector<float> >& vv, ReporterCallbackPtr add_point, void* caller) { int dbfid = serial_open(filename, 0); // open for read char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer serial_get_header(dbheader); // read header serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){ serial_close(dbfid); error("serial_retrieve_point_set is for SERIAL_FILEFORMAT1 only"); } // size of each hash table Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; calling_instance = caller; // class instance variable used in ...bucket_chain_point() add_point_callback = add_point; for(Uns32T j=0; j<L; j++){ // memory map a single hash table for random access char* db = serial_mmap(dbfid, hashTableSize, 0, align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn())); #ifdef __CYGWIN__ // No madvise in CYGWIN #else if(madvise(db, hashTableSize, MADV_RANDOM)<0) error("could not advise local hashtable memory","","madvise"); #endif SerialElementT* pe = (SerialElementT*)db ; for(Uns32T qpos=0; qpos<vv.size(); qpos++){ H::compute_hash_functions(vv[qpos]); H::generate_hash_keys(*(g+j),*(r1+j),*(r2+j)); serial_bucket_chain_point(pe+t1*lshHeader->numCols, qpos); // Point to correct row } 
serial_munmap(db, hashTableSize); // drop hashtable mmap } serial_close(dbfid); } void G::serial_retrieve_point(char* filename, vector<float>& v, Uns32T qpos, ReporterCallbackPtr add_point, void* caller){ int dbfid = serial_open(filename, 0); // open for read char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer serial_get_header(dbheader); // read header serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){ serial_close(dbfid); error("serial_retrieve_point is for SERIAL_FILEFORMAT1 only"); } // size of each hash table Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; calling_instance = caller; add_point_callback = add_point; H::compute_hash_functions(v); for(Uns32T j=0; j<L; j++){ // memory map a single hash table for random access char* db = serial_mmap(dbfid, hashTableSize, 0, align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn())); #ifdef __CYGWIN__ // No madvise in CYGWIN #else if(madvise(db, hashTableSize, MADV_RANDOM)<0) error("could not advise local hashtable memory","","madvise"); #endif SerialElementT* pe = (SerialElementT*)db ; H::generate_hash_keys(*(g+j),*(r1+j),*(r2+j)); serial_bucket_chain_point(pe+t1*lshHeader->numCols, qpos); // Point to correct row serial_munmap(db, hashTableSize); // drop hashtable mmap } serial_close(dbfid); } void G::serial_dump_tables(char* filename){ int dbfid = serial_open(filename, 0); // open for read char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer serial_get_header(dbheader); // read header serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; for(Uns32T j=0; j<L; j++){ // memory map a single hash table for random access char* db = serial_mmap(dbfid, hashTableSize, 0, align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn())); #ifdef 
__CYGWIN__ // No madvise in CYGWIN #else if(madvise(db, hashTableSize, MADV_SEQUENTIAL)<0) error("could not advise local hashtable memory","","madvise"); #endif SerialElementT* pe = (SerialElementT*)db ; printf("*********** TABLE %d ***************\n", j); fflush(stdout); int count=0; do{ printf("[%d,%d]", j, count++); fflush(stdout); serial_bucket_dump(pe); pe+=lshHeader->numCols; }while(pe<(SerialElementT*)db+lshHeader->numRows*lshHeader->numCols); } } void G::serial_bucket_dump(SerialElementT* pe){ SerialElementT* pend = pe+lshHeader->numCols; while( !(pe->hashValue==IFLAG || pe==pend ) ){ printf("(%0X,%u)",pe->hashValue,pe->pointID); pe++; } printf("\n"); fflush(stdout); } void G::serial_bucket_chain_point(SerialElementT* pe, Uns32T qpos){ SerialElementT* pend = pe+lshHeader->numCols; while( !(pe->hashValue==IFLAG || pe==pend ) ){ if(pe->hashValue==t2){ // new match add_point_callback(calling_instance, pe->pointID, qpos, radius); } pe++; } } void G::bucket_chain_point(bucket* p, Uns32T qpos){ if(!p || p->t2==IFLAG) return; if(p->t2==t2){ // match sbucket_chain_point(p->snext, qpos); // add to reporter } if(p->next){ bucket_chain_point(p->next, qpos); // recurse } } void G::sbucket_chain_point(sbucket* p, Uns32T qpos){ add_point_callback(calling_instance, p->pointID, qpos, radius); if(p->snext){ sbucket_chain_point(p->snext, qpos); } } void G::get_lock(int fd, bool exclusive) { struct flock lock; int status; lock.l_type = exclusive ? 
F_WRLCK : F_RDLCK; lock.l_whence = SEEK_SET; lock.l_start = 0; lock.l_len = 0; /* "the whole file" */ retry: do { status = fcntl(fd, F_SETLKW, &lock); } while (status != 0 && errno == EINTR); if (status) { if (errno == EAGAIN) { sleep(1); goto retry; } else { error("fcntl lock error", "", "fcntl"); } } } void G::release_lock(int fd) { struct flock lock; int status; lock.l_type = F_UNLCK; lock.l_whence = SEEK_SET; lock.l_start = 0; lock.l_len = 0; status = fcntl(fd, F_SETLKW, &lock); if (status) error("fcntl unlock error", "", "fcntl"); }