view src/sc_evaluator.cpp @ 0:add35537fdbb tip

Initial import
author irh <ian.r.hobson@gmail.com>
date Thu, 25 Aug 2011 11:05:55 +0100
parents
children
line wrap: on
line source
//  Copyright 2011, Ian Hobson.
//
//  This file is part of gpsynth.
//
//  gpsynth is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
//  gpsynth is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with gpsynth in the file COPYING. 
//  If not, see http://www.gnu.org/licenses/.

#include "sc_evaluator.hpp"

#include "boost_ex.hpp"
#include "logger.hpp"
#include "sc_converter.hpp"
#include "statistics.hpp"

#include "boost/filesystem.hpp"
#include "boost/format.hpp"
#include "boost/lexical_cast.hpp"
#include "boost/process.hpp"
#include "boost/thread.hpp"
#include "boost/math/special_functions/fpclassify.hpp"

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <exception>
#include <fstream>
#include <functional>
#include <iomanip>
#include <numeric>
#include <sstream>
#include <string>
#include <vector>

namespace bp = boost::process;

namespace sc {
  
  // NOTE(review): the two file-wait constants below are not referenced
  // anywhere in this file - confirm whether they can be removed.
  const int g_max_file_wait_time_ms = 10000;
  const int g_file_wait_increment_ms = 500;
  // seconds the work thread waits for a render to complete, also used as the
  // join timeout for each thread when a worker is destroyed
  const int g_sclang_timeout_s = 10;
  // fitness assigned to a graph when rating fails, times out, or yields
  // an infinite / NaN result
  const double g_fitness_failure_value = 100;
  // name of the sclang variable that holds the score in the generated script
  const std::string g_score_variable_name = "x";
 
// Worker class, runs in own thread.
//
// Each worker owns two threads:
//  - the work thread (DoWork) takes graphs from the parent's work queue,
//    converts them to an sclang script and rates the rendered audio file;
//  - the render thread (DoSCRender) runs sclang and then scsynth as child
//    processes for the job published by the work thread.
//
// Hand-over protocol between the two threads (all guarded by render_mutex_):
//  - the work thread fills in the render paths, increments render_job_id_,
//    sets do_render_ and signals render_start_;
//  - the render thread copies the job under the lock, runs it, and when it is
//    done clears do_render_ and signals render_finished_, but only if the job
//    it ran is still the current one;
//  - if the work thread gives up waiting it clears do_render_ itself and
//    terminates the child process, which marks the job as abandoned.
class EvaluatorWorker {
  sc::Converter converter_;
  int thread_id_;
  // set by the destructor to ask both threads to exit
  bool quit_threads_;
  Evaluator* parent_;
  dsp::FileComparer file_comparer_;
  // work thread communicates with parent, handles work queue
  boost::thread work_thread_;
  // render thread launches child processes
  boost::thread render_thread_;
  boost::mutex render_mutex_;
  // render duration in seconds
  double render_length_;
  std::string render_command_path_;
  std::string render_osc_path_;
  std::string render_output_path_;
  // true while a render job is pending or in progress
  bool do_render_;
  // identifies the job that do_render_ refers to, lets the render thread
  // tell an abandoned job apart from the one submitted after it
  unsigned int render_job_id_;
  // condition to notify render thread that work is available
  boost::condition_variable render_start_;
  // condition to notify work thread that work has been done
  boost::condition_variable render_finished_;
  // stores handle to active child process, empty when no child is running
  boost::shared_ptr<bp::child> render_process_;
  
public:
  // Creates the worker and starts its work and render threads.
  //   parent    - evaluator that owns the work queue and the tool paths
  //   thread_id - number used to tag this worker's log messages
  //   target    - comparer for the target sound, copied into the worker
  EvaluatorWorker(Evaluator* parent,
                  int thread_id,
                  const dsp::FileComparer& target)
  : converter_(parent->grammar_),
    thread_id_(thread_id),
    quit_threads_(false),
    parent_(parent),
    file_comparer_(target),
    render_length_(file_comparer_.TargetDuration()),
    do_render_(false),
    render_job_id_(0)
  {
    work_thread_ = boost::thread(&EvaluatorWorker::DoWork, this);
    render_thread_ = boost::thread(&EvaluatorWorker::DoSCRender, this);
  }
  
  // Asks both threads to quit and waits up to g_sclang_timeout_s seconds for
  // each of them to finish.
  ~EvaluatorWorker() {
    try {
      {
        // set the flag while holding the mutexes the threads wait with, so a
        // thread that has just tested the flag can't miss the notification.
        // no other code path holds both mutexes, so the nesting is safe.
        boost::mutex::scoped_lock work_lock(parent_->work_mutex_);
        boost::mutex::scoped_lock render_lock(render_mutex_);
        quit_threads_ = true;
      }
      // work thread
      boost::system_time timeout = boost::get_system_time();
      timeout += boost::posix_time::seconds(g_sclang_timeout_s);
      parent_->queue_condition_.notify_all();
      work_thread_.timed_join(timeout);
      // render thread
      timeout = boost::get_system_time();
      timeout += boost::posix_time::seconds(g_sclang_timeout_s);
      render_start_.notify_one();
      render_thread_.timed_join(timeout);      
    }
    catch (const std::exception& e) {
      std::stringstream message;
      message << "~sc::EvaluatorWorker - Exception: " << e.what() << '\n';
      WorkerLog(message.str());
    }
  }
  
private:
  
  // Work thread entry point. Waits for graphs to appear in the parent's work
  // queue, rates them one at a time and stores the fitness in each graph.
  // NOTE(review): quit_threads_ is a plain bool that is also read outside of
  // any lock in the loop conditions below - confirm this is acceptable on the
  // target platforms.
  void DoWork() {
    while (!quit_threads_) {
      // wait for some work
      {
        boost::mutex::scoped_lock lock(parent_->work_mutex_);
        while (parent_->work_queue_.empty() && !quit_threads_) {
          parent_->queue_condition_.wait(lock);
        }
      }
      // keep going until the work queue is empty 
      while (!quit_threads_) {
        sg::Graph* graph = NULL;
        {
          // check there's still work to do
          boost::mutex::scoped_lock lock(parent_->work_mutex_);
          if (parent_->work_queue_.empty()) {
            // wait for all threads to reach this point before continuing
            lock.unlock();
            parent_->work_done_barrier_->wait();
            lock.lock();
            // notify main thread that work's finished
            parent_->work_done_condition_.notify_one();
            break;
          }
          // get the next graph from the queue
          graph = parent_->work_queue_.front();
          parent_->work_queue_.pop();
        }
        // get the graph properties
        sg::Graph& graph_ref = *graph;
        sg::GraphProperties& graph_properties = graph_ref[boost::graph_bundle];
        double fitness;
        try {
          // rate the graph
          fitness = RateGraph(graph_ref);
        } catch (const std::exception& e) {
          std::stringstream message;
          message << "Exception: " << e.what() << '\n';
          WorkerLog(message.str());
          fitness = g_fitness_failure_value;
        }
        // an infinite or NaN rating counts as a failure
        if (boost::math::isinf(fitness) || boost::math::isnan(fitness)) {
          fitness = g_fitness_failure_value;
        }
        graph_properties.fitness_ = fitness;
//        std::stringstream message;
//        message << "Rated " << graph_properties.id_ << ":" << fitness << '\n';
//        WorkerLog(message.str());
        {
          // decrement the graph queue count
          boost::mutex::scoped_lock lock(parent_->work_mutex_);
          parent_->graphs_to_rate_--;
          parent_->work_done_condition_.notify_one();
        }
      }
    }
  }
  
  // Logs message, prefixed with this worker's thread id.
  void WorkerLog(const std::string& message) {
    std::stringstream formatter;
    formatter << "\n[" << thread_id_ << "] " << message;
    logger::Log(formatter.str());
  }
  
  // Renders graph to an audio file and compares it to the target.
  // Returns the comparison result, or g_fitness_failure_value if the render
  // timed out or produced no output file.
  double RateGraph(sg::Graph& graph) {
    int id = graph[boost::graph_bundle].id_;
    std::stringstream name;
    name << "gpsynth_"
         << std::setw(parent_->synth_name_zero_pad_length_) << std::setfill('0')
         << id;
    std::string synth_name = name.str();
    // convert the graph to a synthdef and score
    std::string synthdef = converter_.ToSynthDef(graph,
                                                 synth_name,
                                                 false,
                                                 parent_->synthdef_folder_);
    std::string score = converter_.ToScore(graph,
                                           synth_name,
                                           g_score_variable_name,
                                           false, // prepare for non-realtime
                                           parent_->synthdef_folder_,
                                           render_length_);
    {
      // prepare the synthdef and score for the render thread
      boost::mutex::scoped_lock lock(render_mutex_);
      render_command_path_ = parent_->sc_folder_ + synth_name + ".sc";
      render_osc_path_ = parent_->osc_folder_ + synth_name + ".osc";
      render_output_path_ = parent_->audio_folder_+ synth_name + ".aiff";
      // prepare render script for sclang
      std::ofstream command_file(render_command_path_.c_str());
      // write the synthdef and the score
      command_file << synthdef << '\n' << score;
      // tell sclang to write the score as OSC data
      command_file << "Score.write(" << g_score_variable_name << ", \"" 
                   << render_osc_path_ << "\");\n";
      // tell sclang to quit at end of script
      command_file << "\n0.exit;";
      // finished, close the file
      command_file.close();
      // tell the render thread that work is available
      ++render_job_id_;
      do_render_ = true;
      boost::system_time timeout = boost::get_system_time();
      timeout += boost::posix_time::seconds(g_sclang_timeout_s);
      render_start_.notify_one();
      // wait for the render thread to complete the job. the render thread
      // clears do_render_ when it's done, so a wakeup that leaves the flag
      // set is spurious and we keep waiting until the deadline.
      while (do_render_) {
        if (!render_finished_.timed_wait(lock, timeout)) {
          break;
        }
      }
      if (do_render_) {
        // sclang or scsynth timed out, kill process and set fitness to failure
        // WorkerLog("Render thread timed out.\n");
        // clearing the flag tells the render thread the job was abandoned
        do_render_ = false;
        // the handle is empty if no child process is running at the moment
        if (render_process_) {
          try {
            render_process_->terminate();
          }
          catch (const boost::system::system_error&) {
            // don't need to do anything here, process may already be
            // terminated.
          }
        }
        // maximum error
        return g_fitness_failure_value;
      }
    }
    // check that a rendered file has been produced
    if (!boost::filesystem::exists(render_output_path_)) {
      // file missing, something's gone wrong..
      WorkerLog("Render output missing.\n");
      return g_fitness_failure_value;
    }
    // store render path with graph for future reference
    graph[boost::graph_bundle].render_path_ = render_output_path_;
    // compare the rendered file to the target
    return CompareToTarget(render_output_path_);
  }
  
  // Returns true if job_id is the job the work thread is currently waiting
  // for. render_mutex_ must be held by the caller.
  bool IsActiveRenderJob(unsigned int job_id) const {
    return do_render_ && render_job_id_ == job_id;
  }
  
  // Marks job_id as done and wakes the work thread. Does nothing if the job
  // was abandoned by the work thread in the meantime.
  void FinishRenderJob(unsigned int job_id) {
    boost::mutex::scoped_lock lock(render_mutex_);
    if (IsActiveRenderJob(job_id)) {
      do_render_ = false;
      render_finished_.notify_one();
    }
  }
  
  // Runs executable with args as a child process with stdout and stderr
  // closed, and blocks until the child has exited.
  // Returns true if job_id is still the active render job afterwards, false
  // if the work thread abandoned it while the child was running.
  bool RunRenderProcess(const std::string& executable,
                        const std::vector<std::string>& args,
                        unsigned int job_id) {
    // replace process pipes to suppress output
    bp::context closed_streams;
    closed_streams.streams[bp::stdout_id] = bp::behavior::close();
    closed_streams.streams[bp::stderr_id] = bp::behavior::close();
    bp::child process = bp::create_child(executable, args, closed_streams);
    bool abandoned = false;
    {
      // store process handle to allow work thread to kill process if it's hung
      boost::mutex::scoped_lock lock(render_mutex_);
      render_process_.reset(new bp::child(process));
      abandoned = !IsActiveRenderJob(job_id);
    }
    if (abandoned) {
      // the work thread gave up before the handle was stored and so couldn't
      // kill the process, do it here
      try {
        process.terminate();
      }
      catch (const boost::system::system_error&) {
        // process may already be terminated
      }
    }
    process.wait();
    boost::mutex::scoped_lock lock(render_mutex_);
    // the child has exited, don't leave a stale handle behind
    render_process_.reset();
    return IsActiveRenderJob(job_id);
  }
  
  // Render thread entry point. Waits for a job from the work thread, runs
  // sclang to turn the render script into an OSC file, then runs scsynth to
  // render the OSC file to audio, and finally notifies the work thread.
  void DoSCRender() {
    std::vector<std::string> args;
    for (;;) {
      // wait for a command to render, then take a private copy of the job so
      // that nothing shared is read without the lock later on
      std::string command_path;
      std::string osc_path;
      std::string output_path;
      unsigned int job_id = 0;
      {
        boost::mutex::scoped_lock lock(render_mutex_);
        while (!do_render_ && !quit_threads_) {
          render_start_.wait(lock);
        }
        if (quit_threads_) {
          return;
        }
        command_path = render_command_path_;
        osc_path = render_osc_path_;
        output_path = render_output_path_;
        job_id = render_job_id_;
      }
      try {
        // Call sclang to get OSC command for scsynth
        args.clear();
        args.push_back("-d");
        args.push_back(parent_->sc_app_path_);
        args.push_back(command_path);
        // carry on only if the job wasn't abandoned while sclang was running
        // and sclang generated an osc command
        if (RunRenderProcess(parent_->sclang_path_, args, job_id)
            && boost::filesystem::exists(osc_path)) {
          // render osc with scsynth
          args.clear();
          args.push_back("-N");
          args.push_back(osc_path);
          args.push_back("_"); // no input file
          args.push_back(output_path);
          args.push_back("44100");
          args.push_back("AIFF");
          args.push_back("int16");
          // output channels
          args.push_back("-o");
          args.push_back("1");
          // don't publish to rendevous
          args.push_back("-R");
          args.push_back("0");
          // don't load synthdefs
          args.push_back("-D");
          args.push_back("0");
          // plugins path
          args.push_back("-U");
          args.push_back(parent_->sc_plugins_path_);
          // call scsynth and wait for it to finish rendering
          RunRenderProcess(parent_->scsynth_path_, args, job_id);
        }
      }
      catch (const std::exception& e) {
        // an exception must not escape the thread function. the job is
        // reported as finished below and the work thread will then find
        // that the output file is missing.
        std::stringstream message;
        message << "Render thread - Exception: " << e.what() << '\n';
        WorkerLog(message.str());
      }
      // job done or failed, notify work thread (no-op for an abandoned job)
      FinishRenderJob(job_id);
    }
  }
  
  // Returns the result of comparing the file at file_path to the target.
  double CompareToTarget(const std::string& file_path) {
    return file_comparer_.CompareFile(file_path);
  }
};


////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////

// Creates an evaluator and starts its worker threads.
//   grammar     - grammar handed to each worker's converter
//   sc_app_path - folder containing sclang, scsynth and the plugins folder
//   target      - comparer for the target sound, copied into each worker
//   core_limit  - maximum number of workers, values <= 0 mean one worker per
//                 hardware thread
Evaluator::Evaluator(const sc::Grammar& grammar,
                     const std::string& sc_app_path,
                     const dsp::FileComparer& target,
                     int core_limit /* = 0 */)
: grammar_(grammar),
  listener_(NULL),
  sc_app_path_(sc_app_path),
  sc_plugins_path_(sc_app_path + "/plugins"),
  sclang_path_(sc_app_path + "/sclang"),
  scsynth_path_(sc_app_path + "/scsynth")
{
  SetWorkFolder("/tmp/");
  int workers = static_cast<int>(boost::thread::hardware_concurrency());
  // hardware_concurrency() returns 0 when the number of hardware threads is
  // unknown. without at least one worker the barrier can't be created and
  // RateGraphs() would wait forever.
  if (workers < 1) {
    workers = 1;
  }
  if (core_limit > 0 && core_limit < workers) {
    workers = core_limit;
  }
  // every worker waits on this barrier once the work queue is empty
  work_done_barrier_.reset(new boost::barrier(workers));
  for (int worker_id = 0; worker_id < workers; worker_id++) {
    workers_.push_back(EvaluatorWorkerPtr(new EvaluatorWorker(this,
                                                              worker_id,
                                                              target)));
  }
}

// Destructor, the body is intentionally empty.
// NOTE(review): presumably the workers are destroyed (and their threads asked
// to quit) when the workers_ member is destroyed - confirm against the
// declaration of EvaluatorWorkerPtr in sc_evaluator.hpp.
Evaluator::~Evaluator()
{}


// Rates every graph in graphs and blocks until all of them have been rated.
// Pointers to the graphs are queued for the worker threads, which store the
// resulting fitness in each graph. If a listener is set it's notified
// whenever the number of rated graphs changes. The OSC subfolder of the work
// folder is deleted when rating has finished.
void Evaluator::RateGraphs(std::vector<sg::Graph>& graphs) {
  // make sure necessary subfolders exist in work folder
  audio_folder_ = work_folder_ + "/audio/";
  osc_folder_ = work_folder_ + "/osc/";
  sc_folder_ = work_folder_ + "/sc/";
  synthdef_folder_ = work_folder_ + "/synthdefs/";
  boost::filesystem::create_directories(audio_folder_);
  boost::filesystem::create_directories(osc_folder_);
  boost::filesystem::create_directories(sc_folder_);
  boost::filesystem::create_directories(synthdef_folder_);
  // find zero padding length for synth names: the number of decimal digits
  // in the graph count. counted with integers so that an empty vector gives 1
  // (log10(0) is -inf) and exact powers of ten aren't subject to rounding.
  int pad_length = 1;
  for (std::size_t remaining = graphs.size() / 10;
       remaining > 0;
       remaining /= 10) {
    pad_length++;
  }
  synth_name_zero_pad_length_ = pad_length;
  // store the graphs in the work queue
  boost::mutex::scoped_lock lock(work_mutex_);
  graphs_to_rate_ = static_cast<int>(graphs.size());
  for (int i = 0; i < graphs_to_rate_; i++) {
    work_queue_.push(&graphs[i]);
  }
  // wait for graphs to be rated
  std::size_t last_rated_count = 0;
  while (graphs_to_rate_ > 0) {
    // wake the workers, then sleep until one of them reports progress
    queue_condition_.notify_all();
    work_done_condition_.wait(lock);
    if (listener_ != NULL) {
      std::size_t rated_count = graphs.size() - graphs_to_rate_;
      // only report when the count has actually changed
      if (rated_count != last_rated_count) {
        last_rated_count = rated_count;
        listener_->GraphRatedNotification(rated_count);
      }
    }
  }
  // remove OSC folder, no longer required
  boost::filesystem::remove_all(osc_folder_);
}
  
// Sets the folder beneath which RateGraphs() creates its audio, osc, sc and
// synthdefs subfolders.
void Evaluator::SetWorkFolder(const std::string& path) {
  work_folder_.assign(path);
}
  
} // sc namespace