comparison lshlib.cpp @ 754:9bd13c7819ae mkc_lsh_update

Adding mkc_lsh_update branch, trunk candidate with improved LSH: merged trunk 1095 and branch multiprobe_lsh
author mas01mc
date Thu, 25 Nov 2010 13:42:40 +0000
parents 828c1c4e25cc
children
comparison
equal deleted inserted replaced
747:fbf16508421f 754:9bd13c7819ae
1 #include <vector>
2 #include <queue>
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <sys/types.h>
6 #include <sys/stat.h>
7 #if defined(WIN32)
8 #include <sys/locking.h>
9 #include <io.h>
10 #include <windows.h>
11 #endif
12 #include <fcntl.h>
13 #include <string.h>
14 #include <iostream>
15 #include <fstream>
16 #include <math.h>
17 #include <sys/time.h>
18 #include <assert.h>
19 #include <float.h>
20 #include <signal.h>
21 #include <time.h>
22 #include <limits.h>
23 #include <errno.h>
24 #ifdef MT19937
25 #include "mt19937/mt19937ar.h"
26 #endif
27
28 #include "lshlib.h" 1 #include "lshlib.h"
29 2
30 #define getpagesize() (64*1024) 3 #define getpagesize() (64*1024)
31 4
32 Uns32T get_page_logn() { 5 Uns32T get_page_logn(){
33 int pagesz = (int) getpagesize(); 6 int pagesz = (int)sysconf(_SC_PAGESIZE);
34 return (Uns32T) log2((double) pagesz); 7 return (Uns32T)log2((double)pagesz);
35 } 8 }
9
10 unsigned align_up(unsigned x, unsigned w) { return (((x) + ((1<<w)-1)) & ~((1<<w)-1)); }
36 11
37 void H::error(const char* a, const char* b, const char *sysFunc) { 12 void H::error(const char* a, const char* b, const char *sysFunc) {
38 cerr << a << ": " << b << endl; 13 cerr << a << ": " << b << endl;
39 if (sysFunc) { 14 if (sysFunc) {
40 perror(sysFunc); 15 perror(sysFunc);
41 } 16 }
42 exit(1); 17 exit(1);
43 } 18 }
44 19
45 H::H(){ 20 H::H():
21 multiProbePtr(new MultiProbe()),
22 boundaryDistances(0)
23 {
46 // Delay initialization of lsh functions until we know the parameters 24 // Delay initialization of lsh functions until we know the parameters
47 } 25 }
48 26
49 H::H(Uns32T kk, Uns32T mm, Uns32T dd, Uns32T NN, Uns32T CC, float ww, float rr): 27 H::H(Uns32T kk, Uns32T mm, Uns32T dd, Uns32T NN, Uns32T CC, float ww, float rr):
50 #ifdef USE_U_FUNCTIONS 28 #ifdef USE_U_FUNCTIONS
60 k(kk), 38 k(kk),
61 m(mm), 39 m(mm),
62 L((mm*(mm-1))/2), 40 L((mm*(mm-1))/2),
63 d(dd), 41 d(dd),
64 w(ww), 42 w(ww),
65 radius(rr) 43 radius(rr),
44 multiProbePtr(new MultiProbe()),
45 boundaryDistances(0)
66 { 46 {
67 47
68 if(m<2){ 48 if(m<2){
69 m=2; 49 m=2;
70 L=1; // check value of L 50 L=1; // check value of L
162 } 142 }
163 } 143 }
164 144
165 // Storage for whole or partial function evaluation depending on USE_U_FUNCTIONS 145 // Storage for whole or partial function evaluation depending on USE_U_FUNCTIONS
166 H::initialize_partial_functions(); 146 H::initialize_partial_functions();
147
148 // MultiProbe distance functions, there are 2*k per hashtable
149 H::boundaryDistances = new float*[ H::L ]; // L x 2k boundary distances
150 assert( H::boundaryDistances ); // failure
151 for( j = 0; j < H::L ; j++ ){ // 2*k functions x_i(q)
152 H::boundaryDistances[j] = new float[ 2*H::k ];
153 assert( H::boundaryDistances[j] ); // failure
154 for( kk = 0; kk < 2*H::k ; kk++ )
155 H::boundaryDistances[j][kk] = 0.0f; // initialize with zeros
156 }
167 } 157 }
168 158
169 void H::initialize_partial_functions(){ 159 void H::initialize_partial_functions(){
170 160
171 #ifdef USE_U_FUNCTIONS 161 #ifdef USE_U_FUNCTIONS
259 delete[] H::h[ j ]; 249 delete[] H::h[ j ];
260 } 250 }
261 delete[] H::r1; 251 delete[] H::r1;
262 delete[] H::r2; 252 delete[] H::r2;
263 delete[] H::h; 253 delete[] H::h;
254
255 // MultiProbe cleanup
256 for( j = 0 ; j < H::L ; j++ )
257 delete[] H::boundaryDistances[j];
258 delete[] H::boundaryDistances;
259 delete multiProbePtr;
264 } 260 }
265 261
266 262
267 // Compute all hash functions for vector v 263 // Compute all hash functions for vector v
268 // #ifdef USE_U_FUNCTIONS use Combination of m \times h_i \in R^{(k/2) \times d} 264 // #ifdef USE_U_FUNCTIONS use Combination of m \times h_i \in R^{(k/2) \times d}
271 float iw = 1. / H::w; // hash bucket width 267 float iw = 1. / H::w; // hash bucket width
272 Uns32T aa, kk; 268 Uns32T aa, kk;
273 if( v.size() != H::d ) 269 if( v.size() != H::d )
274 error("v.size != H::d","","compute_hash_functions"); // check input vector dimensionality 270 error("v.size != H::d","","compute_hash_functions"); // check input vector dimensionality
275 double tmp = 0; 271 double tmp = 0;
276 float *pA, *pb; 272 float *pA, *pb, *bd;
277 Uns32T *pg; 273 Uns32T *pg;
278 int dd; 274 int dd;
279 vector<float>::iterator vi; 275 vector<float>::iterator vi;
280 vector<Uns32T>::iterator ui; 276 vector<Uns32T>::iterator ui;
281 277
314 *pg++ = *ui++; // hash function g_j(v)=[x(k/2+1) x(k/2+2) ... xk]; xk \in Z 310 *pg++ = *ui++; // hash function g_j(v)=[x(k/2+1) x(k/2+2) ... xk]; xk \in Z
315 } 311 }
316 #else 312 #else
317 for( aa=0; aa < H::L ; aa++ ){ 313 for( aa=0; aa < H::L ; aa++ ){
318 pg= *( H::g + aa ); // L \times functions g_j(v) \in Z^k 314 pg= *( H::g + aa ); // L \times functions g_j(v) \in Z^k
315 bd= *( H::boundaryDistances + aa);
319 for( kk = 0 ; kk != H::k ; kk++ ){ 316 for( kk = 0 ; kk != H::k ; kk++ ){
320 pb = *( H::b + aa ) + kk; 317 pb = *( H::b + aa ) + kk;
321 pA = * ( * ( H::A + aa ) + kk ); 318 pA = * ( * ( H::A + aa ) + kk );
322 dd = H::d; 319 dd = H::d;
323 tmp = 0.; 320 tmp = 0.;
324 vi = v.begin(); 321 vi = v.begin();
325 while( dd-- ) 322 while( dd-- )
326 tmp += *pA++ * *vi++; // project 323 tmp += *pA++ * *vi++; // project
327 tmp += *pb; // translate 324 tmp += *pb; // translate
328 tmp *= iw; // scale 325 tmp *= iw; // scale
329 *pg++ = (Uns32T) (floor(tmp)); // hash function g_j(v)=[x1 x2 ... xk]; xk \in Z 326 tmp = floor(tmp); // handle negative values
327 while(tmp<0) // wrap around 0 to N
328 tmp += H::N;
329 *pg = (Uns32T) tmp; // hash function g_j(v)=[x1 x2 ... xk]; xk \in Z
330 *bd = (tmp - *pg++);//*w; // boundary distance -1
331 *(bd+1) = (1.0f - *bd); //*w; // boundary distance +1
332 bd+=2;
330 } 333 }
331 } 334 }
332 #endif 335 #endif
333 } 336 }
334 337
335 // make hash value \in Z 338 // make hash value \in Z
336 void H::generate_hash_keys(Uns32T*g, Uns32T* r1, Uns32T* r2){ 339 void H::generate_hash_keys(Uns32T*g, Uns32T* r1, Uns32T* r2){
337 H::t1 = computeProductModDefaultPrime( g, r1, H::k ) % H::N; 340 H::t1 = computeProductModDefaultPrime( g, r1, H::k ) % H::N;
338 H::t2 = computeProductModDefaultPrime( g, r2, H::k ); 341 H::t2 = computeProductModDefaultPrime( g, r2, H::k );
339 } 342 }
343
344 // make hash value by perturbing the given hash functions
345 // according to the boundary distances of the current query
346 void H::generate_multiprobe_keys(Uns32T*g, Uns32T* r1, Uns32T* r2){
347 assert(!multiProbePtr->empty()); // Test this for now, until all is stable
348 Uns32T* mpg = new Uns32T[H::k]; // temporary array storage
349
350 // Copy the hash bucket identifiers
351 Uns32T* mpgPtr = mpg;
352 Uns32T kk = H::k;
353 while(kk--)
354 *mpgPtr++ = *g++;
355
356 // Retrieve the next perturbation set
357 perturbation_set ps = multiProbePtr->getNextPerturbationSet();
358 perturbation_set::iterator it = ps.begin();
359
360 // Perturbate the hash functions g
361 while( it != ps.end() ){
362 *(mpg + multiProbePtr->getIndex(it)) += multiProbePtr->getBoundary(it);
363 it++;
364 }
365
366 H::t1 = computeProductModDefaultPrime( mpg, r1, H::k ) % H::N;
367 H::t2 = computeProductModDefaultPrime( mpg, r2, H::k );
368
369 delete[] mpg; // free up temporary storage
370 }
371
340 372
341 #define CR_ASSERT(b){if(!(b)){fprintf(stderr, "ASSERT failed on line %d, file %s.\n", __LINE__, __FILE__); exit(1);}} 373 #define CR_ASSERT(b){if(!(b)){fprintf(stderr, "ASSERT failed on line %d, file %s.\n", __LINE__, __FILE__); exit(1);}}
342 374
343 // Computes (a.b) mod UH_PRIME_DEFAULT 375 // Computes (a.b) mod UH_PRIME_DEFAULT
344 inline Uns32T H::computeProductModDefaultPrime(Uns32T *a, Uns32T *b, IntT size){ 376 inline Uns32T H::computeProductModDefaultPrime(Uns32T *a, Uns32T *b, IntT size){
397 429
398 if(p->t2 == H::t2){ 430 if(p->t2 == H::t2){
399 __sbucket_insert_point(p->snext.ptr); 431 __sbucket_insert_point(p->snext.ptr);
400 return; 432 return;
401 } 433 }
402 434
403 if(p->next){ 435 // insert bucket before current bucket
404 // Construct list in t2 order 436 if(H::t2 < p->t2){
405 if(H::t2 < p->next->t2){
406 bucket* tmp = new bucket(); 437 bucket* tmp = new bucket();
438 // copy current bucket contents into new bucket
407 tmp->next = p->next; 439 tmp->next = p->next;
440 tmp->t2 = p->t2;
441 tmp->snext.ptr = p->snext.ptr;
408 p->next = tmp; 442 p->next = tmp;
409 __bucket_insert_point(tmp); 443 p->t2 = IFLAG;
410 } 444 p->snext.ptr=0;
411 else 445 __bucket_insert_point(p);
412 __bucket_insert_point(p->next); 446 return;
413 } 447 }
448
449 if(p->next)
450 __bucket_insert_point(p->next);
414 else { 451 else {
415 p->next = new bucket(); 452 p->next = new bucket();
416 __bucket_insert_point(p->next); 453 __bucket_insert_point(p->next);
417 } 454 }
418 } 455 }
469 calling_instance(0), 506 calling_instance(0),
470 add_point_callback(0) 507 add_point_callback(0)
471 { 508 {
472 FILE* dbFile = 0; 509 FILE* dbFile = 0;
473 int dbfid = unserialize_lsh_header(filename); 510 int dbfid = unserialize_lsh_header(filename);
511
474 indexName = new char[O2_INDEX_MAXSTR]; 512 indexName = new char[O2_INDEX_MAXSTR];
475 strncpy(indexName, filename, O2_INDEX_MAXSTR); // COPY THE CONTENTS TO THE NEW POINTER 513 strncpy(indexName, filename, O2_INDEX_MAXSTR); // COPY THE CONTENTS TO THE NEW POINTER
476 H::initialize_lsh_functions(); // Base-class data-structure initialization 514 H::initialize_lsh_functions(); // Base-class data-structure initialization
477 unserialize_lsh_functions(dbfid); // populate with on-disk hashfunction values 515 unserialize_lsh_functions(dbfid); // populate with on-disk hashfunction values
478 516
523 insert_point(vv[point], basePointID+point); 561 insert_point(vv[point], basePointID+point);
524 } 562 }
525 563
526 // point retrieval routine 564 // point retrieval routine
527 void G::retrieve_point(vector<float>& v, Uns32T qpos, ReporterCallbackPtr add_point, void* caller){ 565 void G::retrieve_point(vector<float>& v, Uns32T qpos, ReporterCallbackPtr add_point, void* caller){
566 // assert(LSH_MULTI_PROBE_COUNT);
528 calling_instance = caller; 567 calling_instance = caller;
529 add_point_callback = add_point; 568 add_point_callback = add_point;
530 H::compute_hash_functions( v ); 569 H::compute_hash_functions( v );
531 for(Uns32T j = 0 ; j < H::L ; j++ ){ 570 for(Uns32T j = 0 ; j < H::L ; j++ ){
532 H::generate_hash_keys( *( H::g + j ), *( H::r1 + j ), *( H::r2 + j ) ); 571 // MultiProbe loop
533 if( bucket* bPtr = *(get_bucket(j) + get_t1()) ) { 572 multiProbePtr->generatePerturbationSets( *( H::boundaryDistances + j ) , 2*H::k, (unsigned)LSH_MULTI_PROBE_COUNT);
573 for(Uns32T multiProbeIdx = 0 ; multiProbeIdx < multiProbePtr->size()+1 ; multiProbeIdx++ ){
574 if(!multiProbeIdx)
575 H::generate_hash_keys( *( H::g + j ), *( H::r1 + j ), *( H::r2 + j ) );
576 else
577 H::generate_multiprobe_keys( *( H::g + j ), *( H::r1 + j ), *( H::r2 + j ) );
578 if( bucket* bPtr = *(get_bucket(j) + get_t1()) ) {
534 #ifdef LSH_LIST_HEAD_COUNTERS 579 #ifdef LSH_LIST_HEAD_COUNTERS
535 if(bPtr->t2&LSH_CORE_ARRAY_BIT) { 580 if(bPtr->t2&LSH_CORE_ARRAY_BIT) {
536 retrieve_from_core_hashtable_array((Uns32T*)(bPtr->next), qpos); 581 retrieve_from_core_hashtable_array((Uns32T*)(bPtr->next), qpos);
537 } else { 582 } else {
538 bucket_chain_point( bPtr->next, qpos); 583 bucket_chain_point( bPtr->next, qpos);
584 }
585 #else
586 bucket_chain_point( bPtr , qpos);
587 #endif
539 } 588 }
540 #else
541 bucket_chain_point( bPtr , qpos);
542 #endif
543 } 589 }
544 } 590 }
545 } 591 }
546 592
547 void G::retrieve_point_set(vector<vector<float> >& vv, ReporterCallbackPtr add_point, void* caller){ 593 void G::retrieve_point_set(vector<vector<float> >& vv, ReporterCallbackPtr add_point, void* caller){
615 // O2_SERIAL_TOKEN_T2 = 0xFFFFFFFDU 661 // O2_SERIAL_TOKEN_T2 = 0xFFFFFFFDU
616 // O2_SERIAL_TOKEN_ENDTABLE = 0xFFFFFFFEU 662 // O2_SERIAL_TOKEN_ENDTABLE = 0xFFFFFFFEU
617 // 663 //
618 // T1 - T1 hash token 664 // T1 - T1 hash token
619 // t1 - t1 hash key (t1 range 0..2^29-1) 665 // t1 - t1 hash key (t1 range 0..2^29-1)
666 // %buckets+points% numElements in row for ARRAY encoding
620 // T2 - T2 token 667 // T2 - T2 token
621 // t2 - t2 hash key (range 1..2^32-6) 668 // t2 - t2 hash key (range 1..2^32-6)
622 // p - point identifier (range 0..2^32-1) 669 // p - point identifier (range 0..2^32-1)
623 // E - end hash table token 670 // E - end hash table token
624 // {...} required arguments 671 // {...} required arguments
626 // * - match zero or more occurrences 673 // * - match zero or more occurrences
627 // + - match one or more occurrences 674 // + - match one or more occurrences
628 // {...}^L - repeat argument L times 675 // {...}^L - repeat argument L times
629 // 676 //
630 // FORMAT2 Regular expression: 677 // FORMAT2 Regular expression:
631 // { [T1 t1 [T2 t2 p+]+ ]* E }^L 678 // { [T1 t1 %buckets+points% [T2 t2 p+]+ ]* E }^L
632 // 679 //
633 680
634 // Serial header constructors 681 // Serial header constructors
635 SerialHeader::SerialHeader(){;} 682 SerialHeader::SerialHeader(){;}
636 SerialHeader::SerialHeader(float W, Uns32T L, Uns32T N, Uns32T C, Uns32T k, Uns32T d, float r, Uns32T p, Uns32T FMT, Uns32T pc): 683 SerialHeader::SerialHeader(float W, Uns32T L, Uns32T N, Uns32T C, Uns32T k, Uns32T d, float r, Uns32T p, Uns32T FMT, Uns32T pc):
679 return 0; 726 return 0;
680 } 727 }
681 728
682 void G::serialize(char* filename, Uns32T serialFormat){ 729 void G::serialize(char* filename, Uns32T serialFormat){
683 int dbfid; 730 int dbfid;
731 char* db;
684 int dbIsNew=0; 732 int dbIsNew=0;
685 FILE* dbFile = 0; 733 FILE* dbFile = 0;
686 // Check requested serialFormat 734 // Check requested serialFormat
687 if(!(serialFormat==O2_SERIAL_FILEFORMAT1 || serialFormat==O2_SERIAL_FILEFORMAT2)) 735 if(!(serialFormat==O2_SERIAL_FILEFORMAT1 || serialFormat==O2_SERIAL_FILEFORMAT2))
688 error("Unrecognized serial file format request: ", "serialize()"); 736 error("Unrecognized serial file format request: ", "serialize()");
702 } 750 }
703 } 751 }
704 752
705 // Load the on-disk header into core 753 // Load the on-disk header into core
706 dbfid = serial_open(filename, 1); // open for write 754 dbfid = serial_open(filename, 1); // open for write
707 serial_get_header(dbfid); // read header 755 db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);// get database pointer
756 serial_get_header(db); // read header
757 serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap
708 758
709 // Check compatibility of core and disk data structures 759 // Check compatibility of core and disk data structures
710 if( !serial_can_merge(serialFormat) ) 760 if( !serial_can_merge(serialFormat) )
711 error("Incompatible core and serial LSH, data structure dimensions mismatch."); 761 error("Incompatible core and serial LSH, data structure dimensions mismatch.");
712 762
723 serialize_lsh_hashtables_format2(dbFile, !dbIsNew); 773 serialize_lsh_hashtables_format2(dbFile, !dbIsNew);
724 fflush(dbFile); 774 fflush(dbFile);
725 } 775 }
726 776
727 if(!dbIsNew) { 777 if(!dbIsNew) {
778 db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);// get database pointer
779 //serial_get_header(db); // read header
728 cout << "maxp = " << H::maxp << endl; 780 cout << "maxp = " << H::maxp << endl;
729 lshHeader->maxp=H::maxp; 781 lshHeader->maxp=H::maxp;
730 // Default to FILEFORMAT1 782 // Default to FILEFORMAT1
731 if(!(lshHeader->flags&O2_SERIAL_FILEFORMAT2)) 783 if(!(lshHeader->flags&O2_SERIAL_FILEFORMAT2))
732 lshHeader->flags|=O2_SERIAL_FILEFORMAT1; 784 lshHeader->flags|=O2_SERIAL_FILEFORMAT1;
733 serial_write_header(dbfid, lshHeader); 785 memcpy((char*)db, (char*)lshHeader, sizeof(SerialHeaderT));
786 serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap
734 } 787 }
735 serial_close(dbfid); 788 serial_close(dbfid);
736 if(dbFile){ 789 if(dbFile){
737 fclose(dbFile); 790 fclose(dbFile);
738 dbFile = 0; 791 dbFile = 0;
774 int G::serialize_lsh_hashfunctions(int fid){ 827 int G::serialize_lsh_hashfunctions(int fid){
775 float* pf; 828 float* pf;
776 Uns32T *pu; 829 Uns32T *pu;
777 Uns32T x,y,z; 830 Uns32T x,y,z;
778 831
779 void *db = calloc(get_serial_hashtable_offset() - O2_SERIAL_HEADER_SIZE, 1); 832 char* db = serial_mmap(fid, get_serial_hashtable_offset(), 1);// get database pointer
780 pf = (float *) db; 833 pf = get_serial_hashfunction_base(db);
834
781 // HASH FUNCTIONS 835 // HASH FUNCTIONS
782 // Write the random projectors A[][][] 836 // Write the random projectors A[][][]
783 #ifdef USE_U_FUNCTIONS 837 #ifdef USE_U_FUNCTIONS
784 for( x = 0 ; x < H::m ; x++ ) 838 for( x = 0 ; x < H::m ; x++ )
785 for( y = 0 ; y < H::k/2 ; y++ ) 839 for( y = 0 ; y < H::k/2 ; y++ )
810 // Write the Z projectors r2[][] 864 // Write the Z projectors r2[][]
811 for( x = 0 ; x < H::L ; x++) 865 for( x = 0 ; x < H::L ; x++)
812 for( y = 0; y < H::k ; y++) 866 for( y = 0; y < H::k ; y++)
813 *pu++ = H::r2[x][y]; 867 *pu++ = H::r2[x][y];
814 868
815 off_t cur = lseek(fid, 0, SEEK_CUR); 869 serial_munmap(db, get_serial_hashtable_offset());
816 lseek(fid, O2_SERIAL_HEADER_SIZE, SEEK_SET);
817 write(fid, db, get_serial_hashtable_offset() - O2_SERIAL_HEADER_SIZE);
818 lseek(fid, cur, SEEK_SET);
819
820 free(db);
821
822 return 1; 870 return 1;
823 }
824
825 void G::serial_get_table(int fd, int nth, void *buf, size_t count) {
826 off_t cur = lseek(fd, 0, SEEK_CUR);
827 /* FIXME: if hashTableSize isn't bigger than a page, this loses. */
828 lseek(fd, align_up(get_serial_hashtable_offset() + nth * count, get_page_logn()), SEEK_SET);
829 read(fd, buf, count);
830 lseek(fd, cur, SEEK_SET);
831 }
832
833 void G::serial_write_table(int fd, int nth, void *buf, size_t count) {
834 off_t cur = lseek(fd, 0, SEEK_CUR);
835 /* FIXME: see the comment in serial_get_table() */
836 lseek(fd, align_up(get_serial_hashtable_offset() + nth * count, get_page_logn()), SEEK_SET);
837 write(fd, buf, count);
838 lseek(fd, cur, SEEK_SET);
839 } 871 }
840 872
841 int G::serialize_lsh_hashtables_format1(int fid, int merge){ 873 int G::serialize_lsh_hashtables_format1(int fid, int merge){
842 SerialElementT *pe, *pt; 874 SerialElementT *pe, *pt;
843 Uns32T x,y; 875 Uns32T x,y;
844 876
845 if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT1) ) 877 if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT1) )
846 error("Cannot merge core and serial LSH, data structure dimensions mismatch."); 878 error("Cannot merge core and serial LSH, data structure dimensions mismatch.");
847 879
880 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
848 Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount; 881 Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount;
849 size_t hashTableSize = sizeof(SerialElementT) * lshHeader->numRows * lshHeader->numCols;
850 pt = (SerialElementT *) malloc(hashTableSize);
851 // Write the hash tables 882 // Write the hash tables
852 for( x = 0 ; x < H::L ; x++ ){ 883 for( x = 0 ; x < H::L ; x++ ){
853 std::cout << (merge ? "merging":"writing") << " hash table " << x << " FORMAT1..."; 884 std::cout << (merge ? "merging":"writing") << " hash table " << x << " FORMAT1...";
854 std::cout.flush(); 885 std::cout.flush();
855 // read a hash table's data from disk 886 // memory map a single hash table for sequential access
856 serial_get_table(fid, x, pt, hashTableSize); 887 // Align each hash table to page boundary
888 char* dbtable = serial_mmap(fid, hashTableSize, 1,
889 align_up(get_serial_hashtable_offset()+x*hashTableSize, get_page_logn()));
890 #ifdef __CYGWIN__
891 // No madvise in CYGWIN
892 #else
893 if(madvise(dbtable, hashTableSize, MADV_SEQUENTIAL)<0)
894 error("could not advise hashtable memory","","madvise");
895 #endif
857 maxColCount=0; 896 maxColCount=0;
858 minColCount=O2_SERIAL_MAX_COLS; 897 minColCount=O2_SERIAL_MAX_COLS;
859 meanColCount=0; 898 meanColCount=0;
860 colCountN=0; 899 colCountN=0;
900 pt=(SerialElementT*)dbtable;
861 for( y = 0 ; y < H::N ; y++ ){ 901 for( y = 0 ; y < H::N ; y++ ){
862 // Move disk pointer to beginning of row 902 // Move disk pointer to beginning of row
863 pe=pt+y*lshHeader->numCols; 903 pe=pt+y*lshHeader->numCols;
864 904
865 colCount=0; 905 colCount=0;
887 } 927 }
888 if(colCountN) 928 if(colCountN)
889 std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN 929 std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN
890 << ", min = " << minColCount << ", max = " << maxColCount 930 << ", min = " << minColCount << ", max = " << maxColCount
891 << endl; 931 << endl;
892 serial_write_table(fid, x, pt, hashTableSize); 932 serial_munmap(dbtable, hashTableSize);
893 } 933 }
894 934
895 // We're done writing 935 // We're done writing
896 free(pt);
897 return 1; 936 return 1;
898 } 937 }
899 938
900 void G::serial_merge_hashtable_row_format1(SerialElementT* pr, bucket* b, Uns32T& colCount){ 939 void G::serial_merge_hashtable_row_format1(SerialElementT* pr, bucket* b, Uns32T& colCount){
901 while(b && b->t2!=IFLAG){ 940 while(b && b->t2!=IFLAG){
975 std::cout.flush(); 1014 std::cout.flush();
976 maxColCount=0; 1015 maxColCount=0;
977 minColCount=O2_SERIAL_MAX_COLS; 1016 minColCount=O2_SERIAL_MAX_COLS;
978 meanColCount=0; 1017 meanColCount=0;
979 colCountN=0; 1018 colCountN=0;
1019 H::tablesPointCount = 0;
980 for( y = 0 ; y < H::N ; y++ ){ 1020 for( y = 0 ; y < H::N ; y++ ){
981 colCount=0; 1021 colCount=0;
982 if(bucket* bPtr = h[x][y]){ 1022 if(bucket* bPtr = h[x][y]){
983 // Check for empty row (even though row was allocated) 1023 // Check for empty row (even though row was allocated)
984 #ifdef LSH_LIST_HEAD_COUNTERS 1024 #ifdef LSH_LIST_HEAD_COUNTERS
1012 if(colCount>maxColCount) 1052 if(colCount>maxColCount)
1013 maxColCount=colCount; 1053 maxColCount=colCount;
1014 meanColCount+=colCount; 1054 meanColCount+=colCount;
1015 colCountN++; 1055 colCountN++;
1016 } 1056 }
1057 H::tablesPointCount+=colCount;
1017 } 1058 }
1018 // Write END of table marker 1059 // Write END of table marker
1019 t1 = O2_SERIAL_TOKEN_ENDTABLE; 1060 t1 = O2_SERIAL_TOKEN_ENDTABLE;
1020 WRITE_UNS32(&t1,"[end]"); 1061 WRITE_UNS32(&t1,"[end]");
1021 if(colCountN) 1062 if(colCountN)
1022 std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN 1063 std::cout << "#points: " << H::tablesPointCount << " #rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN
1023 << ", min = " << minColCount << ", max = " << maxColCount 1064 << ", min = " << minColCount << ", max = " << maxColCount
1024 << endl; 1065 << endl;
1025 } 1066 }
1026 // We're done writing 1067 // We're done writing
1027 return 1; 1068 return 1;
1062 } 1103 }
1063 return point_count; 1104 return point_count;
1064 } 1105 }
1065 1106
1066 void G::serial_write_hashtable_row_format2(FILE* dbFile, bucket* b, Uns32T& colCount){ 1107 void G::serial_write_hashtable_row_format2(FILE* dbFile, bucket* b, Uns32T& colCount){
1108 #ifdef _LSH_DEBUG_
1109 Uns32T last_t2 = 0;
1110 #endif
1067 while(b && b->t2!=IFLAG){ 1111 while(b && b->t2!=IFLAG){
1068 if(!b->snext.ptr){ 1112 if(!b->snext.ptr){
1069 fclose(dbFile); 1113 fclose(dbFile);
1070 error("Empty collision chain in serial_write_hashtable_row_format2()"); 1114 error("Empty collision chain in serial_write_hashtable_row_format2()");
1071 } 1115 }
1072 t2 = O2_SERIAL_TOKEN_T2; 1116 t2 = O2_SERIAL_TOKEN_T2;
1117
1073 if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){ 1118 if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){
1074 fclose(dbFile); 1119 fclose(dbFile);
1075 error("write error in serial_write_hashtable_row_format2()"); 1120 error("write error in serial_write_hashtable_row_format2()");
1076 } 1121 }
1077 t2 = b->t2; 1122 t2 = b->t2;
1123 #ifdef _LSH_DEBUG_
1124 if(t2 < last_t2){
1125 fclose(dbFile);
1126 error("t2<last_t2 in serial_write_hashtable_row_format2()");
1127 }
1128 last_t2 = t2;
1129 #endif
1130
1078 if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){ 1131 if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){
1079 fclose(dbFile); 1132 fclose(dbFile);
1080 error("write error in serial_write_hashtable_row_format2()"); 1133 error("write error in serial_write_hashtable_row_format2()");
1081 } 1134 }
1082 serial_write_element_format2(dbFile, b->snext.ptr, colCount); 1135 serial_write_element_format2(dbFile, b->snext.ptr, colCount);
1134 error("lseek error in db file", "", "lseek"); 1187 error("lseek error in db file", "", "lseek");
1135 1188
1136 // write a dummy byte at the last location 1189 // write a dummy byte at the last location
1137 if (write (dbfid, "", 1) != 1) 1190 if (write (dbfid, "", 1) != 1)
1138 error("write error", "", "write"); 1191 error("write error", "", "write");
1139 1192
1140 serial_write_header(dbfid, lshHeader); 1193 char* db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);
1194
1195 memcpy (db, lshHeader, O2_SERIAL_HEADER_SIZE);
1196
1197 serial_munmap(db, O2_SERIAL_HEADER_SIZE);
1198
1199
1141 close(dbfid); 1200 close(dbfid);
1142 1201
1143 std::cout << "done initializing tables." << endl; 1202 std::cout << "done initializing tables." << endl;
1144 1203
1145 return 1; 1204 return 1;
1146 } 1205 }
1147 1206
1148 SerialHeaderT* G::serial_get_header(int fd) { 1207 char* G::serial_mmap(int dbfid, Uns32T memSize, Uns32T forWrite, off_t offset){
1149 off_t cur = lseek(fd, 0, SEEK_CUR); 1208 char* db;
1209 if(forWrite){
1210 if ((db = (char*) mmap(0, memSize, PROT_READ | PROT_WRITE,
1211 MAP_SHARED, dbfid, offset)) == (caddr_t) -1)
1212 error("mmap error in request for writable serialized database", "", "mmap");
1213 }
1214 else if ((db = (char*) mmap(0, memSize, PROT_READ, MAP_SHARED, dbfid, offset)) == (caddr_t) -1)
1215 error("mmap error in read-only serialized database", "", "mmap");
1216
1217 return db;
1218 }
1219
1220 SerialHeaderT* G::serial_get_header(char* db){
1150 lshHeader = new SerialHeaderT(); 1221 lshHeader = new SerialHeaderT();
1151 lseek(fd, 0, SEEK_SET); 1222 memcpy((char*)lshHeader, db, sizeof(SerialHeaderT));
1152 if(read(fd, lshHeader, sizeof(SerialHeaderT)) != (ssize_t) (sizeof(SerialHeaderT)))
1153 error("Bad return from read");
1154 1223
1155 if(lshHeader->lshMagic!=O2_SERIAL_MAGIC) 1224 if(lshHeader->lshMagic!=O2_SERIAL_MAGIC)
1156 error("Not an LSH database file"); 1225 error("Not an LSH database file");
1157 lseek(fd, cur, SEEK_SET); 1226
1158 return lshHeader; 1227 return lshHeader;
1159 } 1228 }
1160 1229
1161 void G::serial_write_header(int fd, SerialHeaderT *header) { 1230 void G::serial_munmap(char* db, Uns32T N){
1162 off_t cur = lseek(fd, 0, SEEK_CUR); 1231 munmap(db, N);
1163 lseek(fd, 0, SEEK_SET);
1164 if(write(fd, header, sizeof(SerialHeaderT)) != (ssize_t) (sizeof(SerialHeaderT)))
1165 error("Bad return from write");
1166 lseek(fd, cur, SEEK_SET);
1167 } 1232 }
1168 1233
1169 int G::serial_open(char* filename, int writeFlag){ 1234 int G::serial_open(char* filename, int writeFlag){
1170 int dbfid; 1235 int dbfid;
1171 if(writeFlag){ 1236 if(writeFlag){
1187 release_lock(dbfid); 1252 release_lock(dbfid);
1188 close(dbfid); 1253 close(dbfid);
1189 } 1254 }
1190 1255
1191 int G::unserialize_lsh_header(char* filename){ 1256 int G::unserialize_lsh_header(char* filename){
1192
1193 int dbfid; 1257 int dbfid;
1258 char* db;
1194 // Test to see if file exists 1259 // Test to see if file exists
1195 if((dbfid = open (filename, O_RDONLY)) < 0) 1260 if((dbfid = open (filename, O_RDONLY)) < 0)
1196 error("Can't open the file", filename, "open"); 1261 error("Can't open the file", filename, "open");
1197 close(dbfid); 1262 close(dbfid);
1198 dbfid = serial_open(filename, 0); // open for read 1263 dbfid = serial_open(filename, 0); // open for read
1199 serial_get_header(dbfid); 1264 db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
1265 serial_get_header(db); // read header
1266 serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap
1200 1267
1201 // Unserialize header parameters 1268 // Unserialize header parameters
1202 H::L = lshHeader->numTables; 1269 H::L = lshHeader->numTables;
1203 H::m = (Uns32T)( (1.0 + sqrt(1 + 8.0*(int)H::L)) / 2.0); 1270 H::m = (Uns32T)( (1.0 + sqrt(1 + 8.0*(int)H::L)) / 2.0);
1204 H::N = lshHeader->numRows; 1271 H::N = lshHeader->numRows;
1220 Uns32T j, kk; 1287 Uns32T j, kk;
1221 float* pf; 1288 float* pf;
1222 Uns32T* pu; 1289 Uns32T* pu;
1223 1290
1224 // Load the hash functions into core 1291 // Load the hash functions into core
1225 off_t cur = lseek(dbfid, 0, SEEK_CUR); 1292 char* db = serial_mmap(dbfid, get_serial_hashtable_offset(), 0);// get database pointer again
1226 void *db = malloc(get_serial_hashtable_offset() - O2_SERIAL_HEADER_SIZE); 1293
1227 lseek(dbfid, O2_SERIAL_HEADER_SIZE, SEEK_SET); 1294 pf = get_serial_hashfunction_base(db);
1228 read(dbfid, db, get_serial_hashtable_offset() - O2_SERIAL_HEADER_SIZE);
1229 lseek(dbfid, cur, SEEK_SET);
1230 pf = (float *)db;
1231 1295
1232 #ifdef USE_U_FUNCTIONS 1296 #ifdef USE_U_FUNCTIONS
1233 for( j = 0 ; j < H::m ; j++ ){ // L functions gj(v) 1297 for( j = 0 ; j < H::m ; j++ ){ // L functions gj(v)
1234 for( kk = 0 ; kk < H::k/2 ; kk++ ){ // Normally distributed hash functions 1298 for( kk = 0 ; kk < H::k/2 ; kk++ ){ // Normally distributed hash functions
1235 #else 1299 #else
1236 for( j = 0 ; j < H::L ; j++ ){ // L functions gj(v) 1300 for( j = 0 ; j < H::L ; j++ ){ // L functions gj(v)
1237 for( kk = 0 ; kk < H::k ; kk++ ){ // Normally distributed hash functions 1301 for( kk = 0 ; kk < H::k ; kk++ ){ // Normally distributed hash functions
1238 #endif 1302 #endif
1239 for(Uns32T i = 0 ; i < H::d ; i++ ) 1303 for(Uns32T i = 0 ; i < H::d ; i++ )
1240 H::A[j][kk][i] = *pf++; // Normally distributed random vectors 1304 H::A[j][kk][i] = *pf++; // Normally distributed random vectors
1241 } 1305 }
1242 } 1306 }
1243 #ifdef USE_U_FUNCTIONS 1307 #ifdef USE_U_FUNCTIONS
1244 for( j = 0 ; j < H::m ; j++ ) // biases b 1308 for( j = 0 ; j < H::m ; j++ ) // biases b
1245 for( kk = 0 ; kk < H::k/2 ; kk++ ) 1309 for( kk = 0 ; kk < H::k/2 ; kk++ )
1246 #else 1310 #else
1247 for( j = 0 ; j < H::L ; j++ ) // biases b 1311 for( j = 0 ; j < H::L ; j++ ) // biases b
1248 for( kk = 0 ; kk < H::k ; kk++ ) 1312 for( kk = 0 ; kk < H::k ; kk++ )
1249 #endif 1313 #endif
1250 H::b[j][kk] = *pf++; 1314 H::b[j][kk] = *pf++;
1251 1315
1252 pu = (Uns32T*)pf; 1316 pu = (Uns32T*)pf;
1253 for( j = 0 ; j < H::L ; j++ ) // Z projectors r1 1317 for( j = 0 ; j < H::L ; j++ ) // Z projectors r1
1254 for( kk = 0 ; kk < H::k ; kk++ ) 1318 for( kk = 0 ; kk < H::k ; kk++ )
1255 H::r1[j][kk] = *pu++; 1319 H::r1[j][kk] = *pu++;
1256 1320
1257 for( j = 0 ; j < H::L ; j++ ) // Z projectors r2 1321 for( j = 0 ; j < H::L ; j++ ) // Z projectors r2
1258 for( kk = 0 ; kk < H::k ; kk++ ) 1322 for( kk = 0 ; kk < H::k ; kk++ )
1259 H::r2[j][kk] = *pu++; 1323 H::r2[j][kk] = *pu++;
1260 1324
1261 free(db); 1325 serial_munmap(db, get_serial_hashtable_offset());
1262 } 1326 }
1263 1327
1264 void G::unserialize_lsh_hashtables_format1(int fid){ 1328 void G::unserialize_lsh_hashtables_format1(int fid){
1265 SerialElementT *pe, *pt; 1329 SerialElementT *pe, *pt;
1266 Uns32T x,y; 1330 Uns32T x,y;
1267 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; 1331 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
1268 pt = (SerialElementT *) malloc(hashTableSize);
1269 // Read the hash tables into core 1332 // Read the hash tables into core
1270 for( x = 0 ; x < H::L ; x++ ){ 1333 for( x = 0 ; x < H::L ; x++ ){
1271 // memory map a single hash table 1334 // memory map a single hash table
1272 // Align each hash table to page boundary 1335 // Align each hash table to page boundary
1273 serial_get_table(fid, x, pt, hashTableSize); 1336 char* dbtable = serial_mmap(fid, hashTableSize, 0,
1337 align_up(get_serial_hashtable_offset()+x*hashTableSize, get_page_logn()));
1338 #ifdef __CYGWIN__
1339 // No madvise in CYGWIN
1340 #else
1341 if(madvise(dbtable, hashTableSize, MADV_SEQUENTIAL)<0)
1342 error("could not advise hashtable memory","","madvise");
1343 #endif
1344 pt=(SerialElementT*)dbtable;
1274 for( y = 0 ; y < H::N ; y++ ){ 1345 for( y = 0 ; y < H::N ; y++ ){
1275 // Move disk pointer to beginning of row 1346 // Move disk pointer to beginning of row
1276 pe=pt+y*lshHeader->numCols; 1347 pe=pt+y*lshHeader->numCols;
1277 unserialize_hashtable_row_format1(pe, h[x]+y); 1348 unserialize_hashtable_row_format1(pe, h[x]+y);
1278 #ifdef LSH_DUMP_CORE_TABLES 1349 #ifdef LSH_DUMP_CORE_TABLES
1280 serial_bucket_dump(pe); 1351 serial_bucket_dump(pe);
1281 printf("C[%d,%d]", x, y); 1352 printf("C[%d,%d]", x, y);
1282 dump_hashtable_row(h[x][y]); 1353 dump_hashtable_row(h[x][y]);
1283 #endif 1354 #endif
1284 } 1355 }
1356 serial_munmap(dbtable, hashTableSize);
1285 } 1357 }
1286 free(pt);
1287 } 1358 }
1288 1359
1289 void G::unserialize_hashtable_row_format1(SerialElementT* pe, bucket** b){ 1360 void G::unserialize_hashtable_row_format1(SerialElementT* pe, bucket** b){
1290 Uns32T colCount = 0; 1361 Uns32T colCount = 0;
1291 while(colCount!=lshHeader->numCols && pe->hashValue !=IFLAG){ 1362 while(colCount!=lshHeader->numCols && pe->hashValue !=IFLAG){
1297 } 1368 }
1298 } 1369 }
1299 1370
1300 void G::unserialize_lsh_hashtables_format2(FILE* dbFile, bool forMerge){ 1371 void G::unserialize_lsh_hashtables_format2(FILE* dbFile, bool forMerge){
1301 Uns32T x=0,y=0; 1372 Uns32T x=0,y=0;
1302 1373 #ifdef _LSH_DEBUG_
1374 cout << "Loading hashtables..." << endl;
1375 cout << "header pointCount = " << pointCount << endl;
1376 cout << "forMerge = " << forMerge << endl;
1377 Uns32T sumTablesPointCount = 0;
1378 #endif
1303 // Seek to hashtable base offset 1379 // Seek to hashtable base offset
1304 if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){ 1380 if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){
1305 fclose(dbFile); 1381 fclose(dbFile);
1306 error("fSeek error in unserialize_lsh_hashtables_format2"); 1382 error("fSeek error in unserialize_lsh_hashtables_format2");
1307 } 1383 }
1308 1384
1309 // Read the hash tables into core (structure is given in header) 1385 // Read the hash tables into core (structure is given in header)
1310 while( x < H::L){ 1386 while( x < H::L){
1387 tablesPointCount=0;
1311 if(fread(&(H::t1), sizeof(Uns32T), 1, dbFile) != 1){ 1388 if(fread(&(H::t1), sizeof(Uns32T), 1, dbFile) != 1){
1312 fclose(dbFile); 1389 fclose(dbFile);
1313 error("Read error","unserialize_lsh_hashtables_format2()"); 1390 error("Read error","unserialize_lsh_hashtables_format2()");
1314 } 1391 }
1315 if(H::t1==O2_SERIAL_TOKEN_ENDTABLE) 1392 if(H::t1==O2_SERIAL_TOKEN_ENDTABLE)
1335 Uns32T numElements; 1412 Uns32T numElements;
1336 if(fread(&numElements, sizeof(Uns32T), 1, dbFile) != 1){ 1413 if(fread(&numElements, sizeof(Uns32T), 1, dbFile) != 1){
1337 fclose(dbFile); 1414 fclose(dbFile);
1338 error("Read error: numElements","unserialize_lsh_hashtables_format2()"); 1415 error("Read error: numElements","unserialize_lsh_hashtables_format2()");
1339 } 1416 }
1340 1417
1418 /*
1419 #ifdef _LSH_DEBUG_
1420 cout << "[" << x << "," << y << "] numElements(disk) = " << numElements;
1421 #endif
1422 */
1341 // BACKWARD COMPATIBILITY: check to see if T2 or END token was read 1423 // BACKWARD COMPATIBILITY: check to see if T2 or END token was read
1342 if(numElements==O2_SERIAL_TOKEN_T2 || numElements==O2_SERIAL_TOKEN_ENDTABLE ){ 1424 if(numElements==O2_SERIAL_TOKEN_T2 || numElements==O2_SERIAL_TOKEN_ENDTABLE ){
1343 forMerge=true; // Force use of dynamic linked list core format 1425 forMerge=true; // Force use of dynamic linked list core format
1344 token = numElements; 1426 token = numElements;
1345 } 1427 }
1346 1428
1347 if(forMerge) 1429 if(forMerge) // FOR INDEXING use dynamic linked list data structure
1348 // Use linked list CORE format 1430 // Use linked list CORE format
1349 token = unserialize_hashtable_row_format2(dbFile, h[x]+y, token); 1431 token = unserialize_hashtable_row_format2(dbFile, h[x]+y, token);
1350 else 1432 else // FOR QUERY use static array data structure
1351 // Use ARRAY CORE format with numElements counter 1433 // Use ARRAY CORE format with numElements counter
1352 token = unserialize_hashtable_row_to_array(dbFile, h[x]+y, numElements); 1434 token = unserialize_hashtable_row_to_array(dbFile, h[x]+y, numElements);
1353 #else 1435 #else
1354 token = unserialize_hashtable_row_format2(dbFile, h[x]+y); 1436 token = unserialize_hashtable_row_format2(dbFile, h[x]+y);
1355 #endif 1437 #endif
1356 // Check that token is valid 1438 // Check that token is valid
1357 if( !(token==O2_SERIAL_TOKEN_T1 || token==O2_SERIAL_TOKEN_ENDTABLE) ){ 1439 if( !(token==O2_SERIAL_TOKEN_T1 || token==O2_SERIAL_TOKEN_ENDTABLE) ){
1358 fclose(dbFile); 1440 fclose(dbFile);
1359 error("State machine error end of row/table", "unserialize_lsh_hashtables_format2()"); 1441 error("State machine error end of row/table", "unserialize_lsh_hashtables_format2()");
1365 } 1447 }
1366 // Check for new row flag 1448 // Check for new row flag
1367 if(token==O2_SERIAL_TOKEN_T1) 1449 if(token==O2_SERIAL_TOKEN_T1)
1368 H::t1 = token; 1450 H::t1 = token;
1369 } 1451 }
1370 } 1452 #ifdef _LSH_DEBUG_
1453 cout << "[T " << x-1 << "] pointCount = " << tablesPointCount << endl;
1454 sumTablesPointCount+=tablesPointCount;
1455 #endif
1456 }
1457 #ifdef _LSH_DEBUG_
1458 cout << "TOTAL pointCount = " << sumTablesPointCount << endl;
1459 #endif
1371 #ifdef LSH_DUMP_CORE_TABLES 1460 #ifdef LSH_DUMP_CORE_TABLES
1372 dump_hashtables(); 1461 dump_hashtables();
1373 #endif 1462 #endif
1374 } 1463 }
1375 1464
1405 error("Read error H::p","unserialize_hashtable_row_format2"); 1494 error("Read error H::p","unserialize_hashtable_row_format2");
1406 } 1495 }
1407 while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){ 1496 while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){
1408 pointFound=true; 1497 pointFound=true;
1409 bucket_insert_point(b); 1498 bucket_insert_point(b);
1499 tablesPointCount++;
1410 if(fread(&(H::p), sizeof(Uns32T), 1, dbFile) != 1){ 1500 if(fread(&(H::p), sizeof(Uns32T), 1, dbFile) != 1){
1411 fclose(dbFile); 1501 fclose(dbFile);
1412 error("Read error H::p","unserialize_hashtable_row_format2"); 1502 error("Read error H::p","unserialize_hashtable_row_format2");
1413 } 1503 }
1414 } 1504 }
1432 // ASSUME: that LSH_LIST_HEAD_COUNTERS is set so that the first hashtable bucket is used to count 1522 // ASSUME: that LSH_LIST_HEAD_COUNTERS is set so that the first hashtable bucket is used to count
1433 // point and bucket entries 1523 // point and bucket entries
1434 // 1524 //
1435 // We store the values of numPoints and numBuckets in separate fields of the first bucket 1525 // We store the values of numPoints and numBuckets in separate fields of the first bucket
1436 // rowPtr->t2 // numPoints 1526 // rowPtr->t2 // numPoints
1437 // (Uns32T)(rowPtr->snext) // numBuckets 1527 // (rowPtr->snext.numBuckets) // numBuckets
1438 // 1528 //
1439 // We cast the rowPtr->next pointer to (Uns32*) malloc(numElements*sizeof(Uns32T) + sizeof(bucket*)) 1529 // We cast the rowPtr->next pointer to (Uns32*) malloc(numElements*sizeof(Uns32T) + sizeof(bucket*))
1440 // To get to the first bucket, we use 1530 // To get to the first bucket, we use
1441 // 1531 //
1442 1532
1457 #define ENCODE_POINT_SKIP_BITS TEST_TOKEN(!numPointsThisBucket, "no points found");\ 1547 #define ENCODE_POINT_SKIP_BITS TEST_TOKEN(!numPointsThisBucket, "no points found");\
1458 if(numPointsThisBucket==1){\ 1548 if(numPointsThisBucket==1){\
1459 secondPtr=ap++;\ 1549 secondPtr=ap++;\
1460 *secondPtr=0;\ 1550 *secondPtr=0;\
1461 numPoints++;\ 1551 numPoints++;\
1552 numSingletons++;\
1462 }\ 1553 }\
1463 if(numPointsThisBucket>1){\ 1554 if(numPointsThisBucket>1){\
1464 *firstPtr |= ( (numPointsThisBucket-1) & 0x3 ) << SKIP_BITS_LEFT_SHIFT_MSB;\ 1555 *firstPtr |= ( (numPointsThisBucket-1) & 0x3 ) << SKIP_BITS_LEFT_SHIFT_MSB;\
1465 *secondPtr |= ( ( (numPointsThisBucket-1) & 0xC) >> 2 ) << SKIP_BITS_LEFT_SHIFT_MSB;} 1556 *secondPtr |= ( ( (numPointsThisBucket-1) & 0xC) >> 2 ) << SKIP_BITS_LEFT_SHIFT_MSB;}
1466 1557
1468 Uns32T numPointsThisBucket = 0; 1559 Uns32T numPointsThisBucket = 0;
1469 Uns32T numBuckets = 0; 1560 Uns32T numBuckets = 0;
1470 Uns32T numPoints = 0; 1561 Uns32T numPoints = 0;
1471 Uns32T* firstPtr = 0; 1562 Uns32T* firstPtr = 0;
1472 Uns32T* secondPtr = 0; 1563 Uns32T* secondPtr = 0;
1564 Uns32T numSingletons = 0; // Count single point buckets because we encode them with 2 points (for skip)
1473 1565
1474 // Initialize new row 1566 // Initialize new row
1475 if(!*rowPP){ 1567 if(!*rowPP){
1476 *rowPP = new bucket(); 1568 *rowPP = new bucket();
1477 #ifdef LSH_LIST_HEAD_COUNTERS 1569 #ifdef LSH_LIST_HEAD_COUNTERS
1478 (*rowPP)->t2 = 0; // Use t2 as a collision counter for the row 1570 (*rowPP)->t2 = 0; // Use t2 as a collision counter for the row
1479 (*rowPP)->next = 0; 1571 (*rowPP)->next = 0;
1480 #endif 1572 #endif
1481 } 1573 }
1482 bucket* rowPtr = *rowPP; 1574 bucket* rowPtr = *rowPP;
1483 1575 Uns32T last_t2 = 0;
1484 READ_UNS32T(&(H::t2),"t2"); 1576 READ_UNS32T(&(H::t2),"t2");
1485 TEST_TOKEN(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2), "expected E or T2"); 1577 TEST_TOKEN(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2), "expected E or T2");
1486 // Because we encode points in 16-point blocks, we sometimes allocate repeated t2 elements 1578 // Because we encode points in 16-point blocks, we sometimes allocate repeated t2 elements
1487 // So over-allocate by a factor of two and realloc later to actual numElements 1579 // So over-allocate by a factor of two and realloc later to actual numElements
1488 CR_ASSERT(rowPtr->next = (bucket*) malloc((2*numElements+1)*sizeof(Uns32T)+sizeof(bucket**))); 1580 CR_ASSERT(rowPtr->next = (bucket*) malloc((2*numElements+1)*sizeof(Uns32T)+sizeof(bucket**)));
1490 while( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1) ){ 1582 while( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1) ){
1491 numPointsThisBucket = 0;// reset bucket point counter 1583 numPointsThisBucket = 0;// reset bucket point counter
1492 secondPtr = 0; // reset second-point pointer 1584 secondPtr = 0; // reset second-point pointer
1493 TEST_TOKEN(H::t2!=O2_SERIAL_TOKEN_T2, "expected T2"); 1585 TEST_TOKEN(H::t2!=O2_SERIAL_TOKEN_T2, "expected T2");
1494 READ_UNS32T(&(H::t2), "Read error t2"); 1586 READ_UNS32T(&(H::t2), "Read error t2");
1587 if(H::t2<last_t2)
1588 cout << "last_t2=" << last_t2 << ", t2=" << H::t2 << endl;
1589 TEST_TOKEN(H::t2<last_t2, "t2 tokens not in ascending order");
1590 last_t2 = H::t2;
1591 /*
1592 #ifdef _LSH_DEBUG_
1593 cout << "+" << H::t2 << "+";
1594 #endif
1595 */
1495 *ap++ = H::t2; // Insert t2 value into array 1596 *ap++ = H::t2; // Insert t2 value into array
1496 numBuckets++; 1597 numBuckets++;
1497 READ_UNS32T(&(H::p), "Read error H::p"); 1598 READ_UNS32T(&(H::p), "Read error H::p");
1498 while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){ 1599 while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){
1499 if(numPointsThisBucket==MAX_POINTS_IN_BUCKET_CORE_ARRAY){ 1600 if(numPointsThisBucket==MAX_POINTS_IN_BUCKET_CORE_ARRAY){
1500 ENCODE_POINT_SKIP_BITS; 1601 ENCODE_POINT_SKIP_BITS;
1602 /*
1603 #ifdef _LSH_DEBUG_
1604 cout << "*" << H::t2 << "*";
1605 #endif
1606 */
1501 *ap++ = H::t2; // Extra element 1607 *ap++ = H::t2; // Extra element
1502 numBuckets++; // record this as a new bucket 1608 numBuckets++; // record this as a new bucket
1503 numPointsThisBucket=0; // reset bucket point counter 1609 numPointsThisBucket=0; // reset bucket point counter
1504 secondPtr = 0; // reset second-point pointer 1610 secondPtr = 0; // reset second-point pointer
1505 } 1611 }
1506 if( ++numPointsThisBucket == 1 ) 1612 if( ++numPointsThisBucket == 1 )
1507 firstPtr = ap; // store pointer to first point to insert skip bits later on 1613 firstPtr = ap; // store pointer to first point to insert skip bits later on
1508 else if( numPointsThisBucket == 2 ) 1614 else if( numPointsThisBucket == 2 )
1509 secondPtr = ap; // store pointer to second point to insert skip bits later on 1615 secondPtr = ap; // store pointer to second point to insert skip bits later on
1510 numPoints++; 1616 numPoints++;
1617 /*
1618 #ifdef _LSH_DEBUG_
1619 cout << "(" << H::p << ":" << numPoints << ")";
1620 #endif
1621 */
1622
1511 *ap++ = H::p; 1623 *ap++ = H::p;
1512 READ_UNS32T(&(H::p), "Read error H::p"); 1624 READ_UNS32T(&(H::p), "Read error H::p");
1513 } 1625 }
1514 ENCODE_POINT_SKIP_BITS; 1626 ENCODE_POINT_SKIP_BITS;
1515 H::t2 = H::p; // Copy last found token to t2 1627 H::t2 = H::p; // Copy last found token to t2
1524 // Set the LSH_CORE_ARRAY_BIT to identify data structure for insertion and retrieval 1636 // Set the LSH_CORE_ARRAY_BIT to identify data structure for insertion and retrieval
1525 rowPtr->t2 |= LSH_CORE_ARRAY_BIT; 1637 rowPtr->t2 |= LSH_CORE_ARRAY_BIT;
1526 // Allocate a new dynamic list head at the end of the array 1638 // Allocate a new dynamic list head at the end of the array
1527 bucket** listPtr = reinterpret_cast<bucket**> (ap); 1639 bucket** listPtr = reinterpret_cast<bucket**> (ap);
1528 *listPtr = 0; 1640 *listPtr = 0;
1641 /*
1642 #ifdef _LSH_DEBUG_
1643 cout << " numBuckets=" << numBuckets << " numPoints=" << numPoints - numSingletons << " numElements(array) " << numBuckets+numPoints - numSingletons << " " << endl;
1644 #endif
1645 */
1646 H::tablesPointCount += numPoints - numSingletons;
1529 // Return current token 1647 // Return current token
1530 return H::t2; // return H::t2 which holds current token [E or T1] 1648 return H::t2; // return H::t2 which holds current token [E or T1]
1531 } 1649 }
1532 1650
1533 1651
1535 // *p is a pointer to the beginning of a hashtable row array 1653 // *p is a pointer to the beginning of a hashtable row array
1536 // The array consists of t2 hash keys and one or more point identifiers p for each hash key 1654 // The array consists of t2 hash keys and one or more point identifiers p for each hash key
1537 // Retrieval is performed by generating a hash key query_t2 for query point q 1655 // Retrieval is performed by generating a hash key query_t2 for query point q
1538 // We identify the row that t2 is stored in using a secondary hash t1, this row is the entry 1656 // We identify the row that t2 is stored in using a secondary hash t1, this row is the entry
1539 // point for retrieve_from_core_hashtable_array 1657 // point for retrieve_from_core_hashtable_array
1540 #define SKIP_BITS (0xC0000000) 1658 #define SKIP_BITS (0xC0000000U)
1541 void G::retrieve_from_core_hashtable_array(Uns32T* p, Uns32T qpos){ 1659 void G::retrieve_from_core_hashtable_array(Uns32T* p, Uns32T qpos){
1542 Uns32T skip; 1660 Uns32T skip;
1543 Uns32T t2; 1661 Uns32T t2;
1544 Uns32T p1; 1662 Uns32T p1;
1545 Uns32T p2; 1663 Uns32T p2;
1552 return; 1670 return;
1553 p1 = *p++; 1671 p1 = *p++;
1554 p2 = *p++; 1672 p2 = *p++;
1555 skip = (( p1 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_LSB) + (( p2 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_MSB); 1673 skip = (( p1 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_LSB) + (( p2 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_MSB);
1556 if( t2 == H::t2 ){ 1674 if( t2 == H::t2 ){
1557 add_point_callback(calling_instance, p1 ^ (p1 & SKIP_BITS), qpos, radius); 1675 add_point_callback(calling_instance, p1 & ~SKIP_BITS, qpos, radius);
1558 if(skip--){ 1676 if(skip--){
1559 add_point_callback(calling_instance, p2 ^ (p2 & SKIP_BITS), qpos, radius); 1677 add_point_callback(calling_instance, p2 & ~SKIP_BITS, qpos, radius);
1560 while(skip-- ) 1678 while(skip-- )
1561 add_point_callback(calling_instance, *p++, qpos, radius); 1679 add_point_callback(calling_instance, *p++, qpos, radius);
1562 } 1680 }
1563 } 1681 }
1564 else 1682 else
1588 fflush(stdout); 1706 fflush(stdout);
1589 } 1707 }
1590 } 1708 }
1591 } 1709 }
1592 1710
1711 void G::dump_core_row(Uns32T n){
1712 if(!(n<H::N)){
1713 printf("ROW OUT OF RANGE:%d (MAX:%d)\n", n, H::N-1);
1714 return;
1715 }
1716 for(Uns32T j = 0 ; j < H::L ; j++ ){
1717 bucket* bPtr = h[j][n];
1718 if(bPtr){
1719 printf("C[%d,%d]", j, n);
1720 #ifdef LSH_LIST_HEAD_COUNTERS
1721 printf("[numBuckets=%d]",bPtr->snext.numBuckets);
1722 if(bPtr->t2&LSH_CORE_ARRAY_BIT) {
1723 dump_core_hashtable_array((Uns32T*)(bPtr->next));
1724 }
1725 else {
1726 dump_hashtable_row(bPtr->next);
1727 }
1728 #else
1729 dump_hashtable_row(bPtr);
1730 #endif
1731 printf("\n");
1732 }
1733 }
1734 }
1735
1736 void G::dump_disk_row(char* filename, Uns32T n){
1737 int dbfid = unserialize_lsh_header(filename);
1738 if(dbfid<0){
1739 cerr << "Could not read header from " << filename << endl;
1740 return;
1741 }
1742 FILE* dbFile = 0;
1743 dbFile = fdopen(dbfid, "rb");
1744 if(!dbFile){
1745 cerr << "Could not create FILE pointer from file:" << filename << " with fid:" << dbfid << endl;
1746 close(dbfid);
1747 return;
1748 }
1749
1750 Uns32T x=0,y=0;
1751
1752 // Seek to hashtable base offset
1753 if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){
1754 fclose(dbFile);
1755 error("fSeek error in unserialize_lsh_hashtables_format2");
1756 }
1757 Uns32T token = 0;
1758 Uns32T pointID;
1759
1760 // Scan each serialized hash table on disk and print row n (structure is given in header)
1761 while( x < H::L){
1762 y=0;
1763 if(fread(&token, sizeof(Uns32T), 1, dbFile) != 1){
1764 fclose(dbFile);
1765 error("Read error T1","unserialize_lsh_hashtables_format2()");
1766 }
1767 while(token != O2_SERIAL_TOKEN_ENDTABLE){
1768 if(token == O2_SERIAL_TOKEN_T1){
1769 if(fread(&token, sizeof(Uns32T), 1, dbFile) != 1){
1770 fclose(dbFile);
1771 error("Read error t1","unserialize_lsh_hashtables_format2()");
1772 }
1773 y=token;
1774 if(y==n){
1775 printf("D[%d,%d]", x, y);
1776 if(fread(&token, sizeof(Uns32T), 1, dbFile) != 1){
1777 fclose(dbFile);
1778 error("Read error 2","unserialize_lsh_hashtables_format2()");
1779 }
1780 printf("[numElements=%d]", token);
1781 if(fread(&token, sizeof(Uns32T), 1, dbFile) != 1){
1782 fclose(dbFile);
1783 error("Read error 3","unserialize_lsh_hashtables_format2()");
1784 }
1785 while(!(token==O2_SERIAL_TOKEN_ENDTABLE || token==O2_SERIAL_TOKEN_T1)){
1786 // Check for T2 token
1787 if(token!=O2_SERIAL_TOKEN_T2){
1788 printf("t2=%d",token);
1789 fclose(dbFile);
1790 error("State machine error T2 token", "unserialize_hashtable_row_format2()");
1791 }
1792 // Read t2 value
1793 if(fread(&token, sizeof(Uns32T), 1, dbFile) != 1){
1794 fclose(dbFile);
1795 error("Read error t2","unserialize_hashtable_row_format2");
1796 }
1797 if(fread(&pointID, sizeof(Uns32T), 1, dbFile) != 1){
1798 fclose(dbFile);
1799 error("Read error pointID","unserialize_hashtable_row_format2");
1800 }
1801 while(!(pointID==O2_SERIAL_TOKEN_ENDTABLE || pointID==O2_SERIAL_TOKEN_T1 || pointID==O2_SERIAL_TOKEN_T2 )){
1802 printf("(%0X,%u)", token, pointID);
1803 if(fread(&pointID, sizeof(Uns32T), 1, dbFile) != 1){
1804 fclose(dbFile);
1805 error("Read error H::p","unserialize_hashtable_row_format2");
1806 }
1807 }
1808 token = pointID; // Copy last found token
1809 }
1810 printf("\n");
1811 }
1812 else{ // gobble up rest of row
1813 while(!(token==O2_SERIAL_TOKEN_T1 || token==O2_SERIAL_TOKEN_ENDTABLE)){
1814 if(fread(&token, sizeof(Uns32T), 1, dbFile) != 1){
1815 fclose(dbFile);
1816 error("Read error 4","unserialize_lsh_hashtables_format2()");
1817 }
1818 }
1819 }
1820 }
1821 }
1822 if(token==O2_SERIAL_TOKEN_ENDTABLE){
1823 x++;
1824 }
1825 }
1826 close(dbfid);
1827 }
1828
1829
1593 void G::dump_core_hashtable_array(Uns32T* p){ 1830 void G::dump_core_hashtable_array(Uns32T* p){
1594 Uns32T skip; 1831 Uns32T skip;
1595 Uns32T t2; 1832 Uns32T t2;
1596 Uns32T p1; 1833 Uns32T p1;
1597 Uns32T p2; 1834 Uns32T p2;
1599 do{ 1836 do{
1600 t2 = *p++; 1837 t2 = *p++;
1601 p1 = *p++; 1838 p1 = *p++;
1602 p2 = *p++; 1839 p2 = *p++;
1603 skip = (( p1 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_LSB) + (( p2 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_MSB); 1840 skip = (( p1 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_LSB) + (( p2 & SKIP_BITS ) >> SKIP_BITS_RIGHT_SHIFT_MSB);
1604 printf("(%0x, %0x)", t2, p1 ^ (p1 & SKIP_BITS)); 1841 printf("(%0X, %u)", t2, p1 & ~SKIP_BITS);
1605 if(skip--){ 1842 if(skip--){
1606 printf("(%0x, %0x)", t2, p2 ^ (p2 & SKIP_BITS)); 1843 printf("(%0X, %u)", t2, p2 & ~SKIP_BITS);
1607 while(skip-- ) 1844 while(skip-- )
1608 printf("(%0x, %0x)", t2, *p++); 1845 printf("(%0X, %u)", t2, *p++);
1609 } 1846 }
1610 }while( *p != LSH_CORE_ARRAY_END_ROW_TOKEN ); 1847 }while( *p != LSH_CORE_ARRAY_END_ROW_TOKEN );
1611 } 1848 }
1612 1849
1613 void G::dump_hashtable_row(bucket* p){ 1850 void G::dump_hashtable_row(bucket* p){
1634 // outputs: 1871 // outputs:
1635 // inserts retrieved points into add_point() callback method 1872 // inserts retrieved points into add_point() callback method
1636 void G::serial_retrieve_point_set(char* filename, vector<vector<float> >& vv, ReporterCallbackPtr add_point, void* caller) 1873 void G::serial_retrieve_point_set(char* filename, vector<vector<float> >& vv, ReporterCallbackPtr add_point, void* caller)
1637 { 1874 {
1638 int dbfid = serial_open(filename, 0); // open for read 1875 int dbfid = serial_open(filename, 0); // open for read
1639 serial_get_header(dbfid); 1876 char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
1877 serial_get_header(dbheader); // read header
1878 serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap
1640 1879
1641 if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){ 1880 if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){
1642 serial_close(dbfid); 1881 serial_close(dbfid);
1643 error("serial_retrieve_point_set is for SERIAL_FILEFORMAT1 only"); 1882 error("serial_retrieve_point_set is for SERIAL_FILEFORMAT1 only");
1644 } 1883 }
1646 // size of each hash table 1885 // size of each hash table
1647 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; 1886 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
1648 calling_instance = caller; // class instance variable used in ...bucket_chain_point() 1887 calling_instance = caller; // class instance variable used in ...bucket_chain_point()
1649 add_point_callback = add_point; 1888 add_point_callback = add_point;
1650 1889
1651 SerialElementT *pe = (SerialElementT *)malloc(hashTableSize);
1652 for(Uns32T j=0; j<L; j++){ 1890 for(Uns32T j=0; j<L; j++){
1653 // read a single hash table for random access 1891 // memory map a single hash table for random access
1654 serial_get_table(dbfid, j, pe, hashTableSize); 1892 char* db = serial_mmap(dbfid, hashTableSize, 0,
1893 align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn()));
1894 #ifdef __CYGWIN__
1895 // No madvise in CYGWIN
1896 #else
1897 if(madvise(db, hashTableSize, MADV_RANDOM)<0)
1898 error("could not advise local hashtable memory","","madvise");
1899 #endif
1900 SerialElementT* pe = (SerialElementT*)db ;
1655 for(Uns32T qpos=0; qpos<vv.size(); qpos++){ 1901 for(Uns32T qpos=0; qpos<vv.size(); qpos++){
1656 H::compute_hash_functions(vv[qpos]); 1902 H::compute_hash_functions(vv[qpos]);
1657 H::generate_hash_keys(*(g+j),*(r1+j),*(r2+j)); 1903 H::generate_hash_keys(*(g+j),*(r1+j),*(r2+j));
1658 serial_bucket_chain_point(pe+t1*lshHeader->numCols, qpos); // Point to correct row 1904 serial_bucket_chain_point(pe+t1*lshHeader->numCols, qpos); // Point to correct row
1659 } 1905 }
1906 serial_munmap(db, hashTableSize); // drop hashtable mmap
1660 } 1907 }
1661 free(pe);
1662 serial_close(dbfid); 1908 serial_close(dbfid);
1663 } 1909 }
1664 1910
1665 void G::serial_retrieve_point(char* filename, vector<float>& v, Uns32T qpos, ReporterCallbackPtr add_point, void* caller){ 1911 void G::serial_retrieve_point(char* filename, vector<float>& v, Uns32T qpos, ReporterCallbackPtr add_point, void* caller){
1666 int dbfid = serial_open(filename, 0); // open for read 1912 int dbfid = serial_open(filename, 0); // open for read
1667 serial_get_header(dbfid); 1913 char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
1914 serial_get_header(dbheader); // read header
1915 serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap
1668 1916
1669 if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){ 1917 if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){
1670 serial_close(dbfid); 1918 serial_close(dbfid);
1671 error("serial_retrieve_point is for SERIAL_FILEFORMAT1 only"); 1919 error("serial_retrieve_point is for SERIAL_FILEFORMAT1 only");
1672 } 1920 }
1674 // size of each hash table 1922 // size of each hash table
1675 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; 1923 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
1676 calling_instance = caller; 1924 calling_instance = caller;
1677 add_point_callback = add_point; 1925 add_point_callback = add_point;
1678 H::compute_hash_functions(v); 1926 H::compute_hash_functions(v);
1679
1680 SerialElementT *pe = (SerialElementT *)malloc(hashTableSize);
1681 for(Uns32T j=0; j<L; j++){ 1927 for(Uns32T j=0; j<L; j++){
1682 // read a single hash table for random access 1928 // memory map a single hash table for random access
1683 serial_get_table(dbfid, j, pe, hashTableSize); 1929 char* db = serial_mmap(dbfid, hashTableSize, 0,
1930 align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn()));
1931 #ifdef __CYGWIN__
1932 // No madvise in CYGWIN
1933 #else
1934 if(madvise(db, hashTableSize, MADV_RANDOM)<0)
1935 error("could not advise local hashtable memory","","madvise");
1936 #endif
1937 SerialElementT* pe = (SerialElementT*)db ;
1684 H::generate_hash_keys(*(g+j),*(r1+j),*(r2+j)); 1938 H::generate_hash_keys(*(g+j),*(r1+j),*(r2+j));
1685 serial_bucket_chain_point(pe+t1*lshHeader->numCols, qpos); // Point to correct row 1939 serial_bucket_chain_point(pe+t1*lshHeader->numCols, qpos); // Point to correct row
1686 } 1940 serial_munmap(db, hashTableSize); // drop hashtable mmap
1687 free(pe); 1941 }
1688 serial_close(dbfid); 1942 serial_close(dbfid);
1689 } 1943 }
1690 1944
1691 void G::serial_dump_tables(char* filename){ 1945 void G::serial_dump_tables(char* filename){
1692 int dbfid = serial_open(filename, 0); // open for read 1946 int dbfid = serial_open(filename, 0); // open for read
1693 serial_get_header(dbfid); 1947 char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
1948 serial_get_header(dbheader); // read header
1949 serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap
1694 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; 1950 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
1695 SerialElementT *db = (SerialElementT *)malloc(hashTableSize);
1696 for(Uns32T j=0; j<L; j++){ 1951 for(Uns32T j=0; j<L; j++){
1697 // read a single hash table for random access 1952 // memory map a single hash table for random access
1698 serial_get_table(dbfid, j, db, hashTableSize); 1953 char* db = serial_mmap(dbfid, hashTableSize, 0,
1699 SerialElementT *pe = db; 1954 align_up(get_serial_hashtable_offset()+j*hashTableSize,get_page_logn()));
1955 #ifdef __CYGWIN__
1956 // No madvise in CYGWIN
1957 #else
1958 if(madvise(db, hashTableSize, MADV_SEQUENTIAL)<0)
1959 error("could not advise local hashtable memory","","madvise");
1960 #endif
1961 SerialElementT* pe = (SerialElementT*)db ;
1700 printf("*********** TABLE %d ***************\n", j); 1962 printf("*********** TABLE %d ***************\n", j);
1701 fflush(stdout); 1963 fflush(stdout);
1702 int count=0; 1964 int count=0;
1703 do{ 1965 do{
1704 printf("[%d,%d]", j, count++); 1966 printf("[%d,%d]", j, count++);
1705 fflush(stdout); 1967 fflush(stdout);
1706 serial_bucket_dump(pe); 1968 serial_bucket_dump(pe);
1707 pe+=lshHeader->numCols; 1969 pe+=lshHeader->numCols;
1708 }while(pe<(SerialElementT*)db+lshHeader->numRows*lshHeader->numCols); 1970 }while(pe<(SerialElementT*)db+lshHeader->numRows*lshHeader->numCols);
1709 } 1971 }
1710 free(db);
1711 } 1972 }
1712 1973
1713 void G::serial_bucket_dump(SerialElementT* pe){ 1974 void G::serial_bucket_dump(SerialElementT* pe){
1714 SerialElementT* pend = pe+lshHeader->numCols; 1975 SerialElementT* pend = pe+lshHeader->numCols;
1715 while( !(pe->hashValue==IFLAG || pe==pend ) ){ 1976 while( !(pe->hashValue==IFLAG || pe==pend ) ){