view insert.cpp @ 404:1fb8bee777e5 api-inversion

Begin working towards inverting audioDB::insert() / audiodb_insert(). New data type audiodb_datum_t, roughly corresponding to a "track" in current audioDB parlance; it contains exactly the feature information and metadata to record. New function audiodb_insert_datum() to insert one of these audiodb_datum_t objects into the database; the intention is that not only can insertion of feature files be implemented in terms of this function, but that it will be a useful function in its own right, callable perhaps from PD, Max/MSP, and/or a VAMP plugin. This function is complicated enough that it actually gets a comment. Implement audioDB::insert() in terms of audiodb_insert_datum(), via a wrapper which handles the slightly wacky error/non-error case of attempting to insert features with a key that already exists in the database. Delete whole rafts of code. We can't quite delete everything because there's batchinsert / batchinsert_large_adb to sort out; the good news is that the batchinsert operation can simply be implemented as a loop around audiodb_insert_datum() without loss of efficiency. (There's also a stray extra audiodb_insert() in libtests/0027/, found through an earlier iteration of this patch.)
author mas01cr
date Fri, 05 Dec 2008 22:32:43 +0000
parents a8a5f2ca5380
children ef4792df8f93
line wrap: on
line source
#include "audioDB.h"
extern "C" {
#include "audioDB_API.h"
}
#include "audioDB-internals.h"

/* Report whether `size` additional bytes of feature data can be
   appended after the data already recorded in the header without
   reaching the start of the times table. */
static bool audiodb_enough_data_space_free(adb_t *adb, off_t size) {
  adb_header_t *hdr = adb->header;
  /* FIXME: timesTableOffset isn't necessarily the next biggest offset
     after dataOffset.  Maybe make the offsets into an array that we
     can iterate over... */
  return (hdr->dataOffset + hdr->length + size) < hdr->timesTableOffset;
}

/* Report whether the file table and the track table both have room
   for one more entry, i.e. whether header->numFiles is still below
   the smaller of the two tables' capacities. */
static bool audiodb_enough_per_file_space_free(adb_t *adb) {
  /* FIXME: the table-ordering caveat noted in
     audiodb_enough_data_space_free() applies here too: the code
     assumes fileTable, trackTable and data are laid out in that
     order. */
  adb_header_t *hdr = adb->header;
  off_t file_table_bytes = hdr->trackTableOffset - hdr->fileTableOffset;
  off_t track_table_bytes = hdr->dataOffset - hdr->trackTableOffset;
  int file_slots = file_table_bytes / O2_FILETABLE_ENTRY_SIZE;
  int track_slots = track_table_bytes / O2_TRACKTABLE_ENTRY_SIZE;
  /* The usable capacity is the smaller of the two slot counts. */
  unsigned int capacity = file_slots;
  if(file_slots > track_slots) {
    capacity = track_slots;
  }
  return (hdr->numFiles < capacity);
}

/*
 * Hey, look, a comment.  Normally I wouldn't bother, as the code
 * should be self-documenting, but a lot of logic is concentrated in
 * this one place, so let's give an overview beforehand.  To insert a
 * datum into the database, we:
 *
 *  0. sanity-check the datum itself (non-NULL key that fits in one
 *     file table entry, non-zero dimension);
 *  1. check write permission;
 *  2. check !O2_FLAG_LARGE_ADB;
 *  3. check for enough space;
 *  4. check that datum->dim and adb->header->dim agree (or that the
 *     header dimension is zero, in which case datum->dim is written
 *     to adb->header->dim in step 9).
 *  5. check for presence of datum->key in adb->keys;
 *  6. check for consistency between power and O2_FLAG_POWER, and
 *     times and O2_FLAG_TIMES (the first insert into an empty
 *     database may switch O2_FLAG_TIMES on; this too is applied in
 *     step 9);
 *  7. write in data, power, times as appropriate; add to track
 *     and key tables too;
 *  8. if O2_FLAG_L2NORM, compute norms and fill in table;
 *  9. update adb->keys and adb->header;
 * 10. sync adb->header with disk.
 *
 * Step 10 essentially commits the transaction; until we update
 * header->length, nothing will recognize the newly-written data.
 * In principle, if it fails, we should roll back, which we can in
 * fact do on the assumption that nothing in step 9 can ever fail;
 * on the other hand, if it's failed, then it's unlikely that
 * rolling back by syncing the original header back to disk is going
 * to work desperately well.
 *
 * The in-memory header is not touched before step 9, so a failure in
 * any earlier step leaves adb->header exactly as it was.
 *
 * Returns 0 on success, 2 if datum->key is already present in the
 * database, the result of audiodb_sync_header() once step 10 is
 * reached, and 1 for every other failure.
 */

/* Seek to absolute position `pos` in `fd` and write exactly `len`
   bytes from `buf`.  Returns 0 on success, 1 if the seek fails, the
   write fails, or the write is short. */
static int audiodb_insert_write_at(int fd, off_t pos, const void *buf, size_t len) {
  if(lseek(fd, pos, SEEK_SET) == (off_t) -1) {
    return 1;
  }
  ssize_t written = write(fd, buf, len);
  if((written < 0) || ((size_t) written != len)) {
    return 1;
  }
  return 0;
}

int audiodb_insert_datum(adb_t *adb, adb_datum_t *datum) {

  off_t size, offset, nfiles;
  double *l2norm_buffer = NULL, *lp, *dp;
  /* header changes decided during validation, applied in step 9 */
  bool set_dim = false, set_times = false;

  /* 0. sanity-check the datum: a zero dimension would divide by zero
   *    in the table offset computations below, and an over-long key
   *    would spill into the neighbouring file table entry.
   */
  if(!datum->key || (datum->dim == 0)) {
    return 1;
  }
  if(strlen(datum->key) + 1 > (size_t) O2_FILETABLE_ENTRY_SIZE) {
    return 1;
  }
  /* 1. check write permission; */
  if(!(adb->flags & O_RDWR)) {
    return 1;
  }
  /* 2. check !O2_FLAG_LARGE_ADB; */
  if(adb->header->flags & O2_FLAG_LARGE_ADB) {
    return 1;
  }
  /* 3. check for enough space; */
  size = sizeof(double) * datum->nvectors * datum->dim;
  if(!audiodb_enough_data_space_free(adb, size)) {
    return 1;
  }
  if(!audiodb_enough_per_file_space_free(adb)) {
    return 1;
  }
  /* 4. check that datum->dim and adb->header->dim agree (or that the
   *    header dimension is zero, in which case remember to write
   *    datum->dim to adb->header->dim).
   */
  if(adb->header->dim == 0) {
    set_dim = true;
  } else if (adb->header->dim != datum->dim) {
    return 1;
  }
  /* 5. check for presence of datum->key in adb->keys; */
  if(adb->keys->count(datum->key)) {
    /* not part of an explicit API/ABI, but we need a distinguished
       value in this circumstance to preserve somewhat wonky behaviour
       of audioDB::batchinsert. */
    return 2;
  }
  /* 6. check for consistency between power and O2_FLAG_POWER, and
   *    times and O2_FLAG_TIMES;
   */
  if((datum->power && !(adb->header->flags & O2_FLAG_POWER)) ||
     ((adb->header->flags & O2_FLAG_POWER) && !datum->power)) {
    return 1;
  }
  if(datum->times && !(adb->header->flags & O2_FLAG_TIMES)) {
    if(adb->header->numFiles == 0) {
      set_times = true;
    } else {
      return 1;
    }
  } else if ((adb->header->flags & O2_FLAG_TIMES) && !datum->times) {
    return 1;
  }
  /* 7. write in data, power, times as appropriate; add to track
   *    and key tables too;
   */
  offset = adb->header->length;
  nfiles = adb->header->numFiles;

  if(audiodb_insert_write_at(adb->fd, adb->header->dataOffset + offset,
                             datum->data, (size_t) size)) {
    goto error;
  }
  /* offset counts bytes of feature data; dividing by the dimension
     gives the matching byte offset into the one-double-per-vector
     tables (power, l2norm), and twice that for the times table. */
  if(datum->power) {
    if(audiodb_insert_write_at(adb->fd, adb->header->powerTableOffset + offset / datum->dim,
                               datum->power, sizeof(double) * datum->nvectors)) {
      goto error;
    }
  }
  if(datum->times) {
    if(audiodb_insert_write_at(adb->fd, adb->header->timesTableOffset + offset / datum->dim * 2,
                               datum->times, sizeof(double) * datum->nvectors * 2)) {
      goto error;
    }
  }
  if(audiodb_insert_write_at(adb->fd, adb->header->trackTableOffset + nfiles * O2_TRACKTABLE_ENTRY_SIZE,
                             &datum->nvectors, O2_TRACKTABLE_ENTRY_SIZE)) {
    goto error;
  }
  if(audiodb_insert_write_at(adb->fd, adb->header->fileTableOffset + nfiles * O2_FILETABLE_ENTRY_SIZE,
                             datum->key, strlen(datum->key) + 1)) {
    goto error;
  }

  /* 8. if O2_FLAG_L2NORM, compute norms and fill in table; */
  if(adb->header->flags & O2_FLAG_L2NORM) {
    l2norm_buffer = (double *) malloc(datum->nvectors * sizeof(double));
    /* malloc(0) may legitimately return NULL, so only treat NULL as
       a failure when there is something to store. */
    if(!l2norm_buffer && datum->nvectors) {
      goto error;
    }

    /* FIXME: shared code with audiodb_norm_existing() */
    /* one entry per vector: the sum of squares of its components */
    dp = datum->data;
    lp = l2norm_buffer;
    for(size_t i = 0; i < datum->nvectors; i++) {
      *lp = 0;
      for(unsigned int k = 0; k < datum->dim; k++) {
        *lp += (*dp)*(*dp);
        dp++;
      }
      lp++;
    }
    if(audiodb_insert_write_at(adb->fd, adb->header->l2normTableOffset + offset / datum->dim,
                               l2norm_buffer, sizeof(double) * datum->nvectors)) {
      goto error;
    }
    free(l2norm_buffer);
    l2norm_buffer = NULL;
  }

  /* 9. update adb->keys and adb->header; */
  adb->keys->insert(datum->key);
  if(set_dim) {
    adb->header->dim = datum->dim;
  }
  if(set_times) {
    adb->header->flags |= O2_FLAG_TIMES;
  }
  adb->header->numFiles += 1;
  adb->header->length += size;

  /* 10. sync adb->header with disk. */
  return audiodb_sync_header(adb);

 error:
  free(l2norm_buffer);
  return 1;
}

/* Report whether both the file table and the track table can take
   one more entry: true while dbH->numFiles is below the smaller of
   the two tables' entry capacities. */
bool audioDB::enough_per_file_space_free() {
  unsigned int file_slots = fileTableLength / O2_FILETABLE_ENTRY_SIZE;
  unsigned int track_slots = trackTableLength / O2_TRACKTABLE_ENTRY_SIZE;
  unsigned int capacity = file_slots;

  if(file_slots > track_slots) {
    capacity = track_slots;
  }
  return(dbH->numFiles < capacity);
}

void audioDB::insertDatum(const char *inFile, std::ifstream *timesFile, int powerfd, const char *key) {
  adb_datum_t datum;
  int fd;
  struct stat statbuf;
  off_t size;
  int err;
  
  datum.times = 0;
  datum.power = 0;
  
  if((fd = open(inFile, O_RDONLY)) == -1) {
    error("failed to open input file", inFile);
  }
  if(fstat(fd, &statbuf)) {
    error("failed to stat input file", inFile);
  }
  read(fd, &(datum.dim), sizeof(uint32_t));
  size = statbuf.st_size - sizeof(uint32_t);
  datum.nvectors = size / (sizeof(double) * datum.dim);
  datum.data = (double *) malloc(size);
  if(!datum.data) {
    error("failed to allocate memory");
  }
  read(fd, datum.data, size);
  close(fd);
  if(timesFile) {
    datum.times = (double *) malloc(sizeof(double) * datum.nvectors * 2);
    if(!datum.times) {
      error("failed to allocate memory");
    }
    insertTimeStamps(datum.nvectors, timesFile, datum.times);
  }
  if(powerfd) {
    datum.power = (double *) malloc(sizeof(double) * datum.nvectors);
    if(!datum.power) {
      error("failed to allocate memory");
    }
    insertPowerData(datum.nvectors, powerfd, datum.power);
  }
  datum.key = key ? key : inFile;
  err = audiodb_insert_datum(adb, &datum);
  if(err && (err != 2)) {
    error("failed to insert data for file", inFile);
  }
}

void audioDB::insert(const char* dbName, const char* inFile) {
  if(!adb) {
    if(!(adb = audiodb_open(dbName, O_RDWR))) {
      error("failed to open database", dbName);
    }
  }
  if(adb->header->flags & O2_FLAG_LARGE_ADB) {
    
  } else {
    /* at this point, we have powerfd (an fd), timesFile (a
     * std::ifstream *) and inFile (a char *).  Wacky, huh? */
    insertDatum(inFile, timesFile, powerfd, key);
  }
  status(dbName);
}

/*
 * Read numVectors + 1 time points from `timesFile` and store them in
 * `timesdata` as numVectors consecutive (start, end) pairs, where
 * each pair's end is the next pair's start.  `timesdata` must have
 * room for 2 * numVectors doubles.
 *
 * Reports through error() when the stream is not open, holds no time
 * points, holds fewer than numVectors + 1 of them, or holds anything
 * beyond them.
 *
 * Extraction success is tested with fail() rather than eof(): reading
 * the last number of a file that lacks a trailing newline sets eofbit
 * even though the value was read correctly.
 */
void audioDB::insertTimeStamps(unsigned numVectors, std::ifstream *timesFile, double *timesdata) {
  assert(usingTimes);

  unsigned numtimes = 0;

  if(!timesFile->is_open()) {
    error("problem opening times file on timestamped database", timesFileName);
  }

  double timepoint, next;
  *timesFile >> timepoint;
  if (timesFile->fail()) {
    error("no entries in times file", timesFileName);
  }
  numtimes++;
  /* A pre-tested loop: with numVectors == 0 there is no room in
     timesdata for even one pair, so nothing may be stored. */
  while (numtimes < numVectors + 1) {
    *timesFile >> next;
    if (timesFile->fail()) {
      break;
    }
    numtimes++;
    timesdata[0] = timepoint;
    timesdata[1] = next;
    timepoint = next;
    timesdata += 2;
  }

  if (numtimes < numVectors + 1) {
    error("too few timepoints in times file", timesFileName);
  }

  /* Anything left over is an error: either another value can be
     extracted, or extraction stopped on something other than end of
     file. */
  *timesFile >> next;
  if (!timesFile->fail() || !timesFile->eof()) {
    error("too many timepoints in times file", timesFileName);
  }
}

/*
 * When power is in use, read a power file from `powerfd` into
 * `powerdata`: first an integer header that must equal 1 (the
 * dimensionality of the power data), then numVectors doubles.
 * Does nothing at all when usingPower is false.  Short or failed
 * reads and a wrong header are reported through error().
 */
void audioDB::insertPowerData(unsigned numVectors, int powerfd, double *powerdata) {
  if(!usingPower) {
    return;
  }

  int dimension;
  unsigned int nread;

  nread = read(powerfd, &dimension, sizeof(unsigned int));
  if (nread != sizeof(unsigned int)) {
    error("powerfd read failed", "int", "read");
  }
  if (dimension != 1) {
    error("dimensionality of power file not 1", powerFileName);
  }

  // FIXME: should check that the powerfile is the right size for
  // this.  -- CSR, 2007-10-30
  const size_t wanted = numVectors * sizeof(double);
  nread = read(powerfd, powerdata, wanted);
  if (nread != wanted) {
    error("powerfd read failed", "double", "read");
  }
}

/*
 * Insert every feature file named in the list file `inFile` (one
 * path per line) into the database `dbName`, then report status.
 *
 * When the `key` member names a separate key list, the n-th line of
 * that list is the key for the n-th feature file; otherwise each
 * file's path is its own key.  When usingTimes / usingPower are set,
 * the timesFile / powerFile members are read as parallel lists of
 * per-track times / power file names.
 *
 * Databases flagged O2_FLAG_LARGE_ADB are delegated to
 * batchinsert_large_adb().
 *
 * NOTE(review): insertDatum() works through the `adb` handle, which
 * this function does not open itself; presumably initDBHeader() or an
 * earlier call has set it up -- confirm.
 */
void audioDB::batchinsert(const char* dbName, const char* inFile) {
  forWrite = true;
  initDBHeader(dbName);

  // Treat large ADB instances differently
  if( dbH->flags & O2_FLAG_LARGE_ADB ){
    batchinsert_large_adb(dbName, inFile) ;
    return;
  }
    
  if(!key)
    key=inFile;
  std::ifstream *filesIn = 0;
  std::ifstream *keysIn = 0;
  std::ifstream* thisTimesFile = 0;
  int thispowerfd = 0;

  // operator new never returns a null pointer, so a failed open has
  // to be detected on the stream itself.
  filesIn = new std::ifstream(inFile);
  if(!filesIn->is_open())
    error("Could not open batch in file", inFile);
  if(key && key!=inFile) {
    keysIn = new std::ifstream(key);
    if(!keysIn->is_open())
      error("Could not open batch key file",key);
  }

  // NOTE(review): nothing in this function updates totalVectors, so
  // the summary log line below always reports 0 vectors / 0 bytes.
  unsigned totalVectors=0;
  char *thisFile = new char[MAXSTR];
  char *thisKey = 0;
  if (key && (key != inFile)) {
    thisKey = new char[MAXSTR];
  }
  char *thisTimesFileName = new char[MAXSTR];
  char *thisPowerFileName = new char[MAXSTR];

  do {
    // Pull the next line from each of the parallel lists before
    // testing for the end of the file list.
    filesIn->getline(thisFile,MAXSTR);
    if(key && key!=inFile) {
      keysIn->getline(thisKey,MAXSTR);
    } else {
      thisKey = thisFile;
    }
    if(usingTimes) {
      timesFile->getline(thisTimesFileName,MAXSTR);
    }
    if(usingPower) {
      powerFile->getline(thisPowerFileName, MAXSTR);
    }
    
    if(filesIn->eof()) {
      break;
    }
    if(usingTimes){
      if(timesFile->eof()) {
        error("not enough timestamp files in timesList", timesFileName);
      }
      thisTimesFile = new std::ifstream(thisTimesFileName,std::ios::in);
    }
    if (usingPower) {
      if(powerFile->eof()) {
        error("not enough power files in powerList", powerFileName);
      }
      thispowerfd = open(thisPowerFileName, O_RDONLY);
      if (thispowerfd < 0) {
        error("failed to open power file", thisPowerFileName);
      }
    }
    insertDatum(thisFile, thisTimesFile, thispowerfd, thisKey);
    // Release the per-track inputs and clear the handles so nothing
    // stale survives into the next iteration.
    if(thisTimesFile) {
      delete thisTimesFile;
      thisTimesFile = 0;
    }
    if(thispowerfd) {
      close(thispowerfd);
      thispowerfd = 0;
    }
  } while(!filesIn->eof());

  VERB_LOG(0, "%s %s %u vectors %ju bytes.\n", COM_BATCHINSERT, dbName, totalVectors, (intmax_t) (totalVectors * dbH->dim * sizeof(double)));

  delete [] thisPowerFileName;
  // thisKey aliases thisFile unless a separate key list is in use
  if(key && (key != inFile)) {
    delete [] thisKey;
  }
  delete [] thisFile;
  delete [] thisTimesFileName;
  
  delete filesIn;
  delete keysIn;

  // Report status
  status(dbName);
}


// BATCHINSERT_LARGE_ADB
//
// This method inserts file pointers into the ADB instance rather than the actual feature data
//
// This method is intended for databases that are large enough to only support indexed query
// So exhaustive searching across all feature vectors will not be performed
//
// We insert featureFileName, [powerFileName], [timesFileName]
//
// l2norms and power sequence sums are calculated on-the-fly at INDEX and --lsh_exact QUERY time
//
// LIMITS:
//
// We impose an upper limit of 1M keys, 1M featureFiles, 1M powerFiles and 1M timesFiles
//
// Inputs: dbName is used for messages and the final status report;
// inFile is the list of feature files, one path per line.  The key,
// timesFile and powerFile members supply the optional parallel lists
// of keys, times file names and power file names.  Relative paths
// are stored with the current working directory prepended.
//
// NOTE(review): statbuf, indata and infid are members that this
// function reads after calling initInputFile(); presumably that call
// fills them in for thisFile -- confirm.
//
void audioDB::batchinsert_large_adb(const char* dbName, const char* inFile) {

  if(!key)
    key=inFile;
  std::ifstream *filesIn = 0;
  std::ifstream *keysIn = 0;
  std::ifstream* thisTimesFile = 0;
  int thispowerfd = 0;

  // operator new never returns a null pointer, so a failed open has
  // to be detected on the stream itself.
  filesIn = new std::ifstream(inFile);
  if(!filesIn->is_open())
    error("Could not open batch in file", inFile);
  if(key && key!=inFile) {
    keysIn = new std::ifstream(key);
    if(!keysIn->is_open())
      error("Could not open batch key file",key);
  }
  
  if(!usingTimes && (dbH->flags & O2_FLAG_TIMES))
    error("Must use timestamps with timestamped database","use --times");

  if(!usingPower && (dbH->flags & O2_FLAG_POWER))
    error("Must use power with power-enabled database", dbName);

  char *cwd = new char[PATH_MAX];

  if ((getcwd(cwd, PATH_MAX)) == 0) {
    error("error getting working directory", "", "getcwd");
  }

  unsigned totalVectors=0;
  char *thisFile = new char[MAXSTR];
  char *thisKey = 0;
  if (key && (key != inFile)) {
    thisKey = new char[MAXSTR];
  }
  char *thisTimesFileName = new char[MAXSTR];
  char *thisPowerFileName = new char[MAXSTR];

  // Keys already in the database plus those added during this run,
  // used to skip duplicates.
  std::set<std::string> s;

  for (unsigned k = 0; k < dbH->numFiles; k++) {
    s.insert(fileTable + k*O2_FILETABLE_ENTRY_SIZE);
  }

  do {
    // Pull the next line from each of the parallel lists before
    // testing for the end of the file list.
    filesIn->getline(thisFile,MAXSTR);
    if(key && key!=inFile) {
      keysIn->getline(thisKey,MAXSTR);
    } else {
      thisKey = thisFile;
    }
    if(usingTimes) {
      timesFile->getline(thisTimesFileName,MAXSTR);
    }
    if(usingPower) {
      powerFile->getline(thisPowerFileName, MAXSTR);
    }
    
    if(filesIn->eof()) {
      break;
    }
    
    initInputFile(thisFile, false);

    if(!enough_per_file_space_free()) {
      error("batchinsert failed: no more room for metadata", thisFile);
    }

    if(s.count(thisKey)) {
      VERB_LOG(0, "key already exists in database: %s\n", thisKey);
    } else {
      s.insert(thisKey);
      // Make a track index table of features to file indexes
      unsigned numVectors = (statbuf.st_size-sizeof(int))/(sizeof(double)*dbH->dim);
      if(!numVectors) {
        VERB_LOG(0, "ignoring zero-length feature vector file: %s\n", thisKey);
      }
      else{
        // Check that time-stamp file exists
        if(usingTimes){
          if(timesFile->eof()) {
            error("not enough timestamp files in timesList", timesFileName);
          }
          thisTimesFile = new std::ifstream(thisTimesFileName,std::ios::in);
          if(!thisTimesFile->is_open()) {
            error("Cannot open timestamp file", thisTimesFileName);
          }
          // only existence is checked here; the contents are read later
          delete thisTimesFile;
          thisTimesFile = 0;
        }

        // Check that power file exists        
        if (usingPower) {
          if(powerFile->eof()) {
            error("not enough power files in powerList", powerFileName);
          }
          thispowerfd = open(thisPowerFileName, O_RDONLY);
          if (thispowerfd < 0) {
            error("failed to open power file", thisPowerFileName);
          }
          if (0 < thispowerfd) {
            close(thispowerfd);
          }
        }

        // persist links to the feature files for reading from filesystem later
        
        // Primary Keys
        INSERT_FILETABLE_STRING(fileTable, thisKey);

        if(*thisFile != '/') {
          /* FIXME: MAXSTR and O2_FILETABLE_ENTRY_SIZE should probably
             be the same thing.  Also, both are related to PATH_MAX,
             which admittedly is not always defined or a
             constant... */
          char tmp[MAXSTR];
          strncpy(tmp, thisFile, MAXSTR);
          snprintf(thisFile, MAXSTR, "%s/%s", cwd, tmp);
        }
        // Feature Vector fileNames
        INSERT_FILETABLE_STRING(featureFileNameTable, thisFile);
        
        // Time Stamp fileNames
        if(usingTimes) {
          if(*thisTimesFileName != '/') {
            char tmp[MAXSTR];
            strncpy(tmp, thisTimesFileName, MAXSTR);
            snprintf(thisTimesFileName, MAXSTR, "%s/%s", cwd, tmp);
          }
          INSERT_FILETABLE_STRING(timesFileNameTable, thisTimesFileName);
        }

        // Power fileNames
        if(usingPower) {
          if(*thisPowerFileName != '/') {
            char tmp[MAXSTR];
            strncpy(tmp, thisPowerFileName, MAXSTR);
            snprintf(thisPowerFileName, MAXSTR, "%s/%s", cwd, tmp);
          }
          INSERT_FILETABLE_STRING(powerFileNameTable, thisPowerFileName);
        }

        // Increment file count
        dbH->numFiles++;  
  
        // Update Header information
        dbH->length+=(statbuf.st_size-sizeof(int));
  
        // Update track to file index map
        memcpy (trackTable+dbH->numFiles-1, &numVectors, sizeof(unsigned));  

        totalVectors+=numVectors;

        // Copy the header back to the database
        memcpy (db, dbH, sizeof(dbTableHeaderT));  
      }
    }
    // CLEAN UP
    if(indata)
      munmap(indata,statbuf.st_size);
    if(infid>0)
      close(infid);
  } while(!filesIn->eof());

  VERB_LOG(0, "%s %s %u vectors %ju bytes.\n", COM_BATCHINSERT, dbName, totalVectors, (intmax_t) (totalVectors * dbH->dim * sizeof(double)));

  delete [] thisPowerFileName;
  // thisKey aliases thisFile unless a separate key list is in use
  if(key && (key != inFile)) {
    delete [] thisKey;
  }
  delete [] thisFile;
  delete [] thisTimesFileName;
  delete [] cwd;
  
  delete filesIn;
  delete keysIn;

  // Report status
  status(dbName);
}