changeset 320:a995e5ad999a large_adb

working LARGE_ADB support. Activiate at creation time with -N --ntracks 20001 or greater, or with --datasize 1356 or greater, or both. LARGE_ADB blocks non-indexed QUERY.
author mas01mc
date Wed, 20 Aug 2008 13:50:58 +0000
parents b9eff6896943
children da2272e029b3
files audioDB.cpp audioDB.h create.cpp index.cpp insert.cpp query.cpp soap.cpp
diffstat 7 files changed, 143 insertions(+), 97 deletions(-) [+]
line wrap: on
line diff
--- a/audioDB.cpp	Tue Aug 19 20:27:15 2008 +0000
+++ b/audioDB.cpp	Wed Aug 20 13:50:58 2008 +0000
@@ -5,15 +5,15 @@
 PointPair::PointPair(Uns32T a, Uns32T b, Uns32T c):trackID(a),qpos(b),spos(c){};
 
 bool operator<(const PointPair& a, const PointPair& b){
-  return ( (a.qpos<b.qpos) || 
-	   ((a.qpos==b.qpos) && 
-	    ( (a.trackID<b.trackID)) || ((a.trackID==b.trackID)&&(a.spos<b.spos)) ) );	    
+  return ( (a.trackID<b.trackID) ||
+	   ( (a.trackID==b.trackID) &&  
+	     ( (a.spos<b.spos) || ( (a.spos==b.spos) && (a.qpos < b.qpos) )) ) );
 }
 
 bool operator>(const PointPair& a, const PointPair& b){
-  return ( (a.qpos>b.qpos) || 
-	   ((a.qpos==b.qpos) && 
-	    ( (a.trackID>b.trackID)) || ((a.trackID==b.trackID)&&(a.spos>b.spos)) ) );
+  return ( (a.trackID>b.trackID) ||
+	   ( (a.trackID==b.trackID) &&  
+	     ( (a.spos>b.spos) || ( (a.spos==b.spos) && (a.qpos > b.qpos) )) ) );
 }
 
 bool operator==(const PointPair& a, const PointPair& b){
--- a/audioDB.h	Tue Aug 19 20:27:15 2008 +0000
+++ b/audioDB.h	Wed Aug 20 13:50:58 2008 +0000
@@ -107,7 +107,8 @@
 #define O2_MAXDOTPRODUCTMEMORY (sizeof(O2_REALTYPE)*O2_MAXSEQLEN*O2_MAXSEQLEN) // 512MB
 #define O2_DISTANCE_TOLERANCE (1e-6)
 #define O2_SERIAL_MAX_TRACKBATCH (1000000)
-#define O2_LARGE_ADB_SIZE (3500) // datasize at which features are kept externally (in Mbytes)
+#define O2_LARGE_ADB_SIZE (O2_DEFAULT_DATASIZE+1) // datasize at which features are kept externally (in Mbytes)
+#define O2_LARGE_ADB_NTRACKS (O2_DEFAULT_NTRACKS+1) // ntracks at which features are kept externally
 #define O2_MAX_VECTORS ( O2_MEANNUMVECTORS * O2_MAXTRACKS )
 
 // Flags
@@ -158,6 +159,9 @@
 #define INSERT_FILETABLE_STRING(OFFSET, STR) \
     strncpy((char*)((Uns32T)OFFSET) + dbH->numFiles*O2_FILETABLE_ENTRY_SIZE, STR, strlen(STR));
 
+#define SAFE_DELETE(PTR) delete PTR; PTR=0;
+#define SAFE_DELETE_ARRAY(PTR) delete[] PTR; PTR=0;
+
 extern LSH* SERVER_LSH_INDEX_SINGLETON;
 
 typedef struct dbTableHeader {
--- a/create.cpp	Tue Aug 19 20:27:15 2008 +0000
+++ b/create.cpp	Wed Aug 20 13:50:58 2008 +0000
@@ -64,13 +64,13 @@
   // If database will fit in a single file the vectors are copied into the AudioDB instance
   // Else all the vectors are left on the FileSystem and we use the dataOffset as storage
   // for the location of the features, powers and times files (assuming that arbitrary keys are used for the fileTable)
-  if(datasize<O2_LARGE_ADB_SIZE){
+  if(ntracks<O2_LARGE_ADB_NTRACKS && datasize<O2_LARGE_ADB_SIZE){
     dbH->timesTableOffset = ALIGN_PAGE_UP(dbH->dataOffset + databytes);
     dbH->powerTableOffset = ALIGN_PAGE_UP(dbH->timesTableOffset + 2*auxbytes);
     dbH->l2normTableOffset = ALIGN_PAGE_UP(dbH->powerTableOffset + auxbytes);
     dbH->dbSize = ALIGN_PAGE_UP(dbH->l2normTableOffset + auxbytes);
   }
-  else{
+  else{ // Create LARGE_ADB, features and powers kept on filesystem 
     dbH->flags |= O2_FLAG_LARGE_ADB;
     dbH->timesTableOffset = ALIGN_PAGE_UP(dbH->dataOffset + O2_FILETABLE_ENTRY_SIZE*ntracks);
     dbH->powerTableOffset = ALIGN_PAGE_UP(dbH->timesTableOffset + O2_FILETABLE_ENTRY_SIZE*ntracks);
--- a/index.cpp	Tue Aug 19 20:27:15 2008 +0000
+++ b/index.cpp	Wed Aug 20 13:50:58 2008 +0000
@@ -119,22 +119,28 @@
 
 /************************ LSH indexing ***********************************/
 void audioDB::index_index_db(const char* dbName){
-  
   char* newIndexName;
   double *fvp = 0, *sNorm = 0, *snPtr = 0, *sPower = 0, *spPtr = 0;
   Uns32T dbVectors = 0;
 
+
   printf("INDEX: initializing header\n");
   // Check if audioDB exists, initialize header and open database for read
   forWrite = false;
   initDBHeader(dbName);
 
+  if(dbH->flags & O2_FLAG_POWER)
+    usingPower = true;
+  
+  if(dbH->flags & O2_FLAG_TIMES)
+    usingTimes = true;
+
   newIndexName = index_get_name(dbName, radius, sequenceLength);
 
   // Set unit norming flag override
   audioDB::normalizedDistance = !audioDB::no_unit_norming;
 
-  printf("INDEX: dim %d\n", dbH->dim);
+  printf("INDEX: dim %d\n", (int)dbH->dim);
   printf("INDEX: R %f\n", radius);
   printf("INDEX: seqlen %d\n", sequenceLength);
   printf("INDEX: lsh_w %f\n", lsh_param_w);
@@ -164,12 +170,10 @@
     if( endTrack > dbH->numFiles)
       endTrack = dbH->numFiles;
     // Insert up to lsh_param_b tracks
-    if( dbH->flags & O2_FLAG_LARGE_ADB ){
+    if( ! (dbH->flags & O2_FLAG_LARGE_ADB) ){
+      index_initialize(&sNorm, &snPtr, &sPower, &spPtr, &dbVectors);  
     }
-    else{
-      index_initialize(&sNorm, &snPtr, &sPower, &spPtr, &dbVectors);  
-      index_insert_tracks(0, endTrack, &fvp, &sNorm, &snPtr, &sPower, &spPtr);
-    }
+    index_insert_tracks(0, endTrack, &fvp, &sNorm, &snPtr, &sPower, &spPtr);
     lsh->serialize(newIndexName, lsh_in_core?O2_SERIAL_FILEFORMAT2:O2_SERIAL_FILEFORMAT1);
     
     // Clean up
@@ -220,14 +224,9 @@
     exit(1);
   }
     
-
   delete[] newIndexName;
-  if(sNorm)
-    delete[] sNorm;
-  if(sPower)
-    delete[] sPower;
-
-
+  delete[] sNorm;
+  delete[] sPower;
 }
 
 
@@ -258,7 +257,7 @@
     
     if( (statbuf.st_size - sizeof(int)) / (sizeof(double)) != trackTable[trackID] )
       error("Dimension mismatch: numPowers != numVectors", powerFileNameTable+trackID*O2_FILETABLE_ENTRY_SIZE);
-    
+   
     *sPowerp = new double[trackTable[trackID]]; // Allocate memory for power values
     assert(*sPowerp);
     *spPtrp = *sPowerp;
@@ -308,8 +307,8 @@
       break;    
     if ( dbH->flags & O2_FLAG_LARGE_ADB ){
       close(infid);
-      delete *sNormpp;
-      delete *sPowerp;
+      delete[] *sNormpp;
+      delete[] *sPowerp;
       *sNormpp = *sPowerp = *snPtrp = *snPtrp = 0;
     }
   } // end for(trackID = start_track ; ... )
@@ -468,7 +467,7 @@
   if(lsh!=SERVER_LSH_INDEX_SINGLETON){  
     if( fabs(radius - lsh->get_radius())>fabs(O2_DISTANCE_TOLERANCE))
       printf("*** Warning: adb_radius (%f) != lsh_radius (%f) ***\n", radius, lsh->get_radius());
-    printf("INDEX: dim %d\n", dbH->dim);
+    printf("INDEX: dim %d\n", (int)dbH->dim);
     printf("INDEX: R %f\n", lsh->get_radius());
     printf("INDEX: seqlen %d\n", sequenceLength);
     printf("INDEX: w %f\n", lsh->get_lshHeader()->get_binWidth());
@@ -531,10 +530,10 @@
 // return nqv: if index exists
 int audioDB::index_query_loop(const char* dbName, Uns32T queryIndex) {
   
-  unsigned int numVectors;
-  double *query, *query_data;
-  double *qNorm, *qnPtr, *qPower = 0, *qpPtr = 0;
-  double meanQdur;
+  unsigned int numVectors = 0;
+  double *query = 0, *query_data = 0;
+  double *qNorm = 0, *qnPtr = 0, *qPower = 0, *qpPtr = 0;
+  double meanQdur = 0;
   void (*add_point_func)(void*,Uns32T,Uns32T,float);
 
   // Set the point-reporter callback based on the value of lsh_exact
--- a/insert.cpp	Tue Aug 19 20:27:15 2008 +0000
+++ b/insert.cpp	Wed Aug 20 13:50:58 2008 +0000
@@ -157,14 +157,14 @@
 }
 
 void audioDB::insertPowerData(unsigned numVectors, int powerfd, double *powerdata) {
-  if (usingPower) {
+  if(usingPower){
     if (!(dbH->flags & O2_FLAG_POWER)) {
       error("Cannot insert power data on non-power DB", dbName);
     }
-
+    
     int one;
     unsigned int count;
-
+    
     count = read(powerfd, &one, sizeof(unsigned int));
     if (count != sizeof(unsigned int)) {
       error("powerfd read failed", "int", "read");
@@ -172,7 +172,7 @@
     if (one != 1) {
       error("dimensionality of power file not 1", powerFileName);
     }
-
+    
     // FIXME: should check that the powerfile is the right size for
     // this.  -- CSR, 2007-10-30
     count = read(powerfd, powerdata, numVectors * sizeof(double));
--- a/query.cpp	Tue Aug 19 20:27:15 2008 +0000
+++ b/query.cpp	Wed Aug 20 13:50:58 2008 +0000
@@ -341,48 +341,64 @@
   
   VERB_LOG(1, "performing norms... ");
 
-  // Read query feature vectors from database
-  *qp = NULL;
-  lseek(dbfid, dbH->dataOffset + trackOffsetTable[queryIndex] * sizeof(double), SEEK_SET);
-  size_t allocatedSize = 0;
-  read_data(dbfid, queryIndex, qp, &allocatedSize);
-  // Consistency check on allocated memory and query feature size
-  if(*nvp*sizeof(double)*dbH->dim != allocatedSize)
-    error("Query memory allocation failed consitency check","set_up_query_from_key");
-
-  Uns32T trackIndexOffset = trackOffsetTable[queryIndex]/dbH->dim; // Convert num data elements to num vectors
-  // Copy L2 norm partial-sum coefficients
-  assert(*qnp = new double[*nvp]);
-  memcpy(*qnp, l2normTable+trackIndexOffset, *nvp*sizeof(double));
-  sequence_sum(*qnp, *nvp, sequenceLength);
-  sequence_sqrt(*qnp, *nvp, sequenceLength);
-
-  if( usingPower ){
-    // Copy Power partial-sum coefficients
-    assert(*qpp = new double[*nvp]);
-    memcpy(*qpp, powerTable+trackIndexOffset, *nvp*sizeof(double));
-    sequence_sum(*qpp, *nvp, sequenceLength);
-    sequence_average(*qpp, *nvp, sequenceLength);
+  // For LARGE_ADB load query features from file
+  if( dbH->flags & O2_FLAG_LARGE_ADB ){
+    if(infid>0)
+      close(infid);
+    initInputFile(featureFileNameTable+queryIndex*O2_FILETABLE_ENTRY_SIZE, false); // nommap, file pointer at correct position
+    size_t allocatedSize = 0;
+    read_data(infid, queryIndex, qp, &allocatedSize); // over-writes qp and allocatedSize
+    // Consistency check on allocated memory and query feature size
+    if(*nvp*sizeof(double)*dbH->dim != allocatedSize)
+      error("Query memory allocation failed consitency check","set_up_query_from_key");
+    // Allocated and calculate auxillary sequences: l2norm and power
+    init_track_aux_data(queryIndex, *qp, qnp, vqnp, qpp, vqpp);
+  }
+  else{ // Load from self-contained ADB database
+    // Read query feature vectors from database
+    *qp = NULL;
+    lseek(dbfid, dbH->dataOffset + trackOffsetTable[queryIndex] * sizeof(double), SEEK_SET);
+    size_t allocatedSize = 0;
+    read_data(dbfid, queryIndex, qp, &allocatedSize);
+    // Consistency check on allocated memory and query feature size
+    if(*nvp*sizeof(double)*dbH->dim != allocatedSize)
+      error("Query memory allocation failed consitency check","set_up_query_from_key");
+    
+    Uns32T trackIndexOffset = trackOffsetTable[queryIndex]/dbH->dim; // Convert num data elements to num vectors
+    // Copy L2 norm partial-sum coefficients
+    assert(*qnp = new double[*nvp]);
+    memcpy(*qnp, l2normTable+trackIndexOffset, *nvp*sizeof(double));
+    sequence_sum(*qnp, *nvp, sequenceLength);
+    sequence_sqrt(*qnp, *nvp, sequenceLength);
+    
+    if( usingPower ){
+      // Copy Power partial-sum coefficients
+      assert(*qpp = new double[*nvp]);
+      memcpy(*qpp, powerTable+trackIndexOffset, *nvp*sizeof(double));
+      sequence_sum(*qpp, *nvp, sequenceLength);
+      sequence_average(*qpp, *nvp, sequenceLength);
+    }
+    
+    if (usingTimes) {
+      unsigned int k;
+      *mqdp = 0.0;
+      double *querydurs = new double[*nvp];
+      double *timesdata = new double[*nvp*2];
+      assert(querydurs && timesdata);
+      memcpy(timesdata, timesTable+trackIndexOffset, *nvp*sizeof(double));    
+      for(k = 0; k < *nvp; k++) {
+	querydurs[k] = timesdata[2*k+1] - timesdata[2*k];
+	*mqdp += querydurs[k];
+      }
+      *mqdp /= k;
+      
+      VERB_LOG(1, "mean query file duration: %f\n", *mqdp);
+      
+      delete [] querydurs;
+      delete [] timesdata;
+    }
   }
 
-  if (usingTimes) {
-    unsigned int k;
-    *mqdp = 0.0;
-    double *querydurs = new double[*nvp];
-    double *timesdata = new double[*nvp*2];
-    assert(querydurs && timesdata);
-    memcpy(timesdata, timesTable+trackIndexOffset, *nvp*sizeof(double));    
-    for(k = 0; k < *nvp; k++) {
-      querydurs[k] = timesdata[2*k+1] - timesdata[2*k];
-      *mqdp += querydurs[k];
-    }
-    *mqdp /= k;
-    
-    VERB_LOG(1, "mean query file duration: %f\n", *mqdp);
-    
-    delete [] querydurs;
-    delete [] timesdata;
-  }
   // Defaults, for exhaustive search (!usingQueryPoint)
   *vqp = *qp;
   *vqnp = *qnp;
@@ -487,7 +503,8 @@
   // Compute database info
   // FIXME: we more than likely don't need very much of the database
   // so make a new method to build these values per-track or, even better, per-point
-  set_up_db(&sNorm, &snPtr, &sPower, &spPtr, &meanDBdur, &dbVectors);
+  if( !( dbH->flags & O2_FLAG_LARGE_ADB) )
+    set_up_db(&sNorm, &snPtr, &sPower, &spPtr, &meanDBdur, &dbVectors);
 
   VERB_LOG(1, "matching points...");
 
@@ -495,48 +512,76 @@
   assert(trackNN>0 && trackNN<=O2_MAXNN);
 
   // We are guaranteed that the order of points is sorted by:
-  // qpos, trackID, spos
+  // trackID, spos, qpos
   // so we can be relatively efficient in initialization of track data.
   // Here we assume that points don't overlap, so we will use exhaustive dot
-  // product evaluation over the sequence
+  // product evaluation instead of memoization of partial sums which is used
+  // for exhaustive brute-force evaluation from smaller databases: e.g. query_loop()
   double dist;
   size_t data_buffer_size = 0;
   double *data_buffer = 0;
-  Uns32T trackOffset;
-  Uns32T trackIndexOffset;
+  Uns32T trackOffset = 0;
+  Uns32T trackIndexOffset = 0;
   Uns32T currentTrack = 0x80000000; // Initialize with a value outside of track index range
   Uns32T npairs = exact_evaluation_queue->size();
   while(npairs--){
     PointPair pp = exact_evaluation_queue->top();
-    trackOffset=trackOffsetTable[pp.trackID]; // num data elements offset
-    trackIndexOffset=trackOffset/dbH->dim;    // num vectors offset
-    if((!(usingPower) || powers_acceptable(qpPtr[usingQueryPoint?0:pp.qpos], sPower[trackIndexOffset+pp.spos])) &&
-       ((usingQueryPoint?0:pp.qpos) < numVectors-sequenceLength+1 && pp.spos < trackTable[pp.trackID]-sequenceLength+1)){
+    // Large ADB track data must be loaded here for sPower
+    if(dbH->flags & O2_FLAG_LARGE_ADB){
+      trackOffset=0;
+      trackIndexOffset=0;
       if(currentTrack!=pp.trackID){
+	// On currentTrack change, allocate and load track data
 	currentTrack=pp.trackID;
-        lseek(dbfid, dbH->dataOffset + trackOffset * sizeof(double), SEEK_SET);
+	SAFE_DELETE_ARRAY(sNorm);
+	SAFE_DELETE_ARRAY(sPower);
+	if(infid>0)
+	  close(infid);
+	// Open and check dimensions of feature file
+	initInputFile(featureFileNameTable+pp.trackID*O2_FILETABLE_ENTRY_SIZE, false); // nommap, file pointer at correct position
+	// Load the feature vector data for current track into data_buffer
+	read_data(infid, pp.trackID, &data_buffer, &data_buffer_size);	
+	// Load power and calculate power and l2norm sequence sums
+	init_track_aux_data(pp.trackID, data_buffer, &sNorm, &snPtr, &sPower, &spPtr);
+      }
+    }
+    else{
+      // These offsets are w.r.t. the entire database of feature vectors and auxillary variables
+      trackOffset=trackOffsetTable[pp.trackID]; // num data elements offset
+      trackIndexOffset=trackOffset/dbH->dim;    // num vectors offset
+    }    
+    Uns32T qPos = usingQueryPoint?0:pp.qpos;// index for query point
+    Uns32T sPos = trackIndexOffset+pp.spos; // index into l2norm table
+    // Test power thresholds before computing distance
+    if( ( !usingPower || powers_acceptable(qpPtr[qPos], sPower[sPos])) &&
+	( qPos<numVectors-sequenceLength+1 && pp.spos<trackTable[pp.trackID]-sequenceLength+1 ) ){
+      // Non-large ADB track data is loaded inside power test for efficiency
+      if( !(dbH->flags & O2_FLAG_LARGE_ADB) && (currentTrack!=pp.trackID) ){
+	// On currentTrack change, allocate and load track data
+	currentTrack=pp.trackID;
+	lseek(dbfid, dbH->dataOffset + trackOffset * sizeof(double), SEEK_SET);
 	read_data(dbfid, currentTrack, &data_buffer, &data_buffer_size);
       }
-      dist = dot_product_points(query+(usingQueryPoint?0:pp.qpos*dbH->dim), data_buffer+pp.spos*dbH->dim, dbH->dim*sequenceLength);
+      // Compute distance
+      dist = dot_product_points(query+qPos*dbH->dim, data_buffer+pp.spos*dbH->dim, dbH->dim*sequenceLength);
+      double qn = qnPtr[qPos];
+      double sn = sNorm[sPos];
       if(normalizedDistance) 
-	dist = 2-(2/(qnPtr[usingQueryPoint?0:pp.qpos]*sNorm[trackIndexOffset+pp.spos]))*dist;
+	dist = 2 - (2/(qn*sn))*dist;
       else 
 	if(no_unit_norming)
-	  dist = qnPtr[usingQueryPoint?0:pp.qpos]*qnPtr[usingQueryPoint?0:pp.qpos]+sNorm[trackIndexOffset+pp.spos]*sNorm[trackIndexOffset+pp.spos] - 2*dist;
+	  dist = qn*qn + sn*sn - 2*dist;
       // else
       // dist = dist;      
       if((!radius) || dist <= (O2_LSH_EXACT_MULT*radius+O2_DISTANCE_TOLERANCE)) 
-	reporter->add_point(pp.trackID, pp.qpos, pp.spos, dist);
+	reporter->add_point(pp.trackID, pp.qpos, pp.spos, dist);    
     }
     exact_evaluation_queue->pop();
   }
   // Cleanup
-  if(sNorm)
-    delete sNorm;
-  if(sPower)
-    delete sPower;
-  if(meanDBdur)
-    delete meanDBdur;
+  SAFE_DELETE_ARRAY(sNorm);
+  SAFE_DELETE_ARRAY(sPower);
+  SAFE_DELETE_ARRAY(meanDBdur);
 }
 
 // A completely unprotected dot-product method
--- a/soap.cpp	Tue Aug 19 20:27:15 2008 +0000
+++ b/soap.cpp	Wed Aug 20 13:50:58 2008 +0000
@@ -128,8 +128,6 @@
     strncpy(queryType,"track", strlen("track"));
   else if(qType == O2_N_SEQUENCE_QUERY)
     strncpy(queryType,"nsequence", strlen("nsequence"));
-  else
-    strncpy(queryType, "", strlen(""));
 
   if(pointNN==0)
     pointNN=10;