changeset 308:896679d8cc39

Added server-side loading of persistent index (LSH hashtables) via --load_index -d dbName -R radius -l sequenceLength. Queries using these parameters will lookup the memory-resident hashtable instead of loading one from disk.
author mas01mc
date Thu, 07 Aug 2008 01:53:38 +0000
parents d1b8b2dec37e
children 81ad865402e7
files audioDB.cpp audioDB.h create.cpp gengetopt.in index.cpp lshlib.cpp lshlib.h query.cpp soap.cpp
diffstat 9 files changed, 86 insertions(+), 40 deletions(-) [+]
line wrap: on
line diff
--- a/audioDB.cpp	Wed Aug 06 21:23:14 2008 +0000
+++ b/audioDB.cpp	Thu Aug 07 01:53:38 2008 +0000
@@ -1,5 +1,7 @@
 #include "audioDB.h"
 
+LSH* SERVER_LSH_INDEX_SINGLETON;
+
 PointPair::PointPair(Uns32T a, Uns32T b, Uns32T c):trackID(a),qpos(b),spos(c){};
 
 bool operator<(const PointPair& a, const PointPair& b){
@@ -137,6 +139,8 @@
     close(infid);
   if(dbH)
     delete dbH;
+  if(lsh!=SERVER_LSH_INDEX_SINGLETON)
+    delete lsh;
 }
 
 audioDB::~audioDB(){
@@ -246,6 +250,17 @@
     sa.sa_flags = SA_SIGINFO | SA_RESTART | SA_NODEFER;
     sigaction(SIGHUP, &sa, NULL);
 #endif
+    if(args_info.load_index_given){
+      if(!args_info.database_given)
+	error("load_index requires a --database argument");
+      else
+	dbName=args_info.database_arg;
+      if(!args_info.radius_given)
+	error("load_index requires a --radius argument");
+      if(!args_info.sequencelength_given)
+	error("load_index requires a --sequenceLength argument");
+      WS_load_index = true;
+    }
     return 0;
   }
 
@@ -603,6 +618,9 @@
   VERB_LOG(2, " done.");
 }
 
+// This entry point is visited once per instance
+// so it is a good place to set any global state variables
 int main(const unsigned argc, char* const argv[]){
+  SERVER_LSH_INDEX_SINGLETON = 0; // Initialize global variables
   audioDB(argc, argv);
 }
--- a/audioDB.h	Wed Aug 06 21:23:14 2008 +0000
+++ b/audioDB.h	Thu Aug 07 01:53:38 2008 +0000
@@ -137,6 +137,8 @@
     fflush(stderr); \
   }
 
+extern LSH* SERVER_LSH_INDEX_SINGLETON;
+
 typedef struct dbTableHeader {
   uint32_t magic;
   uint32_t version;
@@ -165,8 +167,7 @@
 
 bool operator<(const PointPair& a, const PointPair& b);
 
-class audioDB{
-  
+class audioDB{  
  private:
   gengetopt_args_info args_info;
   unsigned dim;
@@ -286,7 +287,6 @@
   void get_lock(int fd, bool exclusive);
   void release_lock(int fd);
   void create(const char* dbName);
-  void drop();
   bool enough_per_file_space_free();
   bool enough_data_space_free(off_t size);
   void insert_data_vectors(off_t offset, void *buffer, size_t size);
@@ -296,9 +296,6 @@
   void status(const char* dbName, adb__statusResponse *adbStatusResponse=0);
   unsigned random_track(unsigned *propTable, unsigned total);
   void sample(const char *dbName);
-  void ws_status(const char*dbName, char* hostport);
-  void ws_query(const char*dbName, const char *featureFileName, const char* hostport);
-  void ws_query_by_key(const char*dbName, const char *trackKey, const char* hostport);
   void l2norm(const char* dbName);
   void power_flag(const char *dbName);
   bool powers_acceptable(double p1, double p2);
@@ -309,6 +306,7 @@
   bool lsh_in_core;     // load LSH tables for query into core (true) or keep on disk (false)
   bool lsh_use_u_functions;
   bool lsh_exact;      // flag to indicate use exact evaluation of points returned by LSH
+  bool WS_load_index; // flag to indicate that we want to make a Web Services index memory resident
   double lsh_param_w; // Width of LSH hash-function bins
   Uns32T lsh_param_k; // Number of independent hash functions
   Uns32T lsh_param_m; // Combinatorial parameter for m(m-1)/2 hash tables
@@ -340,9 +338,13 @@
   static Uns32T index_from_trackInfo(Uns32T, Uns32T); // Convert audioDB trackID and trackPos to an lsh point index
   void initialize_exact_evalutation_queue();
   void index_insert_exact_evaluation_queue(Uns32T trackID, Uns32T qpos, Uns32T spos);
+  LSH* index_allocate(char* indexName, bool load_hashTables);
 
   // Web Services
   void startServer();
+  void ws_status(const char*dbName, char* hostport);
+  void ws_query(const char*dbName, const char *featureFileName, const char* hostport);
+  void ws_query_by_key(const char*dbName, const char *trackKey, const char* hostport);
   
 };
 
@@ -413,6 +415,7 @@
     lsh_in_core(false),				\
     lsh_use_u_functions(false),                 \
     lsh_exact(false),                           \
+    WS_load_index(false),                       \
     lsh_param_k(0),				\
     lsh_param_m(0),				\
     lsh_param_N(0),				\
--- a/create.cpp	Wed Aug 06 21:23:14 2008 +0000
+++ b/create.cpp	Thu Aug 07 01:53:38 2008 +0000
@@ -59,7 +59,3 @@
   VERB_LOG(0, "%s %s\n", COM_CREATE, dbName);
 }
 
-void audioDB::drop(){
-  // FIXME: drop something?  Should we even allow this?
-}
-
--- a/gengetopt.in	Wed Aug 06 21:23:14 2008 +0000
+++ b/gengetopt.in	Thu Aug 07 01:53:38 2008 +0000
@@ -80,7 +80,7 @@
 
 option "SERVER" s "run as standalone web service on named port." int typestr="port" default="14475" optional
 option "client" c "run as a client using named host service." string typestr="hostname:port" optional
-
+option "load_index" - "make web service with memory-resident hashtables" flag off dependon="radius" optional
 
 text "
 Copyright (c) 2007 Michael Casey, Christophe Rhodes
--- a/index.cpp	Wed Aug 06 21:23:14 2008 +0000
+++ b/index.cpp	Thu Aug 07 01:53:38 2008 +0000
@@ -52,6 +52,19 @@
     return true;
 }
 
+LSH* audioDB::index_allocate(char* indexName, bool load_hashTables){
+  LSH* gIndx=SERVER_LSH_INDEX_SINGLETON;
+  if(isServer && gIndx && (strncmp(gIndx->get_indexName(), indexName, MAXSTR)==0) )
+    audioDB::lsh = gIndx; // Use the global SERVER resident index
+  else{
+    if(audioDB::lsh)
+      delete audioDB::lsh;
+    audioDB::lsh = new LSH(indexName, load_hashTables);
+  }
+  assert(audioDB::lsh);  
+  return audioDB::lsh;
+}
+
 vector<vector<float> >* audioDB::index_initialize_shingles(Uns32T sz){
   if(vv)
     delete vv;
@@ -152,6 +165,7 @@
     
     // Clean up
     delete lsh;
+    lsh = 0;
     close(lshfid);
   }
   
@@ -165,6 +179,7 @@
     assert(lsh);
     Uns32T maxs = index_to_trackID(lsh->get_maxp())+1;
     delete lsh;
+    lsh = 0;
 
     // This allows for updating index after more tracks are inserted into audioDB
     for(Uns32T startTrack = maxs; startTrack < dbH->numFiles; startTrack+=lsh_param_b){
@@ -183,6 +198,7 @@
       // Serialize to file
       lsh->serialize(newIndexName, lsh_in_core?O2_SERIAL_FILEFORMAT2:O2_SERIAL_FILEFORMAT1); // Serialize core LSH heap to disk
       delete lsh;
+      lsh = 0;
     }    
     
     close(lshfid);    
@@ -362,32 +378,31 @@
     return false;  
   }
 
-  printf("INDEX: initializing header\n");
-  
-  lsh = new LSH(indexName, false); // Get the header only here
-  assert(lsh);
+  lsh = index_allocate(indexName, false); // Get the header only here
   sequenceLength = lsh->get_lshHeader()->dataDim / dbH->dim; // shingleDim / vectorDim
   
-
-  if( fabs(radius - lsh->get_radius())>fabs(O2_DISTANCE_TOLERANCE))
-    printf("*** Warning: adb_radius (%f) != lsh_radius (%f) ***\n", radius, lsh->get_radius());
-
-  printf("INDEX: dim %d\n", dbH->dim);
-  printf("INDEX: R %f\n", lsh->get_radius());
-  printf("INDEX: seqlen %d\n", sequenceLength);
-  printf("INDEX: w %f\n", lsh->get_lshHeader()->get_binWidth());
-  printf("INDEX: k %d\n", lsh->get_lshHeader()->get_numFuns());
-  printf("INDEX: L (m*(m-1))/2 %d\n", lsh->get_lshHeader()->get_numTables());
-  printf("INDEX: N %d\n", lsh->get_lshHeader()->get_numRows());
-  printf("INDEX: s %d\n", index_to_trackID(lsh->get_maxp()));
-  printf("INDEX: Opened LSH index file %s\n", indexName);
-  fflush(stdout);
+  if(!SERVER_LSH_INDEX_SINGLETON){  
+    if( fabs(radius - lsh->get_radius())>fabs(O2_DISTANCE_TOLERANCE))
+      printf("*** Warning: adb_radius (%f) != lsh_radius (%f) ***\n", radius, lsh->get_radius());
+    printf("INDEX: dim %d\n", dbH->dim);
+    printf("INDEX: R %f\n", lsh->get_radius());
+    printf("INDEX: seqlen %d\n", sequenceLength);
+    printf("INDEX: w %f\n", lsh->get_lshHeader()->get_binWidth());
+    printf("INDEX: k %d\n", lsh->get_lshHeader()->get_numFuns());
+    printf("INDEX: L (m*(m-1))/2 %d\n", lsh->get_lshHeader()->get_numTables());
+    printf("INDEX: N %d\n", lsh->get_lshHeader()->get_numRows());
+    printf("INDEX: s %d\n", index_to_trackID(lsh->get_maxp()));
+    printf("INDEX: Opened LSH index file %s\n", indexName);
+    fflush(stdout);
+  }
 
   // Check to see if we are loading hash tables into core, and do so if true
   if((lsh->get_lshHeader()->flags&O2_SERIAL_FILEFORMAT2) || lsh_in_core){
-    printf("INDEX: loading hash tables into core %s\n", (lsh->get_lshHeader()->flags&O2_SERIAL_FILEFORMAT2)?"FORMAT2":"FORMAT1");
-    delete lsh;
-    lsh = new LSH(indexName, true);
+    if(SERVER_LSH_INDEX_SINGLETON)
+      fprintf(stderr,"INDEX: using persistent hash tables: %s\n", lsh->get_indexName());
+    else
+      printf("INDEX: loading hash tables into core %s\n", (lsh->get_lshHeader()->flags&O2_SERIAL_FILEFORMAT2)?"FORMAT2":"FORMAT1");
+    lsh = index_allocate(indexName, true);
   }
   
   delete[] indexName;
--- a/lshlib.cpp	Wed Aug 06 21:23:14 2008 +0000
+++ b/lshlib.cpp	Thu Aug 07 01:53:38 2008 +0000
@@ -404,6 +404,7 @@
 // Interface to Locality Sensitive Hashing G
 G::G(float ww, Uns32T kk,Uns32T mm, Uns32T dd, Uns32T NN, Uns32T CC, float rr):
   H(kk,mm,dd,NN,CC,ww,rr), // constructor to initialize data structures
+  indexName(0),
   lshHeader(0),
   calling_instance(0),
   add_point_callback(0)
@@ -417,6 +418,7 @@
 // Optionally load the LSH tables into head-allocated lists in core 
 G::G(char* filename, bool lshInCoreFlag):
   H(), // default base-class constructor call delays data-structure initialization 
+  indexName(filename),
   lshHeader(0),
   calling_instance(0),
   add_point_callback(0)
--- a/lshlib.h	Wed Aug 06 21:23:14 2008 +0000
+++ b/lshlib.h	Thu Aug 07 01:53:38 2008 +0000
@@ -259,6 +259,8 @@
 // Interface for indexing and retrieval
 class G: public H{
  private:
+  char* indexName;
+
   // LSH serial data structure file handling
   void get_lock(int fd, bool exclusive);
   void release_lock(int fd);
@@ -340,6 +342,7 @@
   SerialHeaderT* get_lshHeader(){return lshHeader;}
   void serial_dump_tables(char* filename);
   float get_mean_collision_rate(){ return (float) pointCount / bucketCount ; }
+  char* get_indexName(){return indexName;}
 };
 
 typedef class G LSH;
--- a/query.cpp	Wed Aug 06 21:23:14 2008 +0000
+++ b/query.cpp	Thu Aug 07 01:53:38 2008 +0000
@@ -45,10 +45,8 @@
     } else {
       if(index_exists(dbName, radius, sequenceLength)){
 	char* indexName = index_get_name(dbName, radius, sequenceLength);
-	lsh = new LSH(indexName);
-	assert(lsh);
+	lsh = index_allocate(indexName, false);
 	reporter = new trackSequenceQueryRadReporter(trackNN, index_to_trackID(lsh->get_maxp())+1);
-	delete lsh;
 	delete[] indexName;
       }
       else
@@ -63,10 +61,8 @@
     } else {
       if(index_exists(dbName, radius, sequenceLength)){
 	char* indexName = index_get_name(dbName, radius, sequenceLength);
-	lsh = new LSH(indexName);
-	assert(lsh);
+	lsh = index_allocate(indexName, false);
 	reporter = new trackSequenceQueryRadNNReporter(pointNN,trackNN, index_to_trackID(lsh->get_maxp())+1);
-	delete lsh;
 	delete[] indexName;
       }
       else
--- a/soap.cpp	Wed Aug 06 21:23:14 2008 +0000
+++ b/soap.cpp	Thu Aug 07 01:53:38 2008 +0000
@@ -28,6 +28,7 @@
   soap_done(&soap);
 }
 
+// WS_QUERY (CLIENT SIDE)
 void audioDB::ws_query(const char*dbName, const char *featureFileName, const char* hostport){
   struct soap soap;
   adb__queryResponse adbQueryResponse;  
@@ -49,6 +50,7 @@
   soap_done(&soap);
 }
 
+// WS_QUERY_BY_KEY (CLIENT SIDE)
 void audioDB::ws_query_by_key(const char*dbName, const char *trackKey, const char* hostport){
   struct soap soap;
   adb__queryResponse adbQueryResponse;  
@@ -95,9 +97,8 @@
     return SOAP_FAULT;
   }
 }
-
+ 
 // Literal translation of command line to web service
-
 int adb__query(struct soap* soap, xsd__string dbName, xsd__string qKey, xsd__string keyList, xsd__string timesFileName, xsd__int qType, xsd__int qPos, xsd__int pointNN, xsd__int trackNN, xsd__int seqLen, adb__queryResponse &adbQueryResponse){
   char queryType[256];
   for(int k=0; k<256; k++)
@@ -246,6 +247,18 @@
   else
     {
       fprintf(stderr, "Socket connection successful: master socket = %d\n", m);
+      // Make a global Web Services LSH Index (SINGLETON)
+      if(WS_load_index && dbName && index_exists(dbName, radius, sequenceLength)){
+	char* indexName = index_get_name(dbName, radius, sequenceLength);
+	fprintf(stderr, "Loading LSH hashtables: %s...\n", indexName);
+	lsh = new LSH(indexName, true);
+	assert(lsh);
+	SERVER_LSH_INDEX_SINGLETON = lsh;
+	fprintf(stderr, "LSH INDEX READY\n");
+	fflush(stderr);
+	delete[] indexName;
+      }
+
       for (int i = 1; ; i++)
 	{
 	  s = soap_accept(&soap);