comparison lshlib.cpp @ 336:fe4d5b763086

Converted read/write calls to fread/fwrite for LSH hashtable serialization and unserialization. Indexing is now faster.
author mas01mc
date Fri, 05 Sep 2008 14:16:21 +0000
parents c93be2f3a674
children a6edbe97fddf
comparison
equal deleted inserted replaced
335:69d5649d3e1c 336:fe4d5b763086
434 unserialize_lsh_hashtables_format1(dbfid); 434 unserialize_lsh_hashtables_format1(dbfid);
435 } 435 }
436 436
437 // Format2 always needs unserializing 437 // Format2 always needs unserializing
438 if(lshHeader->flags&O2_SERIAL_FILEFORMAT2 && lshInCoreFlag){ 438 if(lshHeader->flags&O2_SERIAL_FILEFORMAT2 && lshInCoreFlag){
439 unserialize_lsh_hashtables_format2(dbfid); 439 FILE* dbFile = fdopen(dbfid, "rb");
440 } 440 if(!dbFile)
441 441 error("Cannot open LSH file for reading", filename);
442 close(dbfid);} 442 unserialize_lsh_hashtables_format2(dbFile);
443 }
444 serial_close(dbfid);
445 }
443 446
444 G::~G(){ 447 G::~G(){
445 delete lshHeader; 448 delete lshHeader;
446 } 449 }
447 450
656 if(dbIsNew) 659 if(dbIsNew)
657 serialize_lsh_hashfunctions(dbfid); 660 serialize_lsh_hashfunctions(dbfid);
658 // Write the hashtables in the requested format 661 // Write the hashtables in the requested format
659 if(serialFormat == O2_SERIAL_FILEFORMAT1) 662 if(serialFormat == O2_SERIAL_FILEFORMAT1)
660 serialize_lsh_hashtables_format1(dbfid, !dbIsNew); 663 serialize_lsh_hashtables_format1(dbfid, !dbIsNew);
661 else 664 else{
662 serialize_lsh_hashtables_format2(dbfid, !dbIsNew); 665 FILE* dbFile = fdopen(dbfid, "r+b");
663 666 if(!dbFile)
664 if(!dbIsNew){ 667 error("Cannot open LSH file for writing",filename);
668 serialize_lsh_hashtables_format2(dbFile, !dbIsNew);
669 fflush(dbFile);
670 }
671
672 if(!dbIsNew) {
665 db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);// get database pointer 673 db = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 1);// get database pointer
666 //serial_get_header(db); // read header 674 //serial_get_header(db); // read header
667 cout << "maxp = " << H::maxp << endl; 675 cout << "maxp = " << H::maxp << endl;
668 lshHeader->maxp=H::maxp; 676 lshHeader->maxp=H::maxp;
669 // Default to FILEFORMAT1 677 // Default to FILEFORMAT1
670 if(!(lshHeader->flags&O2_SERIAL_FILEFORMAT2)) 678 if(!(lshHeader->flags&O2_SERIAL_FILEFORMAT2))
671 lshHeader->flags|=O2_SERIAL_FILEFORMAT1; 679 lshHeader->flags|=O2_SERIAL_FILEFORMAT1;
672 memcpy((char*)db, (char*)lshHeader, sizeof(SerialHeaderT)); 680 memcpy((char*)db, (char*)lshHeader, sizeof(SerialHeaderT));
673 serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap 681 serial_munmap(db, O2_SERIAL_HEADER_SIZE); // drop mmap
674 } 682 }
675 683 serial_close(dbfid);
676 serial_close(dbfid);
677 } 684 }
678 685
679 // Test to see if core structure and requested format is 686 // Test to see if core structure and requested format is
680 // compatible with currently opened database 687 // compatible with currently opened database
681 int G::serial_can_merge(Uns32T format){ 688 int G::serial_can_merge(Uns32T format){
872 } 879 }
873 pe->hashValue=IFLAG; 880 pe->hashValue=IFLAG;
874 return; 881 return;
875 } 882 }
876 883
877 int G::serialize_lsh_hashtables_format2(int fid, int merge){ 884 int G::serialize_lsh_hashtables_format2(FILE* dbFile, int merge){
878 Uns32T x,y; 885 Uns32T x,y;
879 886
880 if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT2) ) 887 if( merge && !serial_can_merge(O2_SERIAL_FILEFORMAT2) )
881 error("Cannot merge core and serial LSH, data structure dimensions mismatch."); 888 error("Cannot merge core and serial LSH, data structure dimensions mismatch.");
882 889
883 // We must pereform FORMAT2 merges in core 890 // We must pereform FORMAT2 merges in core
884 if(merge) 891 if(merge)
885 unserialize_lsh_hashtables_format2(fid); 892 unserialize_lsh_hashtables_format2(dbFile);
886 893
887 Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount, t1; 894 Uns32T colCount, meanColCount, colCountN, maxColCount, minColCount, t1;
888 lseek(fid, get_serial_hashtable_offset(), SEEK_SET); 895 if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){
896 fclose(dbFile);
897 error("fSeek error in serialize_lsh_hashtables_format2");
898 }
889 899
890 // Write the hash tables 900 // Write the hash tables
891 for( x = 0 ; x < H::L ; x++ ){ 901 for( x = 0 ; x < H::L ; x++ ){
892 std::cout << (merge ? "merging":"writing") << " hash table " << x << " FORMAT2..."; 902 std::cout << (merge ? "merging":"writing") << " hash table " << x << " FORMAT2...";
893 std::cout.flush(); 903 std::cout.flush();
899 colCount=0; 909 colCount=0;
900 if(bucket* bPtr = h[x][y]){ 910 if(bucket* bPtr = h[x][y]){
901 // Check for empty row (even though row was allocated) 911 // Check for empty row (even though row was allocated)
902 #ifdef LSH_BLOCK_FULL_ROWS 912 #ifdef LSH_BLOCK_FULL_ROWS
903 if(bPtr->next->t2==IFLAG){ 913 if(bPtr->next->t2==IFLAG){
904 close(fid); 914 fclose(dbFile);
905 error("b->next->t2==IFLAG","serialize_lsh_hashtables_format2()"); 915 error("b->next->t2==IFLAG","serialize_lsh_hashtables_format2()");
906 } 916 }
907 #else 917 #else
908 if(bPtr->t2==IFLAG){ 918 if(bPtr->t2==IFLAG){
909 close(fid); 919 fclose(dbFile);
910 error("b->t2==IFLAG","serialize_lsh_hashtables_format2()"); 920 error("b->t2==IFLAG","serialize_lsh_hashtables_format2()");
911 } 921 }
912 #endif 922 #endif
913 t1 = O2_SERIAL_TOKEN_T1; 923 t1 = O2_SERIAL_TOKEN_T1;
914 if( write(fid, &t1, sizeof(Uns32T)) != sizeof(Uns32T) ){ 924 if( fwrite(&t1, sizeof(Uns32T), 1, dbFile) != 1 ){
915 close(fid); 925 fclose(dbFile);
916 error("write error in serial_write_hashtable_format2() [T1]"); 926 error("write error in serial_write_hashtable_format2() [T1]");
917 } 927 }
918 t1 = y; 928 t1 = y;
919 if( write(fid, &t1, sizeof(Uns32T)) != sizeof(Uns32T) ){ 929 if( fwrite(&t1, sizeof(Uns32T), 1, dbFile) != 1 ){
920 close(fid); 930 fclose(dbFile);
921 error("write error in serial_write_hashtable_format2() [t1]"); 931 error("write error in serial_write_hashtable_format2() [t1]");
922 } 932 }
923 #ifdef LSH_BLOCK_FULL_ROWS 933 #ifdef LSH_BLOCK_FULL_ROWS
924 serial_write_hashtable_row_format2(fid, bPtr->next, colCount); // skip collision counter bucket 934 serial_write_hashtable_row_format2(dbFile, bPtr->next, colCount); // skip collision counter bucket
925 #else 935 #else
926 serial_write_hashtable_row_format2(fid, bPtr, colCount); 936 serial_write_hashtable_row_format2(dbFile, bPtr, colCount);
927 #endif 937 #endif
928 } 938 }
929 if(colCount){ 939 if(colCount){
930 if(colCount<minColCount) 940 if(colCount<minColCount)
931 minColCount=colCount; 941 minColCount=colCount;
935 colCountN++; 945 colCountN++;
936 } 946 }
937 } 947 }
938 // Write END of table marker 948 // Write END of table marker
939 t1 = O2_SERIAL_TOKEN_ENDTABLE; 949 t1 = O2_SERIAL_TOKEN_ENDTABLE;
940 if( write(fid, &t1, sizeof(Uns32T)) != sizeof(Uns32T) ){ 950 if( fwrite(&t1, sizeof(Uns32T), 1, dbFile ) != 1 ){
941 close(fid); 951 fclose(dbFile);
942 error("write error in serial_write_hashtable_format2() [end]"); 952 error("write error in serial_write_hashtable_format2() [end]");
943 } 953 }
944 954
945 if(colCountN) 955 if(colCountN)
946 std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN 956 std::cout << "#rows with collisions =" << colCountN << ", mean = " << meanColCount/(float)colCountN
950 960
951 // We're done writing 961 // We're done writing
952 return 1; 962 return 1;
953 } 963 }
954 964
955 void G::serial_write_hashtable_row_format2(int fid, bucket* b, Uns32T& colCount){ 965 void G::serial_write_hashtable_row_format2(FILE* dbFile, bucket* b, Uns32T& colCount){
956 while(b && b->t2!=IFLAG){ 966 while(b && b->t2!=IFLAG){
957 if(!b->snext){ 967 if(!b->snext){
958 close(fid); 968 fclose(dbFile);
959 error("Empty collision chain in serial_write_hashtable_row_format2()"); 969 error("Empty collision chain in serial_write_hashtable_row_format2()");
960 } 970 }
961 t2 = O2_SERIAL_TOKEN_T2; 971 t2 = O2_SERIAL_TOKEN_T2;
962 if( write(fid, &t2, sizeof(Uns32T)) != sizeof(Uns32T) ){ 972 if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){
963 close(fid); 973 fclose(dbFile);
964 error("write error in serial_write_hashtable_row_format2()"); 974 error("write error in serial_write_hashtable_row_format2()");
965 } 975 }
966 t2 = b->t2; 976 t2 = b->t2;
967 if( write(fid, &t2, sizeof(Uns32T)) != sizeof(Uns32T) ){ 977 if( fwrite(&t2, sizeof(Uns32T), 1, dbFile) != 1 ){
968 close(fid); 978 fclose(dbFile);
969 error("write error in serial_write_hashtable_row_format2()"); 979 error("write error in serial_write_hashtable_row_format2()");
970 } 980 }
971 serial_write_element_format2(fid, b->snext, colCount); 981 serial_write_element_format2(dbFile, b->snext, colCount);
972 b=b->next; 982 b=b->next;
973 } 983 }
974 } 984 }
975 985
976 void G::serial_write_element_format2(int fid, sbucket* sb, Uns32T& colCount){ 986 void G::serial_write_element_format2(FILE* dbFile, sbucket* sb, Uns32T& colCount){
977 while(sb){ 987 while(sb){
978 if(write(fid, &sb->pointID, sizeof(Uns32T))!=sizeof(Uns32T)){ 988 if(fwrite(&sb->pointID, sizeof(Uns32T), 1, dbFile) != 1){
979 close(fid); 989 fclose(dbFile);
980 error("Write error in serial_write_element_format2()"); 990 error("Write error in serial_write_element_format2()");
981 } 991 }
982 colCount++; 992 colCount++;
983 sb=sb->snext; 993 sb=sb->snext;
984 } 994 }
1195 pe++; 1205 pe++;
1196 colCount++; 1206 colCount++;
1197 } 1207 }
1198 } 1208 }
1199 1209
1200 void G::unserialize_lsh_hashtables_format2(int fid){ 1210 void G::unserialize_lsh_hashtables_format2(FILE* dbFile){
1201 Uns32T x=0,y=0; 1211 Uns32T x=0,y=0;
1202 1212
1203 // Seek to hashtable base offset 1213 // Seek to hashtable base offset
1204 if(lseek(fid, get_serial_hashtable_offset(), SEEK_SET)!=get_serial_hashtable_offset()){ 1214 if(fseek(dbFile, get_serial_hashtable_offset(), SEEK_SET)){
1205 close(fid); 1215 fclose(dbFile);
1206 error("Seek error in unserialize_lsh_hashtables_format2"); 1216 error("fSeek error in unserialize_lsh_hashtables_format2");
1207 } 1217 }
1208 1218
1209 // Read the hash tables into core (structure is given in header) 1219 // Read the hash tables into core (structure is given in header)
1210 while( x < H::L){ 1220 while( x < H::L){
1211 if(read(fid, &(H::t1), sizeof(Uns32T))!=sizeof(Uns32T)){ 1221 if(fread(&(H::t1), sizeof(Uns32T), 1, dbFile) != 1){
1212 close(fid); 1222 fclose(dbFile);
1213 error("Read error","unserialize_lsh_hashtables_format2()"); 1223 error("Read error","unserialize_lsh_hashtables_format2()");
1214 } 1224 }
1215 if(H::t1==O2_SERIAL_TOKEN_ENDTABLE) 1225 if(H::t1==O2_SERIAL_TOKEN_ENDTABLE)
1216 x++; // End of table 1226 x++; // End of table
1217 else 1227 else
1218 while(y < H::N){ 1228 while(y < H::N){
1219 // Read a row and move file pointer to beginning of next row or _bittable 1229 // Read a row and move file pointer to beginning of next row or _bittable
1220 if(!(H::t1==O2_SERIAL_TOKEN_T1)){ 1230 if(!(H::t1==O2_SERIAL_TOKEN_T1)){
1221 close(fid); 1231 fclose(dbFile);
1222 error("State matchine error T1","unserialize_lsh_hashtables_format2()"); 1232 error("State matchine error T1","unserialize_lsh_hashtables_format2()");
1223 } 1233 }
1224 if(read(fid, &(H::t1), sizeof(Uns32T))!=sizeof(Uns32T)){ 1234 if(fread(&(H::t1), sizeof(Uns32T), 1, dbFile) != 1){
1225 close(fid); 1235 fclose(dbFile);
1226 error("Read error: t1","unserialize_lsh_hashtables_format2()"); 1236 error("Read error: t1","unserialize_lsh_hashtables_format2()");
1227 } 1237 }
1228 y = H::t1; 1238 y = H::t1;
1229 if(y>=H::N){ 1239 if(y>=H::N){
1230 close(fid); 1240 fclose(dbFile);
1231 error("Unserialized hashtable row pointer out of range","unserialize_lsh_hashtables_format2()"); 1241 error("Unserialized hashtable row pointer out of range","unserialize_lsh_hashtables_format2()");
1232 } 1242 }
1233 Uns32T token = unserialize_hashtable_row_format2(fid, h[x]+y); 1243 Uns32T token = unserialize_hashtable_row_format2(dbFile, h[x]+y);
1234 1244
1235 #ifdef __LSH_DUMP_CORE_TABLES__ 1245 #ifdef __LSH_DUMP_CORE_TABLES__
1236 printf("C[%d,%d]", x, y); 1246 printf("C[%d,%d]", x, y);
1237 dump_hashtable_row(h[x][y]); 1247 dump_hashtable_row(h[x][y]);
1238 #endif 1248 #endif
1239 // Check that token is valid 1249 // Check that token is valid
1240 if( !(token==O2_SERIAL_TOKEN_T1 || token==O2_SERIAL_TOKEN_ENDTABLE) ){ 1250 if( !(token==O2_SERIAL_TOKEN_T1 || token==O2_SERIAL_TOKEN_ENDTABLE) ){
1241 close(fid); 1251 fclose(dbFile);
1242 error("State machine error end of row/table", "unserialize_lsh_hashtables_format2()"); 1252 error("State machine error end of row/table", "unserialize_lsh_hashtables_format2()");
1243 } 1253 }
1244 // Check for end of table flag 1254 // Check for end of table flag
1245 if(token==O2_SERIAL_TOKEN_ENDTABLE){ 1255 if(token==O2_SERIAL_TOKEN_ENDTABLE){
1246 x++; 1256 x++;
1251 H::t1 = token; 1261 H::t1 = token;
1252 } 1262 }
1253 } 1263 }
1254 } 1264 }
1255 1265
1256 Uns32T G::unserialize_hashtable_row_format2(int fid, bucket** b){ 1266 Uns32T G::unserialize_hashtable_row_format2(FILE* dbFile, bucket** b){
1257 bool pointFound = false; 1267 bool pointFound = false;
1258 if(read(fid, &(H::t2), sizeof(Uns32T)) != sizeof(Uns32T)){ 1268 if(fread(&(H::t2), sizeof(Uns32T), 1, dbFile) != 1){
1259 close(fid); 1269 fclose(dbFile);
1260 error("Read error T2 token","unserialize_hashtable_row_format2"); 1270 error("Read error T2 token","unserialize_hashtable_row_format2");
1261 } 1271 }
1262 if( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2)){ 1272 if( !(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T2)){
1263 close(fid); 1273 fclose(dbFile);
1264 error("State machine error: expected E or T2"); 1274 error("State machine error: expected E or T2");
1265 } 1275 }
1266 while(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1)){ 1276 while(!(H::t2==O2_SERIAL_TOKEN_ENDTABLE || H::t2==O2_SERIAL_TOKEN_T1)){
1267 pointFound=false; 1277 pointFound=false;
1268 // Check for T2 token 1278 // Check for T2 token
1269 if(H::t2!=O2_SERIAL_TOKEN_T2){ 1279 if(H::t2!=O2_SERIAL_TOKEN_T2){
1270 close(fid); 1280 fclose(dbFile);
1271 error("State machine error T2 token", "unserialize_hashtable_row_format2()"); 1281 error("State machine error T2 token", "unserialize_hashtable_row_format2()");
1272 } 1282 }
1273 // Read t2 value 1283 // Read t2 value
1274 if(read(fid, &(H::t2), sizeof(Uns32T)) != sizeof(Uns32T)){ 1284 if(fread(&(H::t2), sizeof(Uns32T), 1, dbFile) != 1){
1275 close(fid); 1285 fclose(dbFile);
1276 error("Read error t2","unserialize_hashtable_row_format2"); 1286 error("Read error t2","unserialize_hashtable_row_format2");
1277 } 1287 }
1278 if(read(fid, &(H::p), sizeof(Uns32T)) != sizeof(Uns32T)){ 1288 if(fread(&(H::p), sizeof(Uns32T), 1, dbFile) != 1){
1279 close(fid); 1289 fclose(dbFile);
1280 error("Read error H::p","unserialize_hashtable_row_format2"); 1290 error("Read error H::p","unserialize_hashtable_row_format2");
1281 } 1291 }
1282 while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){ 1292 while(!(H::p==O2_SERIAL_TOKEN_ENDTABLE || H::p==O2_SERIAL_TOKEN_T1 || H::p==O2_SERIAL_TOKEN_T2 )){
1283 pointFound=true; 1293 pointFound=true;
1284 bucket_insert_point(b); 1294 bucket_insert_point(b);
1285 if(read(fid, &(H::p), sizeof(Uns32T)) != sizeof(Uns32T)){ 1295 if(fread(&(H::p), sizeof(Uns32T), 1, dbFile) != 1){
1286 close(fid); 1296 fclose(dbFile);
1287 error("Read error H::p","unserialize_hashtable_row_format2"); 1297 error("Read error H::p","unserialize_hashtable_row_format2");
1288 } 1298 }
1289 } 1299 }
1290 if(!pointFound) 1300 if(!pointFound)
1291 error("State machine error: point", "unserialize_hashtable_row_format2()"); 1301 error("State machine error: point", "unserialize_hashtable_row_format2()");
1323 char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer 1333 char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
1324 serial_get_header(dbheader); // read header 1334 serial_get_header(dbheader); // read header
1325 serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap 1335 serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap
1326 1336
1327 if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){ 1337 if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){
1328 close(dbfid); 1338 serial_close(dbfid);
1329 error("serial_retrieve_point_set is for SERIAL_FILEFORMAT1 only"); 1339 error("serial_retrieve_point_set is for SERIAL_FILEFORMAT1 only");
1330 } 1340 }
1331 1341
1332 // size of each hash table 1342 // size of each hash table
1333 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; 1343 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;
1360 char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer 1370 char* dbheader = serial_mmap(dbfid, O2_SERIAL_HEADER_SIZE, 0);// get database pointer
1361 serial_get_header(dbheader); // read header 1371 serial_get_header(dbheader); // read header
1362 serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap 1372 serial_munmap(dbheader, O2_SERIAL_HEADER_SIZE); // drop header mmap
1363 1373
1364 if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){ 1374 if((lshHeader->flags & O2_SERIAL_FILEFORMAT2)){
1365 close(dbfid); 1375 serial_close(dbfid);
1366 error("serial_retrieve_point is for SERIAL_FILEFORMAT1 only"); 1376 error("serial_retrieve_point is for SERIAL_FILEFORMAT1 only");
1367 } 1377 }
1368 1378
1369 // size of each hash table 1379 // size of each hash table
1370 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols; 1380 Uns32T hashTableSize=sizeof(SerialElementT)*lshHeader->numRows*lshHeader->numCols;