Chris@47: // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors Chris@47: // Licensed under the MIT License: Chris@47: // Chris@47: // Permission is hereby granted, free of charge, to any person obtaining a copy Chris@47: // of this software and associated documentation files (the "Software"), to deal Chris@47: // in the Software without restriction, including without limitation the rights Chris@47: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell Chris@47: // copies of the Software, and to permit persons to whom the Software is Chris@47: // furnished to do so, subject to the following conditions: Chris@47: // Chris@47: // The above copyright notice and this permission notice shall be included in Chris@47: // all copies or substantial portions of the Software. Chris@47: // Chris@47: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR Chris@47: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, Chris@47: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE Chris@47: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER Chris@47: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, Chris@47: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN Chris@47: // THE SOFTWARE. Chris@47: Chris@47: #ifndef CAPNP_RPC_TWOPARTY_H_ Chris@47: #define CAPNP_RPC_TWOPARTY_H_ Chris@47: Chris@47: #if defined(__GNUC__) && !defined(CAPNP_HEADER_WARNINGS) Chris@47: #pragma GCC system_header Chris@47: #endif Chris@47: Chris@47: #include "rpc.h" Chris@47: #include "message.h" Chris@47: #include Chris@47: #include Chris@47: Chris@47: namespace capnp { Chris@47: Chris@47: namespace rpc { Chris@47: namespace twoparty { Chris@47: typedef VatId SturdyRefHostId; // For backwards-compatibility with version 0.4. Chris@47: } Chris@47: } Chris@47: Chris@47: typedef VatNetwork Chris@47: TwoPartyVatNetworkBase; Chris@47: Chris@47: class TwoPartyVatNetwork: public TwoPartyVatNetworkBase, Chris@47: private TwoPartyVatNetworkBase::Connection { Chris@47: // A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte Chris@47: // stream. This is used to implement the common case of a client/server network. Chris@47: // Chris@47: // See `ez-rpc.h` for a simple interface for setting up two-party clients and servers. Chris@47: // Use `TwoPartyVatNetwork` only if you need the advanced features. Chris@47: Chris@47: public: Chris@47: TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side, Chris@47: ReaderOptions receiveOptions = ReaderOptions()); Chris@47: KJ_DISALLOW_COPY(TwoPartyVatNetwork); Chris@47: Chris@47: kj::Promise onDisconnect() { return disconnectPromise.addBranch(); } Chris@47: // Returns a promise that resolves when the peer disconnects. Chris@47: Chris@47: rpc::twoparty::Side getSide() { return side; } Chris@47: Chris@47: // implements VatNetwork ----------------------------------------------------- Chris@47: Chris@47: kj::Maybe> connect( Chris@47: rpc::twoparty::VatId::Reader ref) override; Chris@47: kj::Promise> accept() override; Chris@47: Chris@47: private: Chris@47: class OutgoingMessageImpl; Chris@47: class IncomingMessageImpl; Chris@47: Chris@47: kj::AsyncIoStream& stream; Chris@47: rpc::twoparty::Side side; Chris@47: MallocMessageBuilder peerVatId; Chris@47: ReaderOptions receiveOptions; Chris@47: bool accepted = false; Chris@47: Chris@47: kj::Maybe> previousWrite; Chris@47: // Resolves when the previous write completes. This effectively serves as the write queue. Chris@47: // Becomes null when shutdown() is called. Chris@47: Chris@47: kj::Own>> acceptFulfiller; Chris@47: // Fulfiller for the promise returned by acceptConnectionAsRefHost() on the client side, or the Chris@47: // second call on the server side. Never fulfilled, because there is only one connection. Chris@47: Chris@47: kj::ForkedPromise disconnectPromise = nullptr; Chris@47: Chris@47: class FulfillerDisposer: public kj::Disposer { Chris@47: // Hack: TwoPartyVatNetwork is both a VatNetwork and a VatNetwork::Connection. When the RPC Chris@47: // system detects (or initiates) a disconnection, it drops its reference to the Connection. Chris@47: // When all references have been dropped, then we want disconnectPromise to be fulfilled. Chris@47: // So we hand out Owns with this disposer attached, so that we can detect when Chris@47: // they are dropped. Chris@47: Chris@47: public: Chris@47: mutable kj::Own> fulfiller; Chris@47: mutable uint refcount = 0; Chris@47: Chris@47: void disposeImpl(void* pointer) const override; Chris@47: }; Chris@47: FulfillerDisposer disconnectFulfiller; Chris@47: Chris@47: kj::Own asConnection(); Chris@47: // Returns a pointer to this with the disposer set to disconnectFulfiller. Chris@47: Chris@47: // implements Connection ----------------------------------------------------- Chris@47: Chris@47: rpc::twoparty::VatId::Reader getPeerVatId() override; Chris@47: kj::Own newOutgoingMessage(uint firstSegmentWordSize) override; Chris@47: kj::Promise>> receiveIncomingMessage() override; Chris@47: kj::Promise shutdown() override; Chris@47: }; Chris@47: Chris@47: class TwoPartyServer: private kj::TaskSet::ErrorHandler { Chris@47: // Convenience class which implements a simple server which accepts connections on a listener Chris@47: // socket and serices them as two-party connections. Chris@47: Chris@47: public: Chris@47: explicit TwoPartyServer(Capability::Client bootstrapInterface); Chris@47: Chris@47: void accept(kj::Own&& connection); Chris@47: // Accepts the connection for servicing. Chris@47: Chris@47: kj::Promise listen(kj::ConnectionReceiver& listener); Chris@47: // Listens for connections on the given listener. The returned promise never resolves unless an Chris@47: // exception is thrown while trying to accept. You may discard the returned promise to cancel Chris@47: // listening. Chris@47: Chris@47: private: Chris@47: Capability::Client bootstrapInterface; Chris@47: kj::TaskSet tasks; Chris@47: Chris@47: struct AcceptedConnection; Chris@47: Chris@47: void taskFailed(kj::Exception&& exception) override; Chris@47: }; Chris@47: Chris@47: class TwoPartyClient { Chris@47: // Convenience class which implements a simple client. Chris@47: Chris@47: public: Chris@47: explicit TwoPartyClient(kj::AsyncIoStream& connection); Chris@47: TwoPartyClient(kj::AsyncIoStream& connection, Capability::Client bootstrapInterface, Chris@47: rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT); Chris@47: Chris@47: Capability::Client bootstrap(); Chris@47: // Get the server's bootstrap interface. Chris@47: Chris@47: inline kj::Promise onDisconnect() { return network.onDisconnect(); } Chris@47: Chris@47: private: Chris@47: TwoPartyVatNetwork network; Chris@47: RpcSystem rpcSystem; Chris@47: }; Chris@47: Chris@47: } // namespace capnp Chris@47: Chris@47: #endif // CAPNP_RPC_TWOPARTY_H_