cannam@147: // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors cannam@147: // Licensed under the MIT License: cannam@147: // cannam@147: // Permission is hereby granted, free of charge, to any person obtaining a copy cannam@147: // of this software and associated documentation files (the "Software"), to deal cannam@147: // in the Software without restriction, including without limitation the rights cannam@147: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell cannam@147: // copies of the Software, and to permit persons to whom the Software is cannam@147: // furnished to do so, subject to the following conditions: cannam@147: // cannam@147: // The above copyright notice and this permission notice shall be included in cannam@147: // all copies or substantial portions of the Software. cannam@147: // cannam@147: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR cannam@147: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, cannam@147: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE cannam@147: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER cannam@147: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, cannam@147: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN cannam@147: // THE SOFTWARE. cannam@147: cannam@147: #ifndef CAPNP_RPC_TWOPARTY_H_ cannam@147: #define CAPNP_RPC_TWOPARTY_H_ cannam@147: cannam@147: #if defined(__GNUC__) && !defined(CAPNP_HEADER_WARNINGS) cannam@147: #pragma GCC system_header cannam@147: #endif cannam@147: cannam@147: #include "rpc.h" cannam@147: #include "message.h" cannam@147: #include cannam@147: #include cannam@147: cannam@147: namespace capnp { cannam@147: cannam@147: namespace rpc { cannam@147: namespace twoparty { cannam@147: typedef VatId SturdyRefHostId; // For backwards-compatibility with version 0.4. cannam@147: } cannam@147: } cannam@147: cannam@147: typedef VatNetwork cannam@147: TwoPartyVatNetworkBase; cannam@147: cannam@147: class TwoPartyVatNetwork: public TwoPartyVatNetworkBase, cannam@147: private TwoPartyVatNetworkBase::Connection { cannam@147: // A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte cannam@147: // stream. This is used to implement the common case of a client/server network. cannam@147: // cannam@147: // See `ez-rpc.h` for a simple interface for setting up two-party clients and servers. cannam@147: // Use `TwoPartyVatNetwork` only if you need the advanced features. cannam@147: cannam@147: public: cannam@147: TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side, cannam@147: ReaderOptions receiveOptions = ReaderOptions()); cannam@147: KJ_DISALLOW_COPY(TwoPartyVatNetwork); cannam@147: cannam@147: kj::Promise onDisconnect() { return disconnectPromise.addBranch(); } cannam@147: // Returns a promise that resolves when the peer disconnects. cannam@147: cannam@147: rpc::twoparty::Side getSide() { return side; } cannam@147: cannam@147: // implements VatNetwork ----------------------------------------------------- cannam@147: cannam@147: kj::Maybe> connect( cannam@147: rpc::twoparty::VatId::Reader ref) override; cannam@147: kj::Promise> accept() override; cannam@147: cannam@147: private: cannam@147: class OutgoingMessageImpl; cannam@147: class IncomingMessageImpl; cannam@147: cannam@147: kj::AsyncIoStream& stream; cannam@147: rpc::twoparty::Side side; cannam@147: MallocMessageBuilder peerVatId; cannam@147: ReaderOptions receiveOptions; cannam@147: bool accepted = false; cannam@147: cannam@147: kj::Maybe> previousWrite; cannam@147: // Resolves when the previous write completes. This effectively serves as the write queue. cannam@147: // Becomes null when shutdown() is called. cannam@147: cannam@147: kj::Own>> acceptFulfiller; cannam@147: // Fulfiller for the promise returned by acceptConnectionAsRefHost() on the client side, or the cannam@147: // second call on the server side. Never fulfilled, because there is only one connection. cannam@147: cannam@147: kj::ForkedPromise disconnectPromise = nullptr; cannam@147: cannam@147: class FulfillerDisposer: public kj::Disposer { cannam@147: // Hack: TwoPartyVatNetwork is both a VatNetwork and a VatNetwork::Connection. When the RPC cannam@147: // system detects (or initiates) a disconnection, it drops its reference to the Connection. cannam@147: // When all references have been dropped, then we want disconnectPromise to be fulfilled. cannam@147: // So we hand out Owns with this disposer attached, so that we can detect when cannam@147: // they are dropped. cannam@147: cannam@147: public: cannam@147: mutable kj::Own> fulfiller; cannam@147: mutable uint refcount = 0; cannam@147: cannam@147: void disposeImpl(void* pointer) const override; cannam@147: }; cannam@147: FulfillerDisposer disconnectFulfiller; cannam@147: cannam@147: kj::Own asConnection(); cannam@147: // Returns a pointer to this with the disposer set to disconnectFulfiller. cannam@147: cannam@147: // implements Connection ----------------------------------------------------- cannam@147: cannam@147: rpc::twoparty::VatId::Reader getPeerVatId() override; cannam@147: kj::Own newOutgoingMessage(uint firstSegmentWordSize) override; cannam@147: kj::Promise>> receiveIncomingMessage() override; cannam@147: kj::Promise shutdown() override; cannam@147: }; cannam@147: cannam@147: class TwoPartyServer: private kj::TaskSet::ErrorHandler { cannam@147: // Convenience class which implements a simple server which accepts connections on a listener cannam@147: // socket and serices them as two-party connections. cannam@147: cannam@147: public: cannam@147: explicit TwoPartyServer(Capability::Client bootstrapInterface); cannam@147: cannam@147: void accept(kj::Own&& connection); cannam@147: // Accepts the connection for servicing. cannam@147: cannam@147: kj::Promise listen(kj::ConnectionReceiver& listener); cannam@147: // Listens for connections on the given listener. The returned promise never resolves unless an cannam@147: // exception is thrown while trying to accept. You may discard the returned promise to cancel cannam@147: // listening. cannam@147: cannam@147: private: cannam@147: Capability::Client bootstrapInterface; cannam@147: kj::TaskSet tasks; cannam@147: cannam@147: struct AcceptedConnection; cannam@147: cannam@147: void taskFailed(kj::Exception&& exception) override; cannam@147: }; cannam@147: cannam@147: class TwoPartyClient { cannam@147: // Convenience class which implements a simple client. cannam@147: cannam@147: public: cannam@147: explicit TwoPartyClient(kj::AsyncIoStream& connection); cannam@147: TwoPartyClient(kj::AsyncIoStream& connection, Capability::Client bootstrapInterface, cannam@147: rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT); cannam@147: cannam@147: Capability::Client bootstrap(); cannam@147: // Get the server's bootstrap interface. cannam@147: cannam@147: inline kj::Promise onDisconnect() { return network.onDisconnect(); } cannam@147: cannam@147: private: cannam@147: TwoPartyVatNetwork network; cannam@147: RpcSystem rpcSystem; cannam@147: }; cannam@147: cannam@147: } // namespace capnp cannam@147: cannam@147: #endif // CAPNP_RPC_TWOPARTY_H_