view insert.cpp @ 408:f0a69693eaef api-inversion

The lesser of two evils, part 1. Most of the body of audiodb_insert_datum() will apply to "LARGE_ADB"-type insertions: checking for the right flags, checking for enough space free, synchronizing the header. Wouldn't it be nice if we could reuse all that code (or at least the bits that apply) without one horrible almost-identical cut-and-paste job (see batchinsert_large_adb(), or if that's not compelling enough, the four almost-identical query loops from before the Great Refactoring)? Well, yes, it would. Sadly C makes it mildly difficult, because its functions are explicitly typed (so we can't pass arbitrary arguments of other types, even if they're ABI-compatible), while its macros are textual (which makes writing and maintaining them horrible). The thought of a union argument was briefly entertained and then discarded as being just Too Weird. So, instead, (ab)use the oldest trick in the book: void *. Define an adb_datum_internal_t which has void * instead of double *; the intention is that this internal data type can be constructed both from an adb_datum_t and some notional adb_reference_t (which looks very much like an adb_insert_t at the time of writing, with char * structure entries representing filenames). This adb_datum_internal_t structure is very much an internals-only thing, so put its definition in the internals header. Call what was previously audiodb_insert_datum() a new function audiodb_insert_datum_internal(), made static so that really no-one is tempted to call it other than ourselves. audiodb_insert_datum() is then trivial in terms of this new function, if stupidly tedious. (If we were playing dangerously, we could just perform a cast, but relying on the fact that sizeof(double *) = sizeof(void *) would almost certainly end up biting when we least expect.) Incidental inclusion in this patch, since I noticed it at the time: actually check for the O2_FLAG_L2NORM before scribbling all over the l2norm table.
Somewhat unsurprisingly, there are as yet no tests to defend against this (harmless, as it turns out) erroneous behaviour.
author mas01cr
date Tue, 09 Dec 2008 20:53:39 +0000
parents c279adeb47f4
children 99e6cbad7f76
line wrap: on
line source
#include "audioDB.h"
extern "C" {
#include "audioDB_API.h"
}
#include "audioDB-internals.h"

/* Reports whether `size` further bytes of feature data can be appended:
 * true when the current end of the data (dataOffset + length) plus
 * `size` stays strictly below timesTableOffset. */
static bool audiodb_enough_data_space_free(adb_t *adb, off_t size) {
  /* FIXME: timesTableOffset isn't necessarily the next biggest offset
     after dataOffset.  Maybe make the offsets into an array that we
     can iterate over... */
  adb_header_t *hdr = adb->header;
  return (hdr->dataOffset + hdr->length + size <
          hdr->timesTableOffset);
}

/* Reports whether one more file can be registered.  The capacity is
 * the number of whole entries that fit in the file table or in the
 * track table, whichever is smaller; there is room while numFiles is
 * below that capacity. */
static bool audiodb_enough_per_file_space_free(adb_t *adb) {
  /* FIXME: the table sizes are derived from the gaps between
     consecutive offsets, so the caveat about table ordering noted for
     the data-space check applies here too. */
  adb_header_t *hdr = adb->header;
  off_t file_table_bytes = hdr->trackTableOffset - hdr->fileTableOffset;
  off_t track_table_bytes = hdr->dataOffset - hdr->trackTableOffset;
  int file_slots = file_table_bytes / O2_FILETABLE_ENTRY_SIZE;
  int track_slots = track_table_bytes / O2_TRACKTABLE_ENTRY_SIZE;
  unsigned int capacity;

  /* the capacity is the _smaller_ of the two slot counts */
  if(file_slots > track_slots) {
    capacity = track_slots;
  } else {
    capacity = file_slots;
  }
  return (hdr->numFiles < capacity);
}

/* Seek to absolute position `offset` in `fd` and write exactly `size`
 * bytes from `buffer`, retrying after short writes.  Returns 0 on
 * success and 1 if the seek fails or the data cannot all be written.
 * Nothing is written (but the seek still happens) when `size` is 0. */
static int audiodb_insert_write_at(int fd, off_t offset, const void *buffer, size_t size) {
  const char *p = (const char *) buffer;

  if(lseek(fd, offset, SEEK_SET) == (off_t) -1) {
    return 1;
  }
  while(size > 0) {
    ssize_t n = write(fd, p, size);
    if(n <= 0) {
      /* error, or no progress: give up rather than spin */
      return 1;
    }
    p += n;
    size -= n;
  }
  return 0;
}

/*
 * A lot of logic is concentrated in this one place, so here is an
 * overview.  To insert a datum into the database, we:
 *
 *  1. check write permission;
 *  2. check !O2_FLAG_LARGE_ADB;
 *  3. check for enough space;
 *  4. check that datum->dim is non-zero and agrees with
 *     adb->header->dim (or that the header dimension is zero, in
 *     which case datum->dim is adopted in step 9);
 *  5. check for presence of datum->key in adb->keys;
 *  6. check for consistency between power and O2_FLAG_POWER, and
 *     times and O2_FLAG_TIMES (an empty database may acquire
 *     O2_FLAG_TIMES, again in step 9);
 *  7. write in data, power, times as appropriate; add to track
 *     and key tables too;
 *  8. if O2_FLAG_L2NORM, compute norms and fill in table;
 *  9. update adb->keys and adb->header;
 * 10. sync adb->header with disk.
 *
 * Returns 0 on success, 2 if the key is already present (a
 * distinguished value relied upon by the batch insertion code), and 1
 * for every other failure, including any failed seek, write or
 * allocation in steps 7 and 8.
 *
 * The in-memory header is not touched before step 9, so a failure in
 * steps 1-8 leaves it exactly as it was; bytes already written to the
 * tables by step 7 are unreferenced because header->length and
 * header->numFiles have not advanced.
 *
 * Step 10 essentially commits the transaction; until we update
 * header->length, nothing will recognize the newly-written data.  In
 * principle, if it fails, we should roll back, which we can in fact
 * do on the assumption that nothing in step 9 can ever fail; on the
 * other hand, if it's failed, then it's unlikely that rolling back by
 * syncing the original header back to disk is going to work
 * desperately well.  We should perhaps take an operating-system lock
 * around step 10, so that we can't be interrupted part-way through
 * (except of course for SIGKILL, but if we're hit with that we will
 * always lose).
 */
static int audiodb_insert_datum_internal(adb_t *adb, adb_datum_internal_t *datum) {

  off_t size, offset, nfiles;
  double *l2norm_buffer = NULL, *lp, *dp;
  bool enable_times = false;

  /* 1. check write permission; */
  if(!(adb->flags & O_RDWR)) {
    return 1;
  }
  /* 2. check !O2_FLAG_LARGE_ADB; */
  if(adb->header->flags & O2_FLAG_LARGE_ADB) {
    return 1;
  }
  /* 3. check for enough space; */
  size = sizeof(double) * datum->nvectors * datum->dim;
  if(!audiodb_enough_data_space_free(adb, size)) {
    return 1;
  }
  if(!audiodb_enough_per_file_space_free(adb)) {
    return 1;
  }
  /* 4. check that datum->dim is usable and agrees with the header.  A
   *    zero dimension is rejected outright: the table offsets below
   *    are computed by dividing by datum->dim.  If the header
   *    dimension is still zero, datum->dim is written to it in
   *    step 9.
   */
  if(datum->dim == 0) {
    return 1;
  }
  if((adb->header->dim != 0) && (adb->header->dim != datum->dim)) {
    return 1;
  }
  /* 5. check for presence of datum->key in adb->keys; */
  if(adb->keys->count(datum->key)) {
    /* not part of an explicit API/ABI, but we need a distinguished
       value in this circumstance to preserve somewhat wonky behaviour
       of audioDB::batchinsert. */
    return 2;
  }
  /* 6. check for consistency between power and O2_FLAG_POWER, and
   *    times and O2_FLAG_TIMES;
   */
  if((datum->power && !(adb->header->flags & O2_FLAG_POWER)) ||
     ((adb->header->flags & O2_FLAG_POWER) && !datum->power)) {
    return 1;
  }
  if(datum->times && !(adb->header->flags & O2_FLAG_TIMES)) {
    if(adb->header->numFiles == 0) {
      /* first insertion decides; the flag itself is set in step 9 */
      enable_times = true;
    } else {
      return 1;
    }
  } else if ((adb->header->flags & O2_FLAG_TIMES) && !datum->times) {
    return 1;
  }
  /* 7. write in data, power, times as appropriate; add to track
   *    and key tables too;
   */
  offset = adb->header->length;
  nfiles = adb->header->numFiles;

  if(audiodb_insert_write_at(adb->fd, adb->header->dataOffset + offset,
                             datum->data, size)) {
    goto error;
  }
  if(datum->power) {
    /* offset counts bytes of data (dim doubles per vector), so
       offset / dim is the byte position of one double per vector */
    if(audiodb_insert_write_at(adb->fd, adb->header->powerTableOffset + offset / datum->dim,
                               datum->power, sizeof(double) * datum->nvectors)) {
      goto error;
    }
  }
  if(datum->times) {
    /* two doubles per vector */
    if(audiodb_insert_write_at(adb->fd, adb->header->timesTableOffset + offset / datum->dim * 2,
                               datum->times, sizeof(double) * datum->nvectors * 2)) {
      goto error;
    }
  }
  if(audiodb_insert_write_at(adb->fd, adb->header->trackTableOffset + nfiles * O2_TRACKTABLE_ENTRY_SIZE,
                             &datum->nvectors, O2_TRACKTABLE_ENTRY_SIZE)) {
    goto error;
  }
  /* NOTE(review): the key is written with its terminator but its
     length is not compared against O2_FILETABLE_ENTRY_SIZE here --
     confirm that callers bound key lengths. */
  if(audiodb_insert_write_at(adb->fd, adb->header->fileTableOffset + nfiles * O2_FILETABLE_ENTRY_SIZE,
                             datum->key, strlen(datum->key)+1)) {
    goto error;
  }

  /* 8. if O2_FLAG_L2NORM, compute norms and fill in table; */
  if(adb->header->flags & O2_FLAG_L2NORM) {
    l2norm_buffer = (double *) malloc(datum->nvectors * sizeof(double));
    /* malloc(0) may legitimately return NULL, so only a NULL for a
       non-empty request is a failure */
    if(!l2norm_buffer && datum->nvectors) {
      goto error;
    }

    /* FIXME: shared code with audiodb_norm_existing() */
    /* one entry per vector: the sum of the squares of its components */
    dp = (double *) datum->data;
    lp = l2norm_buffer;
    for(size_t i = 0; i < datum->nvectors; i++) {
      *lp = 0;
      for(unsigned int k = 0; k < datum->dim; k++) {
        *lp += (*dp)*(*dp);
        dp++;
      }
      lp++;
    }
    if(audiodb_insert_write_at(adb->fd, adb->header->l2normTableOffset + offset / datum->dim,
                               l2norm_buffer, sizeof(double) * datum->nvectors)) {
      goto error;
    }
    free(l2norm_buffer);
    l2norm_buffer = NULL;
  }

  /* 9. update adb->keys and adb->header; */
  adb->keys->insert(datum->key);
  adb->header->dim = datum->dim;
  if(enable_times) {
    adb->header->flags |= O2_FLAG_TIMES;
  }
  adb->header->numFiles += 1;
  adb->header->length += sizeof(double) * datum->nvectors * datum->dim;

  /* 10. sync adb->header with disk. */
  return audiodb_sync_header(adb);

 error:
  free(l2norm_buffer);
  return 1;
}

/* Public entry point for inserting an in-memory datum: copies each
 * field of the caller's adb_datum_t into an adb_datum_internal_t and
 * passes that to audiodb_insert_datum_internal(), whose return value
 * is returned unchanged. */
int audiodb_insert_datum(adb_t *adb, adb_datum_t *datum) {
  adb_datum_internal_t internal;

  /* identification and shape */
  internal.key = datum->key;
  internal.dim = datum->dim;
  internal.nvectors = datum->nvectors;
  /* payload pointers (power and times may be NULL) */
  internal.data = datum->data;
  internal.power = datum->power;
  internal.times = datum->times;

  return audiodb_insert_datum_internal(adb, &internal);
}

// Reports whether another file can be registered: the capacity is the
// smaller of the number of whole entries that fit in the file table
// and in the track table, and there is room while dbH->numFiles is
// below it.
bool audioDB::enough_per_file_space_free() {
  unsigned int file_slots = fileTableLength / O2_FILETABLE_ENTRY_SIZE;
  unsigned int track_slots = trackTableLength / O2_TRACKTABLE_ENTRY_SIZE;
  unsigned int capacity = track_slots < file_slots ? track_slots : file_slots;

  return(dbH->numFiles < capacity);
}

/* Releases the data, power and times buffers of `datum` (any of which
 * may be NULL).  The key and the structure itself are not freed, and
 * the pointers are left as they were.  Always returns 0. */
static int audiodb_free_datum(adb_datum_t *datum) {
  /* free(NULL) is a no-op, so no guards are needed */
  free(datum->data);
  free(datum->power);
  free(datum->times);
  return 0;
}

/* Read exactly `size` bytes from `fd` into `buffer`, retrying after
 * short reads.  Returns 0 on success and 1 on a read error or if end
 * of file arrives first. */
static int audiodb_insert_read_fully(int fd, void *buffer, size_t size) {
  char *p = (char *) buffer;

  while(size > 0) {
    ssize_t n = read(fd, p, size);
    if(n <= 0) {
      return 1;
    }
    p += n;
    size -= n;
  }
  return 0;
}

/*
 * Build an in-memory datum from the files named in `insert`.
 *
 * The features file is a uint32_t dimension followed by doubles; the
 * number of vectors is derived from the file size.  If insert->power
 * is set, that file must be a uint32_t equal to 1 followed by exactly
 * as many bytes as one double per vector of the features file.  If
 * insert->times is set, that text file must supply nvectors + 1
 * numbers, which are expanded into two doubles per vector (each
 * interior value is stored twice, as the end of one pair and the
 * start of the next).
 *
 * datum->key is set to insert->key, or to insert->features when no
 * key was given; it is borrowed, not copied.
 *
 * Returns 0 on success, in which case the caller owns datum->data,
 * datum->power and datum->times (see audiodb_free_datum()).  Returns 1
 * on any failure, with every descriptor closed, every buffer released
 * and the three buffer pointers reset to NULL.
 *
 * Failures include: unopenable or truncated files, a dimension of
 * zero, a power file of the wrong size or dimension, too few time
 * values, and a times file supplied for a features file that holds no
 * vectors.
 */
static int audiodb_insert_create_datum(adb_insert_t *insert, adb_datum_t *datum) {
  /* -1 means "no descriptor open"; 0 is a perfectly valid descriptor */
  int fd = -1;
  FILE *file = NULL;
  struct stat st;
  off_t size;

  datum->data = NULL;
  datum->power = NULL;
  datum->times = NULL;

  /* features: dimension header, then the vectors */
  if((fd = open(insert->features, O_RDONLY)) == -1) {
    goto error;
  }
  if(fstat(fd, &st)) {
    goto error;
  }
  if(st.st_size < (off_t) sizeof(uint32_t)) {
    /* too short to hold even the dimension */
    goto error;
  }
  if(audiodb_insert_read_fully(fd, &(datum->dim), sizeof(uint32_t))) {
    goto error;
  }
  if(datum->dim == 0) {
    /* everything below divides by the dimension */
    goto error;
  }
  size = st.st_size - sizeof(uint32_t);
  datum->nvectors = size / (sizeof(double) * datum->dim);
  datum->data = (double *) malloc(size);
  if(!datum->data) {
    goto error;
  }
  if(audiodb_insert_read_fully(fd, datum->data, size)) {
    goto error;
  }
  close(fd);
  fd = -1;

  if(insert->power) {
    uint32_t power_dim;
    if((fd = open(insert->power, O_RDONLY)) == -1) {
      goto error;
    }
    if(fstat(fd, &st)) {
      goto error;
    }
    /* payload must be exactly one double per feature vector */
    if((st.st_size - sizeof(uint32_t)) != (size / datum->dim)) {
      goto error;
    }
    if(audiodb_insert_read_fully(fd, &power_dim, sizeof(uint32_t))) {
      goto error;
    }
    if(power_dim != 1) {
      goto error;
    }
    datum->power = (double *) malloc(size / datum->dim);
    if(!datum->power) {
      goto error;
    }
    if(audiodb_insert_read_fully(fd, datum->power, size / datum->dim)) {
      goto error;
    }
    close(fd);
    /* forget the descriptor so that a failure further down cannot
       close it a second time */
    fd = -1;
  }

  if(insert->times) {
    double t, *tp;
    if(datum->nvectors == 0) {
      /* the expansion below always stores at least two values, and
         its loop bound nvectors - 1 would wrap around */
      goto error;
    }
    if(!(file = fopen(insert->times, "r"))) {
      goto error;
    }
    datum->times = (double *) malloc(2 * size / datum->dim);
    if(!datum->times) {
      goto error;
    }
    /* first value: start of the first vector */
    if(fscanf(file, " %lf", &t) != 1) {
      goto error;
    }
    tp = datum->times;
    *tp++ = t;
    /* interior values: end of one vector and start of the next */
    for(unsigned int n = 0; n < datum->nvectors - 1; n++) {
      if(fscanf(file, " %lf", &t) != 1) {
        goto error;
      }
      *tp++ = t;
      *tp++ = t;
    }
    /* last value: end of the last vector */
    if(fscanf(file, " %lf", &t) != 1) {
      goto error;
    }
    *tp = t;
    fclose(file);
    file = NULL;
  }
  datum->key = insert->key ? insert->key : insert->features;
  return 0;

 error:
  if(fd >= 0) {
    close(fd);
  }
  if(file) {
    fclose(file);
  }
  audiodb_free_datum(datum);
  /* do not leave dangling pointers behind for the caller */
  datum->data = NULL;
  datum->power = NULL;
  datum->times = NULL;
  return 1;
}

/* Insert the track described by the file names in `insert`.
 *
 * Refuses (returns 1) on a database flagged O2_FLAG_LARGE_ADB or when
 * the datum cannot be built from the files.  Otherwise the datum is
 * inserted and then released; the "key already present" result (2)
 * from audiodb_insert_datum() is reported as success (0), and any
 * other result is passed through. */
int audiodb_insert(adb_t *adb, adb_insert_t *insert) {
  adb_datum_t datum;
  int result;

  if(adb->header->flags & O2_FLAG_LARGE_ADB) {
    return 1;
  }
  if(audiodb_insert_create_datum(insert, &datum)) {
    return 1;
  }

  result = audiodb_insert_datum(adb, &datum);
  audiodb_free_datum(&datum);

  /* a duplicate key is not an error at this level */
  return (result == 2) ? 0 : result;
}

/* Insert the first `size` entries of the `insert` array in order,
 * stopping at the first failure.  Returns that failure's code, or 0
 * when every insertion succeeded (including when `size` is 0). */
int audiodb_batchinsert(adb_t *adb, adb_insert_t *insert, unsigned int size) {
  unsigned int done = 0;

  while(done < size) {
    int err = audiodb_insert(adb, insert + done);
    if(err) {
      return err;
    }
    done++;
  }
  return 0;
}

void audioDB::insert(const char* dbName, const char* inFile) {
  if(!adb) {
    if(!(adb = audiodb_open(dbName, O_RDWR))) {
      error("failed to open database", dbName);
    }
  }

  /* at this point, we have powerfd (an fd), timesFile (a
   * std::ifstream *) and inFile (a char *).  Wacky, huh?  Ignore
   * the wackiness and just use the names. */
  adb_insert_t insert;
  insert.features = inFile;
  insert.times = timesFileName;
  insert.power = powerFileName;
  insert.key = key;

  if(audiodb_insert(adb, &insert)) {
    error("insertion failure", inFile);
  }
  status(dbName);
}

// Insert every feature file listed (one name per line) in inFile into
// the database dbName.
//
// Databases flagged O2_FLAG_LARGE_ADB are handed to
// batchinsert_large_adb().  Otherwise each line of inFile is paired
// with the corresponding line of the key list (when the member `key`
// names a separate file; otherwise the feature file name doubles as
// the key), of timesFile (when usingTimes) and of powerFile (when
// usingPower), and the resulting set of names is passed to
// audiodb_insert().  Any failure is reported through error().
//
// NOTE(review): a line is only processed if reading it did not reach
// end-of-file, so a final line lacking a trailing newline is skipped.
// NOTE(review): totalVectors is never updated in this function, so the
// summary log line always reports 0 vectors and 0 bytes.
void audioDB::batchinsert(const char* dbName, const char* inFile) {
  forWrite = true;
  initDBHeader(dbName);

  // Treat large ADB instances differently
  if( dbH->flags & O2_FLAG_LARGE_ADB ){
    batchinsert_large_adb(dbName, inFile) ;
    return;
  }
    
  if(!key)
    key=inFile;
  std::ifstream *filesIn = 0;
  std::ifstream *keysIn = 0;

  // operator new never yields a null pointer here, so the stream state
  // is what tells us whether the file could actually be opened.
  filesIn = new std::ifstream(inFile);
  if(!filesIn->is_open())
    error("Could not open batch in file", inFile);
  if(key && key!=inFile) {
    keysIn = new std::ifstream(key);
    if(!keysIn->is_open())
      error("Could not open batch key file",key);
  }

  unsigned totalVectors=0;
  char *thisFile = new char[MAXSTR];
  char *thisKey = 0;
  if (key && (key != inFile)) {
    thisKey = new char[MAXSTR];
  }
  char *thisTimesFileName = new char[MAXSTR];
  char *thisPowerFileName = new char[MAXSTR];

  do {
    // read one line from each list in lock-step
    filesIn->getline(thisFile,MAXSTR);
    if(key && key!=inFile) {
      keysIn->getline(thisKey,MAXSTR);
    } else {
      // no separate key list: alias the feature file name (this alias
      // is why thisKey is only deleted below when it owns a buffer)
      thisKey = thisFile;
    }
    if(usingTimes) {
      timesFile->getline(thisTimesFileName,MAXSTR);
    }
    if(usingPower) {
      powerFile->getline(thisPowerFileName, MAXSTR);
    }
    
    if(filesIn->eof()) {
      break;
    }
    if(usingTimes){
      if(timesFile->eof()) {
        error("not enough timestamp files in timesList", timesFileName);
      }
    }
    if (usingPower) {
      if(powerFile->eof()) {
        error("not enough power files in powerList", powerFileName);
      }
    }
    adb_insert_t insert;
    insert.features = thisFile;
    insert.times = usingTimes ? thisTimesFileName : NULL;
    insert.power = usingPower ? thisPowerFileName : NULL;
    insert.key = thisKey;
    if(audiodb_insert(adb, &insert)) {
      error("insertion failure", thisFile);
    }
  } while(!filesIn->eof());

  VERB_LOG(0, "%s %s %u vectors %ju bytes.\n", COM_BATCHINSERT, dbName, totalVectors, (intmax_t) (totalVectors * dbH->dim * sizeof(double)));

  delete [] thisPowerFileName;
  if(key && (key != inFile)) {
    delete [] thisKey;
  }
  delete [] thisFile;
  delete [] thisTimesFileName;
  
  delete filesIn;
  delete keysIn;

  // Report status
  status(dbName);
}


// BATCHINSERT_LARGE_ADB
//
// This method inserts file pointers into the ADB instance rather than the actual feature data
//
// This method is intended for databases that are large enough to only support indexed query
// So exhaustive searching across all feature vectors will not be performed
//
// We insert featureFileName, [powerFileName], [timesFileName]
//
// l2norms and power sequence sums are calculated on-the-fly at INDEX and --lsh_exact QUERY time
//
// LIMITS:
//
// We impose an upper limit of 1M keys, 1M featureFiles, 1M powerFiles and 1M timesFiles
//
// For each line of inFile (paired with the matching line of the key,
// times and power lists where those are in use) the feature file is
// opened via initInputFile() to learn its size; keys already present
// are logged and skipped, as are feature files holding no vectors.
// Relative file names are made absolute against the current working
// directory before being stored.  The header is copied back to the
// database after every successful insertion.  Failures are reported
// through error().
//
// NOTE(review): as the loop is written, a final line of inFile lacking
// a trailing newline is skipped, because reading it sets end-of-file
// before the line is processed.
//
void audioDB::batchinsert_large_adb(const char* dbName, const char* inFile) {

  if(!key)
    key=inFile;
  std::ifstream *filesIn = 0;
  std::ifstream *keysIn = 0;
  std::ifstream* thisTimesFile = 0;
  int thispowerfd = 0;

  // operator new never yields a null pointer here, so the stream state
  // is what tells us whether the file could actually be opened.
  filesIn = new std::ifstream(inFile);
  if(!filesIn->is_open())
    error("Could not open batch in file", inFile);
  if(key && key!=inFile) {
    keysIn = new std::ifstream(key);
    if(!keysIn->is_open())
      error("Could not open batch key file",key);
  }
  
  if(!usingTimes && (dbH->flags & O2_FLAG_TIMES))
    error("Must use timestamps with timestamped database","use --times");

  if(!usingPower && (dbH->flags & O2_FLAG_POWER))
    error("Must use power with power-enabled database", dbName);

  char *cwd = new char[PATH_MAX];

  if ((getcwd(cwd, PATH_MAX)) == 0) {
    error("error getting working directory", "", "getcwd");
  }

  unsigned totalVectors=0;
  char *thisFile = new char[MAXSTR];
  char *thisKey = 0;
  if (key && (key != inFile)) {
    thisKey = new char[MAXSTR];
  }
  char *thisTimesFileName = new char[MAXSTR];
  char *thisPowerFileName = new char[MAXSTR];

  // keys already in the database, plus those added by this run
  std::set<std::string> s;

  for (unsigned k = 0; k < dbH->numFiles; k++) {
    s.insert(fileTable + k*O2_FILETABLE_ENTRY_SIZE);
  }

  do {
    // read one line from each list in lock-step
    filesIn->getline(thisFile,MAXSTR);
    if(key && key!=inFile) {
      keysIn->getline(thisKey,MAXSTR);
    } else {
      // no separate key list: alias the feature file name (this alias
      // is why thisKey is only deleted below when it owns a buffer)
      thisKey = thisFile;
    }
    if(usingTimes) {
      timesFile->getline(thisTimesFileName,MAXSTR);
    }
    if(usingPower) {
      powerFile->getline(thisPowerFileName, MAXSTR);
    }
    
    if(filesIn->eof()) {
      break;
    }
    
    initInputFile(thisFile, false);

    if(!enough_per_file_space_free()) {
      error("batchinsert failed: no more room for metadata", thisFile);
    }

    if(s.count(thisKey)) {
      VERB_LOG(0, "key already exists in database: %s\n", thisKey);
    } else {
      s.insert(thisKey);
      // Make a track index table of features to file indexes
      unsigned numVectors = (statbuf.st_size-sizeof(int))/(sizeof(double)*dbH->dim);
      if(!numVectors) {
        VERB_LOG(0, "ignoring zero-length feature vector file: %s\n", thisKey);
      }
      else{
        // Check that time-stamp file exists
        if(usingTimes){
          if(timesFile->eof()) {
            error("not enough timestamp files in timesList", timesFileName);
          }
          // opened only to prove it can be; the contents are not read here
          thisTimesFile = new std::ifstream(thisTimesFileName,std::ios::in);
          if(!thisTimesFile->is_open()) {
            error("Cannot open timestamp file", thisTimesFileName);
          }
          delete thisTimesFile;
        }

        // Check that power file exists        
        if (usingPower) {
          if(powerFile->eof()) {
            error("not enough power files in powerList", powerFileName);
          }
          // opened only to prove it can be; the contents are not read here
          thispowerfd = open(thisPowerFileName, O_RDONLY);
          if (thispowerfd < 0) {
            error("failed to open power file", thisPowerFileName);
          }
          // 0 is a valid descriptor too, so close anything non-negative
          if (thispowerfd >= 0) {
            close(thispowerfd);
          }
        }

        // persist links to the feature files for reading from filesystem later
        
        // Primary Keys
        INSERT_FILETABLE_STRING(fileTable, thisKey);

        if(*thisFile != '/') {
          /* FIXME: MAXSTR and O2_FILETABLE_ENTRY_SIZE should probably
             be the same thing.  Also, both are related to PATH_MAX,
             which admittedly is not always defined or a
             constant... */
          char tmp[MAXSTR];
          strncpy(tmp, thisFile, MAXSTR);
          snprintf(thisFile, MAXSTR, "%s/%s", cwd, tmp);
        }
        // Feature Vector fileNames
        INSERT_FILETABLE_STRING(featureFileNameTable, thisFile);
        
        // Time Stamp fileNames
        if(usingTimes) {
          if(*thisTimesFileName != '/') {
            char tmp[MAXSTR];
            strncpy(tmp, thisTimesFileName, MAXSTR);
            snprintf(thisTimesFileName, MAXSTR, "%s/%s", cwd, tmp);
          }
          INSERT_FILETABLE_STRING(timesFileNameTable, thisTimesFileName);
        }

        // Power fileNames
        if(usingPower) {
          if(*thisPowerFileName != '/') {
            char tmp[MAXSTR];
            strncpy(tmp, thisPowerFileName, MAXSTR);
            snprintf(thisPowerFileName, MAXSTR, "%s/%s", cwd, tmp);
          }
          INSERT_FILETABLE_STRING(powerFileNameTable, thisPowerFileName);
        }

        // Increment file count
        dbH->numFiles++;  
  
        // Update Header information
        dbH->length+=(statbuf.st_size-sizeof(int));
  
        // Update track to file index map
        memcpy (trackTable+dbH->numFiles-1, &numVectors, sizeof(unsigned));  

        totalVectors+=numVectors;

        // Copy the header back to the database
        memcpy (db, dbH, sizeof(dbTableHeaderT));  
      }
    }
    // CLEAN UP
    if(indata)
      munmap(indata,statbuf.st_size);
    if(infid>0)
      close(infid);
  } while(!filesIn->eof());

  VERB_LOG(0, "%s %s %u vectors %ju bytes.\n", COM_BATCHINSERT, dbName, totalVectors, (intmax_t) (totalVectors * dbH->dim * sizeof(double)));

  delete [] thisPowerFileName;
  if(key && (key != inFile)) {
    delete [] thisKey;
  }
  delete [] thisFile;
  delete [] thisTimesFileName;
  delete [] cwd;
  
  delete filesIn;
  delete keysIn;

  // Report status
  status(dbName);
}