cannam@150: /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ cannam@150: /* cannam@150: Piper C++ cannam@150: cannam@150: An API for audio analysis and feature extraction plugins. cannam@150: cannam@150: Centre for Digital Music, Queen Mary, University of London. cannam@150: Copyright 2006-2016 Chris Cannam and QMUL. cannam@150: cannam@150: Permission is hereby granted, free of charge, to any person cannam@150: obtaining a copy of this software and associated documentation cannam@150: files (the "Software"), to deal in the Software without cannam@150: restriction, including without limitation the rights to use, copy, cannam@150: modify, merge, publish, distribute, sublicense, and/or sell copies cannam@150: of the Software, and to permit persons to whom the Software is cannam@150: furnished to do so, subject to the following conditions: cannam@150: cannam@150: The above copyright notice and this permission notice shall be cannam@150: included in all copies or substantial portions of the Software. cannam@150: cannam@150: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, cannam@150: EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF cannam@150: MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND cannam@150: NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR cannam@150: ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF cannam@150: CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION cannam@150: WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. cannam@150: cannam@150: Except as contained in this notice, the names of the Centre for cannam@150: Digital Music; Queen Mary, University of London; and Chris Cannam cannam@150: shall not be used in advertising or otherwise to promote the sale, cannam@150: use or other dealings in this Software without prior written cannam@150: authorization. cannam@150: */ cannam@150: cannam@150: #ifndef PIPER_PROCESS_QT_TRANSPORT_H cannam@150: #define PIPER_PROCESS_QT_TRANSPORT_H cannam@150: cannam@152: #include "../SynchronousTransport.h" cannam@170: #include "../Exceptions.h" cannam@150: cannam@150: #include cannam@150: #include cannam@150: #include cannam@286: #include cannam@150: cannam@150: #include cannam@150: cannam@150: //#define DEBUG_TRANSPORT 1 cannam@150: cannam@150: namespace piper_vamp { cannam@150: namespace client { cannam@150: cannam@150: /** cannam@150: * A SynchronousTransport implementation that spawns a sub-process cannam@150: * using Qt's QProcess abstraction and talks to it via stdin/stdout cannam@150: * channels. Calls are completely serialized; the protocol only cannam@150: * supports one call in process at a time, and therefore the transport cannam@150: * only allows one at a time. cannam@150: * cannam@150: * This class is thread-safe, but in practice you can only use it from cannam@150: * within a single thread, because the underlying QProcess does not cannam@150: * support switching threads. cannam@150: */ cannam@150: class ProcessQtTransport : public SynchronousTransport cannam@150: { cannam@150: public: cannam@150: ProcessQtTransport(std::string processName, cannam@150: std::string formatArg, cannam@150: LogCallback *logger) : // logger may be nullptr for cerr cannam@150: m_logger(logger), cannam@150: m_completenessChecker(0), cannam@150: m_crashed(false) { cannam@150: cannam@150: m_process = new QProcess(); cannam@150: m_process->setReadChannel(QProcess::StandardOutput); cannam@282: cannam@282: if (m_logger) { cannam@282: m_process->setProcessChannelMode(QProcess::SeparateChannels); cannam@282: } else { cannam@282: m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel); cannam@282: } cannam@150: cannam@150: m_process->start(QString::fromStdString(processName), cannam@150: { QString::fromStdString(formatArg) }); cannam@150: cannam@150: if (!m_process->waitForStarted()) { cannam@150: if (m_process->state() == QProcess::NotRunning) { cannam@150: QProcess::ProcessError err = m_process->error(); cannam@150: if (err == QProcess::FailedToStart) { cannam@150: log("Unable to start server process " + processName); cannam@150: } else if (err == QProcess::Crashed) { cannam@150: log("Server process " + processName + " crashed on startup"); cannam@150: } else { cannam@150: QString e = QString("%1").arg(err); cannam@150: log("Server process " + processName + cannam@150: " failed on startup with error code " + e.toStdString()); cannam@150: } cannam@282: logServerErrors(); cannam@150: delete m_process; cannam@150: m_process = nullptr; cannam@150: } cannam@150: } cannam@150: cannam@150: if (m_process) { cannam@150: log("Server process " + processName + " started OK"); cannam@282: logServerErrors(); cannam@150: } cannam@150: } cannam@150: cannam@150: ~ProcessQtTransport() { cannam@150: if (m_process) { cannam@150: if (m_process->state() != QProcess::NotRunning) { cannam@150: m_process->closeWriteChannel(); cannam@150: m_process->waitForFinished(200); cannam@150: m_process->close(); cannam@150: m_process->waitForFinished(); cannam@150: log("Server process exited normally"); cannam@150: } cannam@282: logServerErrors(); cannam@150: delete m_process; cannam@150: } cannam@150: } cannam@150: cannam@150: void cannam@150: setCompletenessChecker(MessageCompletenessChecker *checker) override { cannam@150: m_completenessChecker = checker; cannam@150: } cannam@150: cannam@150: bool cannam@150: isOK() const override { cannam@150: return (m_process != nullptr) && !m_crashed; cannam@150: } cannam@150: cannam@150: std::vector cannam@150: call(const char *ptr, size_t size, std::string type, bool slow) override { cannam@150: cannam@150: QMutexLocker locker(&m_mutex); cannam@150: cannam@150: if (!m_completenessChecker) { cannam@150: log("call: No completeness checker set on transport"); cannam@150: throw std::logic_error("No completeness checker set on transport"); cannam@150: } cannam@150: if (!isOK()) { cannam@150: log("call: Transport is not OK"); cannam@150: throw std::logic_error("Transport is not OK"); cannam@150: } cannam@150: cannam@150: #ifdef DEBUG_TRANSPORT cannam@150: std::cerr << "writing " << size << " bytes to server" << std::endl; cannam@150: #endif cannam@150: m_process->write(ptr, size); cannam@150: m_process->waitForBytesWritten(); cannam@150: cannam@150: std::vector buffer; cannam@150: bool complete = false; cannam@150: cannam@286: QElapsedTimer t; cannam@150: t.start(); cannam@150: cannam@150: // We don't like to timeout at all while waiting for a cannam@150: // response -- we'd like to wait as long as the server cannam@150: // continues running. cannam@150: // cannam@286: qint64 beforeResponseTimeout = 0; // ms, 0 = no timeout cannam@150: cannam@150: // But if the call is marked as fast (i.e. just retrieving cannam@150: // info rather than calculating something) we will time out cannam@150: // after a bit. cannam@150: // cannam@150: if (!slow) beforeResponseTimeout = 10000; // ms, 0 = no timeout cannam@150: cannam@150: // But we do timeout if the server sends part of a reply and cannam@150: // then gets stuck. It's reasonable to assume that a server cannam@150: // that's already prepared its message and started sending has cannam@150: // finished doing any real work. In each case the timeout is cannam@150: // measured since data was last read. cannam@150: // cannam@286: qint64 duringResponseTimeout = 5000; // ms, 0 = no timeout cannam@150: cannam@150: while (!complete) { cannam@150: cannam@150: bool responseStarted = !buffer.empty(); // already have something cannam@286: qint64 ms = t.elapsed(); // time since start or since last read cannam@150: cannam@150: qint64 byteCount = m_process->bytesAvailable(); cannam@150: cannam@150: if (!byteCount) { cannam@150: cannam@150: if (responseStarted) { cannam@150: if (duringResponseTimeout > 0 && ms > duringResponseTimeout) { cannam@150: log("Server timed out during response"); cannam@282: logServerErrors(); cannam@170: m_crashed = true; cannam@170: throw RequestTimedOut(); cannam@150: } cannam@150: } else { cannam@150: if (beforeResponseTimeout > 0 && ms > beforeResponseTimeout) { cannam@150: log("Server timed out before response"); cannam@282: logServerErrors(); cannam@170: m_crashed = true; cannam@170: throw RequestTimedOut(); cannam@150: } cannam@150: } cannam@150: cannam@150: #ifdef DEBUG_TRANSPORT cannam@150: std::cerr << "waiting for data from server (slow = " << slow << ")..." << std::endl; cannam@150: #endif cannam@150: if (slow) { cannam@150: m_process->waitForReadyRead(1000); cannam@150: } else { cannam@150: #ifdef _WIN32 cannam@150: // This is most unsatisfactory -- if we give a non-zero cannam@150: // arg here, then we end up sleeping way beyond the arrival cannam@150: // of the data to read -- can end up using less than 10% cannam@150: // CPU during processing which is crazy. So for Windows cannam@150: // only, we busy-wait during "fast" calls. It works out cannam@150: // much faster in the end. Could do with a simpler native cannam@150: // blocking API really. cannam@150: m_process->waitForReadyRead(0); cannam@150: #else cannam@150: m_process->waitForReadyRead(100); cannam@150: #endif cannam@150: } cannam@150: if (m_process->state() == QProcess::NotRunning && cannam@150: // don't give up until we've read all that's been buffered! cannam@150: !m_process->bytesAvailable()) { cannam@150: QProcess::ProcessError err = m_process->error(); cannam@150: if (err == QProcess::Crashed) { cannam@150: log("Server crashed during " + type + " request"); cannam@150: } else { cannam@150: QString e = QString("%1").arg(err); cannam@150: log("Server failed during " + type cannam@150: + " request with error code " + e.toStdString()); cannam@150: } cannam@150: m_crashed = true; cannam@150: throw ServerCrashed(); cannam@150: } cannam@150: } else { cannam@150: size_t formerSize = buffer.size(); cannam@150: buffer.resize(formerSize + byteCount); cannam@150: m_process->read(buffer.data() + formerSize, byteCount); cannam@150: switch (m_completenessChecker->check(buffer)) { cannam@150: case MessageCompletenessChecker::Complete: complete = true; break; cannam@150: case MessageCompletenessChecker::Incomplete: break; cannam@170: case MessageCompletenessChecker::Invalid: throw ProtocolError(); cannam@150: } cannam@150: (void)t.restart(); // reset timeout when we read anything cannam@150: } cannam@150: } cannam@150: cannam@282: logServerErrors(); cannam@150: return buffer; cannam@150: } cannam@150: cannam@150: private: cannam@150: LogCallback *m_logger; cannam@150: MessageCompletenessChecker *m_completenessChecker; //!!! I don't own this (currently) cannam@150: QProcess *m_process; // I own this cannam@150: QMutex m_mutex; cannam@150: bool m_crashed; cannam@150: cannam@150: void log(std::string message) const { cannam@150: if (m_logger) m_logger->log(message); cannam@150: else std::cerr << message << std::endl; cannam@150: } cannam@282: cannam@282: void logServerErrors() const { cannam@282: if (!m_logger || !m_process) return; cannam@282: cannam@282: m_process->setReadChannel(QProcess::StandardError); cannam@282: cannam@282: qint64 byteCount = m_process->bytesAvailable(); cannam@282: if (byteCount == 0) { cannam@282: m_process->setReadChannel(QProcess::StandardOutput); cannam@282: return; cannam@282: } cannam@282: cannam@282: QByteArray buffer = m_process->read(byteCount); cannam@285: while (buffer.endsWith('\n') || buffer.endsWith('\r')) { cannam@285: buffer.chop(1); cannam@285: } cannam@284: std::string str(buffer.constData(), buffer.size()); cannam@282: m_logger->log("Piper server stderr output follows:\n" + str); cannam@282: m_logger->log("Piper server stderr output ends"); cannam@282: cannam@282: m_process->setReadChannel(QProcess::StandardOutput); cannam@282: } cannam@150: }; cannam@150: cannam@150: } cannam@150: } cannam@150: cannam@150: #endif