cannam@148: // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors cannam@148: // Licensed under the MIT License: cannam@148: // cannam@148: // Permission is hereby granted, free of charge, to any person obtaining a copy cannam@148: // of this software and associated documentation files (the "Software"), to deal cannam@148: // in the Software without restriction, including without limitation the rights cannam@148: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell cannam@148: // copies of the Software, and to permit persons to whom the Software is cannam@148: // furnished to do so, subject to the following conditions: cannam@148: // cannam@148: // The above copyright notice and this permission notice shall be included in cannam@148: // all copies or substantial portions of the Software. cannam@148: // cannam@148: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR cannam@148: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, cannam@148: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE cannam@148: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER cannam@148: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, cannam@148: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN cannam@148: // THE SOFTWARE. cannam@148: cannam@148: #ifndef KJ_ASYNC_UNIX_H_ cannam@148: #define KJ_ASYNC_UNIX_H_ cannam@148: cannam@148: #if _WIN32 cannam@148: #error "This file is Unix-specific. On Windows, include async-win32.h instead." cannam@148: #endif cannam@148: cannam@148: #if defined(__GNUC__) && !KJ_HEADER_WARNINGS cannam@148: #pragma GCC system_header cannam@148: #endif cannam@148: cannam@148: #include "async.h" cannam@148: #include "time.h" cannam@148: #include "vector.h" cannam@148: #include "io.h" cannam@148: #include cannam@148: cannam@148: #if __linux__ && !__BIONIC__ && !defined(KJ_USE_EPOLL) cannam@148: // Default to epoll on Linux, except on Bionic (Android) which doesn't have signalfd.h. cannam@148: #define KJ_USE_EPOLL 1 cannam@148: #endif cannam@148: cannam@148: namespace kj { cannam@148: cannam@148: class UnixEventPort: public EventPort { cannam@148: // An EventPort implementation which can wait for events on file descriptors as well as signals. cannam@148: // This API only makes sense on Unix. cannam@148: // cannam@148: // The implementation uses `poll()` or possibly a platform-specific API (e.g. epoll, kqueue). cannam@148: // To also wait on signals without race conditions, the implementation may block signals until cannam@148: // just before `poll()` while using a signal handler which `siglongjmp()`s back to just before cannam@148: // the signal was unblocked, or it may use a nicer platform-specific API like signalfd. cannam@148: // cannam@148: // The implementation reserves a signal for internal use. By default, it uses SIGUSR1. If you cannam@148: // need to use SIGUSR1 for something else, you must offer a different signal by calling cannam@148: // setReservedSignal() at startup. cannam@148: // cannam@148: // WARNING: A UnixEventPort can only be used in the thread and process that created it. In cannam@148: // particular, note that after a fork(), a UnixEventPort created in the parent process will cannam@148: // not work correctly in the child, even if the parent ceases to use its copy. In particular cannam@148: // note that this means that server processes which daemonize themselves at startup must wait cannam@148: // until after daemonization to create a UnixEventPort. cannam@148: cannam@148: public: cannam@148: UnixEventPort(); cannam@148: ~UnixEventPort() noexcept(false); cannam@148: cannam@148: class FdObserver; cannam@148: // Class that watches an fd for readability or writability. See definition below. cannam@148: cannam@148: Promise onSignal(int signum); cannam@148: // When the given signal is delivered to this thread, return the corresponding siginfo_t. cannam@148: // The signal must have been captured using `captureSignal()`. cannam@148: // cannam@148: // If `onSignal()` has not been called, the signal will remain blocked in this thread. cannam@148: // Therefore, a signal which arrives before `onSignal()` was called will not be "missed" -- the cannam@148: // next call to 'onSignal()' will receive it. Also, you can control which thread receives a cannam@148: // process-wide signal by only calling `onSignal()` on that thread's event loop. cannam@148: // cannam@148: // The result of waiting on the same signal twice at once is undefined. cannam@148: cannam@148: static void captureSignal(int signum); cannam@148: // Arranges for the given signal to be captured and handled via UnixEventPort, so that you may cannam@148: // then pass it to `onSignal()`. This method is static because it registers a signal handler cannam@148: // which applies process-wide. If any other threads exist in the process when `captureSignal()` cannam@148: // is called, you *must* set the signal mask in those threads to block this signal, otherwise cannam@148: // terrible things will happen if the signal happens to be delivered to those threads. If at cannam@148: // all possible, call `captureSignal()` *before* creating threads, so that threads you create in cannam@148: // the future will inherit the proper signal mask. cannam@148: // cannam@148: // To un-capture a signal, simply install a different signal handler and then un-block it from cannam@148: // the signal mask. cannam@148: cannam@148: static void setReservedSignal(int signum); cannam@148: // Sets the signal number which `UnixEventPort` reserves for internal use. If your application cannam@148: // needs to use SIGUSR1, call this at startup (before any calls to `captureSignal()` and before cannam@148: // constructing an `UnixEventPort`) to offer a different signal. cannam@148: cannam@148: Timer& getTimer() { return timerImpl; } cannam@148: cannam@148: // implements EventPort ------------------------------------------------------ cannam@148: bool wait() override; cannam@148: bool poll() override; cannam@148: void wake() const override; cannam@148: cannam@148: private: cannam@148: struct TimerSet; // Defined in source file to avoid STL include. cannam@148: class TimerPromiseAdapter; cannam@148: class SignalPromiseAdapter; cannam@148: cannam@148: TimerImpl timerImpl; cannam@148: cannam@148: SignalPromiseAdapter* signalHead = nullptr; cannam@148: SignalPromiseAdapter** signalTail = &signalHead; cannam@148: cannam@148: TimePoint readClock(); cannam@148: void gotSignal(const siginfo_t& siginfo); cannam@148: cannam@148: friend class TimerPromiseAdapter; cannam@148: cannam@148: #if KJ_USE_EPOLL cannam@148: AutoCloseFd epollFd; cannam@148: AutoCloseFd signalFd; cannam@148: AutoCloseFd eventFd; // Used for cross-thread wakeups. cannam@148: cannam@148: sigset_t signalFdSigset; cannam@148: // Signal mask as currently set on the signalFd. Tracked so we can detect whether or not it cannam@148: // needs updating. cannam@148: cannam@148: bool doEpollWait(int timeout); cannam@148: cannam@148: #else cannam@148: class PollContext; cannam@148: cannam@148: FdObserver* observersHead = nullptr; cannam@148: FdObserver** observersTail = &observersHead; cannam@148: cannam@148: unsigned long long threadId; // actually pthread_t cannam@148: #endif cannam@148: }; cannam@148: cannam@148: class UnixEventPort::FdObserver { cannam@148: // Object which watches a file descriptor to determine when it is readable or writable. cannam@148: // cannam@148: // For listen sockets, "readable" means that there is a connection to accept(). For everything cannam@148: // else, it means that read() (or recv()) will return data. cannam@148: // cannam@148: // The presence of out-of-band data should NOT fire this event. However, the event may cannam@148: // occasionally fire spuriously (when there is actually no data to read), and one thing that can cannam@148: // cause such spurious events is the arrival of OOB data on certain platforms whose event cannam@148: // interfaces fail to distinguish between regular and OOB data (e.g. Mac OSX). cannam@148: // cannam@148: // WARNING: The exact behavior of this class differs across systems, since event interfaces cannam@148: // vary wildly. Be sure to read the documentation carefully and avoid depending on unspecified cannam@148: // behavior. If at all possible, use the higher-level AsyncInputStream interface instead. cannam@148: cannam@148: public: cannam@148: enum Flags { cannam@148: OBSERVE_READ = 1, cannam@148: OBSERVE_WRITE = 2, cannam@148: OBSERVE_URGENT = 4, cannam@148: OBSERVE_READ_WRITE = OBSERVE_READ | OBSERVE_WRITE cannam@148: }; cannam@148: cannam@148: FdObserver(UnixEventPort& eventPort, int fd, uint flags); cannam@148: // Begin watching the given file descriptor for readability. Only one ReadObserver may exist cannam@148: // for a given file descriptor at a time. cannam@148: cannam@148: ~FdObserver() noexcept(false); cannam@148: cannam@148: KJ_DISALLOW_COPY(FdObserver); cannam@148: cannam@148: Promise whenBecomesReadable(); cannam@148: // Resolves the next time the file descriptor transitions from having no data to read to having cannam@148: // some data to read. cannam@148: // cannam@148: // KJ uses "edge-triggered" event notification whenever possible. As a result, it is an error cannam@148: // to call this method when there is already data in the read buffer which has been there since cannam@148: // prior to the last turn of the event loop or prior to creation FdWatcher. In this case, it is cannam@148: // unspecified whether the promise will ever resolve -- it depends on the underlying event cannam@148: // mechanism being used. cannam@148: // cannam@148: // In order to avoid this problem, make sure that you only call `whenBecomesReadable()` cannam@148: // only at times when you know the buffer is empty. You know this for sure when one of the cannam@148: // following happens: cannam@148: // * read() or recv() fails with EAGAIN or EWOULDBLOCK. (You MUST have non-blocking mode cannam@148: // enabled on the fd!) cannam@148: // * The file descriptor is a regular byte-oriented object (like a socket or pipe), cannam@148: // read() or recv() returns fewer than the number of bytes requested, and `atEndHint()` cannam@148: // returns false. This can only happen if the buffer is empty but EOF is not reached. (Note, cannam@148: // though, that for record-oriented file descriptors like Linux's inotify interface, this cannam@148: // rule does not hold, because it could simply be that the next record did not fit into the cannam@148: // space available.) cannam@148: // cannam@148: // It is an error to call `whenBecomesReadable()` again when the promise returned previously cannam@148: // has not yet resolved. If you do this, the previous promise may throw an exception. cannam@148: cannam@148: inline Maybe atEndHint() { return atEnd; } cannam@148: // Returns true if the event system has indicated that EOF has been received. There may still cannam@148: // be data in the read buffer, but once that is gone, there's nothing left. cannam@148: // cannam@148: // Returns false if the event system has indicated that EOF had NOT been received as of the cannam@148: // last turn of the event loop. cannam@148: // cannam@148: // Returns nullptr if the event system does not know whether EOF has been reached. In this cannam@148: // case, the only way to know for sure is to call read() or recv() and check if it returns cannam@148: // zero. cannam@148: // cannam@148: // This hint may be useful as an optimization to avoid an unnecessary system call. cannam@148: cannam@148: Promise whenBecomesWritable(); cannam@148: // Resolves the next time the file descriptor transitions from having no space available in the cannam@148: // write buffer to having some space available. cannam@148: // cannam@148: // KJ uses "edge-triggered" event notification whenever possible. As a result, it is an error cannam@148: // to call this method when there is already space in the write buffer which has been there cannam@148: // since prior to the last turn of the event loop or prior to creation FdWatcher. In this case, cannam@148: // it is unspecified whether the promise will ever resolve -- it depends on the underlying cannam@148: // event mechanism being used. cannam@148: // cannam@148: // In order to avoid this problem, make sure that you only call `whenBecomesWritable()` cannam@148: // only at times when you know the buffer is full. You know this for sure when one of the cannam@148: // following happens: cannam@148: // * write() or send() fails with EAGAIN or EWOULDBLOCK. (You MUST have non-blocking mode cannam@148: // enabled on the fd!) cannam@148: // * write() or send() succeeds but accepts fewer than the number of bytes provided. This can cannam@148: // only happen if the buffer is full. cannam@148: // cannam@148: // It is an error to call `whenBecomesWritable()` again when the promise returned previously cannam@148: // has not yet resolved. If you do this, the previous promise may throw an exception. cannam@148: cannam@148: Promise whenUrgentDataAvailable(); cannam@148: // Resolves the next time the file descriptor's read buffer contains "urgent" data. cannam@148: // cannam@148: // The conditions for availability of urgent data are specific to the file descriptor's cannam@148: // underlying implementation. cannam@148: // cannam@148: // It is an error to call `whenUrgentDataAvailable()` again when the promise returned previously cannam@148: // has not yet resolved. If you do this, the previous promise may throw an exception. cannam@148: // cannam@148: // WARNING: This has some known weird behavior on macOS. See cannam@148: // https://github.com/sandstorm-io/capnproto/issues/374. cannam@148: cannam@148: private: cannam@148: UnixEventPort& eventPort; cannam@148: int fd; cannam@148: uint flags; cannam@148: cannam@148: kj::Maybe>> readFulfiller; cannam@148: kj::Maybe>> writeFulfiller; cannam@148: kj::Maybe>> urgentFulfiller; cannam@148: // Replaced each time `whenBecomesReadable()` or `whenBecomesWritable()` is called. Reverted to cannam@148: // null every time an event is fired. cannam@148: cannam@148: Maybe atEnd; cannam@148: cannam@148: void fire(short events); cannam@148: cannam@148: #if !KJ_USE_EPOLL cannam@148: FdObserver* next; cannam@148: FdObserver** prev; cannam@148: // Linked list of observers which currently have a non-null readFulfiller or writeFulfiller. cannam@148: // If `prev` is null then the observer is not currently in the list. cannam@148: cannam@148: short getEventMask(); cannam@148: #endif cannam@148: cannam@148: friend class UnixEventPort; cannam@148: }; cannam@148: cannam@148: } // namespace kj cannam@148: cannam@148: #endif // KJ_ASYNC_UNIX_H_