/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
/*
    Piper C++

    An API for audio analysis and feature extraction plugins.

    Centre for Digital Music, Queen Mary, University of London.
    Copyright 2006-2016 Chris Cannam and QMUL.

    Permission is hereby granted, free of charge, to any person
    obtaining a copy of this software and associated documentation
    files (the "Software"), to deal in the Software without
    restriction, including without limitation the rights to use, copy,
    modify, merge, publish, distribute, sublicense, and/or sell copies
    of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be
    included in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
    ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
    CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
    WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

    Except as contained in this notice, the names of the Centre for
    Digital Music; Queen Mary, University of London; and Chris Cannam
    shall not be used in advertising or otherwise to promote the sale,
    use or other dealings in this Software without prior written
    authorization.
c@118: */ cannam@111: c@94: #ifndef PIPER_PROCESS_QT_TRANSPORT_H c@94: #define PIPER_PROCESS_QT_TRANSPORT_H c@94: c@94: #include "SynchronousTransport.h" c@94: c@94: #include c@94: #include c@100: #include c@148: #include c@94: c@94: #include c@94: c@124: //#define DEBUG_TRANSPORT 1 c@124: c@97: namespace piper_vamp { c@97: namespace client { c@94: c@100: /** c@100: * A SynchronousTransport implementation that spawns a sub-process c@100: * using Qt's QProcess abstraction and talks to it via stdin/stdout c@100: * channels. Calls are completely serialized; the protocol only c@100: * supports one call in process at a time, and therefore the transport c@125: * only allows one at a time. c@125: * c@125: * This class is thread-safe, but in practice you can only use it from c@125: * within a single thread, because the underlying QProcess does not c@125: * support switching threads. c@100: */ c@94: class ProcessQtTransport : public SynchronousTransport c@94: { c@94: public: c@134: ProcessQtTransport(std::string processName, c@134: std::string formatArg, c@134: LogCallback *logger) : // logger may be nullptr for cerr c@134: m_logger(logger), c@126: m_completenessChecker(0), c@126: m_crashed(false) { c@119: c@94: m_process = new QProcess(); c@94: m_process->setReadChannel(QProcess::StandardOutput); c@94: m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel); cannam@114: c@119: m_process->start(QString::fromStdString(processName), c@124: { QString::fromStdString(formatArg) }); cannam@114: c@94: if (!m_process->waitForStarted()) { cannam@113: if (m_process->state() == QProcess::NotRunning) { cannam@113: QProcess::ProcessError err = m_process->error(); cannam@113: if (err == QProcess::FailedToStart) { c@134: log("Unable to start server process " + processName); cannam@113: } else if (err == QProcess::Crashed) { c@134: log("Server process " + processName + " crashed on startup"); cannam@113: } else { c@134: QString e = QString("%1").arg(err); c@134: log("Server 
process " + processName + c@134: " failed on startup with error code " + e.toStdString()); cannam@113: } cannam@113: delete m_process; cannam@113: m_process = nullptr; cannam@111: } c@94: } c@134: c@134: if (m_process) { c@134: log("Server process " + processName + " started OK"); c@134: } c@94: } c@94: c@94: ~ProcessQtTransport() { c@94: if (m_process) { c@94: if (m_process->state() != QProcess::NotRunning) { c@118: m_process->closeWriteChannel(); c@94: m_process->waitForFinished(200); c@94: m_process->close(); c@94: m_process->waitForFinished(); c@134: log("Server process exited normally"); c@94: } c@94: delete m_process; c@94: } c@94: } c@94: c@94: void cannam@111: setCompletenessChecker(MessageCompletenessChecker *checker) override { c@94: m_completenessChecker = checker; c@94: } c@94: c@94: bool c@94: isOK() const override { c@126: return (m_process != nullptr) && !m_crashed; c@94: } c@94: c@94: std::vector c@134: call(const char *ptr, size_t size, std::string type, bool slow) override { c@94: c@118: QMutexLocker locker(&m_mutex); c@118: c@94: if (!m_completenessChecker) { c@134: log("call: No completeness checker set on transport"); c@94: throw std::logic_error("No completeness checker set on transport"); c@94: } c@126: if (!isOK()) { c@134: log("call: Transport is not OK"); c@126: throw std::logic_error("Transport is not OK"); c@126: } c@94: c@124: #ifdef DEBUG_TRANSPORT c@115: std::cerr << "writing " << size << " bytes to server" << std::endl; c@124: #endif c@94: m_process->write(ptr, size); c@126: m_process->waitForBytesWritten(); c@94: c@94: std::vector buffer; c@94: bool complete = false; c@148: c@148: QTime t; c@148: t.start(); c@148: c@148: // We don't like to timeout at all while waiting for a c@148: // response -- we'd like to wait as long as the server c@148: // continues running. c@148: // c@148: int beforeResponseTimeout = 0; // ms, 0 = no timeout c@148: c@148: // But if the call is marked as fast (i.e. 
just retrieving c@148: // info rather than calculating something) we will time out c@148: // after a bit. c@148: // c@148: if (!slow) beforeResponseTimeout = 10000; // ms, 0 = no timeout c@148: c@148: // But we do timeout if the server sends part of a reply and c@148: // then gets stuck. It's reasonable to assume that a server c@148: // that's already prepared its message and started sending has c@148: // finished doing any real work. In each case the timeout is c@148: // measured since data was last read. c@148: // c@148: int duringResponseTimeout = 5000; // ms, 0 = no timeout c@94: c@94: while (!complete) { c@94: c@148: bool responseStarted = !buffer.empty(); // already have something c@148: int ms = t.elapsed(); // time since start or since last read c@148: c@94: qint64 byteCount = m_process->bytesAvailable(); c@94: c@118: if (!byteCount) { c@148: c@148: if (responseStarted) { c@148: if (duringResponseTimeout > 0 && ms > duringResponseTimeout) { c@148: log("Server timed out during response"); c@148: throw std::runtime_error("Request timed out"); c@148: } c@148: } else { c@148: if (beforeResponseTimeout > 0 && ms > beforeResponseTimeout) { c@148: log("Server timed out before response"); c@148: throw std::runtime_error("Request timed out"); c@148: } c@148: } c@148: c@124: #ifdef DEBUG_TRANSPORT c@145: std::cerr << "waiting for data from server (slow = " << slow << ")..." << std::endl; c@124: #endif c@126: if (slow) { c@126: m_process->waitForReadyRead(1000); c@126: } else { c@126: #ifdef _WIN32 c@126: // This is most unsatisfactory -- if we give a non-zero c@126: // arg here, then we end up sleeping way beyond the arrival c@126: // of the data to read -- can end up using less than 10% c@126: // CPU during processing which is crazy. So for Windows c@126: // only, we busy-wait during "fast" calls. It works out c@126: // much faster in the end. Could do with a simpler native c@126: // blocking API really. 
c@126: m_process->waitForReadyRead(0); c@126: #else c@126: m_process->waitForReadyRead(100); c@126: #endif c@126: } c@126: if (m_process->state() == QProcess::NotRunning && c@126: // don't give up until we've read all that's been buffered! c@126: !m_process->bytesAvailable()) { c@115: QProcess::ProcessError err = m_process->error(); c@115: if (err == QProcess::Crashed) { c@134: log("Server crashed during " + type + " request"); c@115: } else { c@134: QString e = QString("%1").arg(err); c@134: log("Server failed during " + type c@134: + " request with error code " + e.toStdString()); c@115: } c@126: m_crashed = true; c@121: throw ServerCrashed(); c@94: } c@94: } else { c@94: size_t formerSize = buffer.size(); c@94: buffer.resize(formerSize + byteCount); c@94: m_process->read(buffer.data() + formerSize, byteCount); c@146: switch (m_completenessChecker->check(buffer)) { c@146: case MessageCompletenessChecker::Complete: complete = true; break; c@146: case MessageCompletenessChecker::Incomplete: break; c@146: case MessageCompletenessChecker::Invalid: c@146: throw std::runtime_error c@146: ("Invalid message received: corrupt stream from server?"); c@146: } c@148: (void)t.restart(); // reset timeout when we read anything c@94: } c@94: } c@94: c@94: return buffer; c@94: } c@94: c@94: private: c@134: LogCallback *m_logger; c@94: MessageCompletenessChecker *m_completenessChecker; //!!! I don't own this (currently) c@94: QProcess *m_process; // I own this c@100: QMutex m_mutex; c@126: bool m_crashed; c@134: c@134: void log(std::string message) const { c@134: if (m_logger) m_logger->log(message); c@134: else std::cerr << message << std::endl; c@134: } c@94: }; c@94: c@94: } c@94: } c@94: c@94: #endif