// Copyright (C) 2004-2006 The Trustees of Indiana University.

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine
#ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
#define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

#include <boost/graph/parallel/process_group.hpp>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <vector>

namespace boost { namespace graph { namespace distributed {

/// A unary predicate that always returns "true".
struct always_push
{
  template<typename T> bool operator()(const T&) const { return true; }
};

/** A distributed queue adaptor.
 *
 * Class template @c distributed_queue implements a distributed queue
 * across a process group. The distributed queue is an adaptor over an
 * existing (local) queue, which must model the @ref Buffer
 * concept. Each process stores a distinct copy of the local queue,
 * from which it retrieves or removes elements via the @ref top and
 * @ref pop members.
 *
 * The value type of the local queue must be a model of the @ref
 * GlobalDescriptor concept. The @ref push operation of the
 * distributed queue passes (via a message) the value to its owning
 * processor. Thus, the elements within a particular local queue are
 * guaranteed to have the process owning that local queue as an owner.
 *
 * Synchronization of distributed queues occurs in the @ref empty and
 * @ref size functions, which will only return "empty" values (true or
 * 0, respectively) when the entire distributed queue is empty. If the
 * local queue is empty but the distributed queue is not, the
 * operation will block until either condition changes. When the @ref
 * size function of a nonempty queue returns, it returns the size of
 * the local queue. These semantics were selected so that sequential
 * code that processes elements in the queue via the following idiom
 * can be parallelized via introduction of a distributed queue:
 *
 *   distributed_queue<...> Q;
 *   Q.push(x);
 *   while (!Q.empty()) {
 *     // do something, that may push a value onto Q
 *   }
 *
 * In the parallel version, the initial @ref push operation will place
 * the value @c x onto its owner's queue. All processes will
 * synchronize at the call to empty, and only the process owning @c x
 * will be allowed to execute the loop (@ref Q.empty() returns
 * false). This iteration may in turn push values onto other remote
 * queues, so when that process finishes execution of the loop body
 * and all processes synchronize again in @ref empty, more processes
 * may have nonempty local queues to execute. Once all local queues
 * are empty, @ref Q.empty() returns @c true for all processes and the
 * loop terminates.
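 *
 * A concrete sketch of the parallel form of this idiom is shown below.
 * The mpi_process_group, boost::queue, and OwnerMap types are merely
 * illustrative stand-ins for whatever process group, Buffer model, and
 * owner map an application actually uses:
 *
 *   mpi_process_group pg;
 *   OwnerMap owner = ...;            // maps a value to its owning process
 *   boost::queue<value_type> local;  // the local Buffer
 *   distributed_queue<mpi_process_group, OwnerMap,
 *                     boost::queue<value_type> > Q(pg, owner, local);
 *   if (process_id(pg) == 0) Q.push(x);
 *   while (!Q.empty()) {
 *     value_type v = Q.top(); Q.pop();
 *     // process v, possibly pushing values owned by other processes
 *   }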
 *
 * The distributed queue can receive messages at two different times:
 * during synchronization and when polling @ref empty. Messages are
 * always received during synchronization, to ensure that accurate
 * local queue sizes can be determined. However, whether @ref empty
 * should poll for messages is specified as an option to the
 * constructor. Polling may be desired when the order in which
 * elements in the queue are processed is not important, because it
 * permits fewer synchronization steps and less communication
 * overhead. However, when stricter ordering guarantees are
 * required, polling may be semantically incorrect. By disabling
 * polling, one ensures that parallel execution using the idiom above
 * will not process an element at a later "level" before an earlier
 * "level".
 *
 * The distributed queue nearly models the @ref Buffer
 * concept. However, the @ref push routine does not necessarily
 * increase the result of @c size() by one (although the size of the
 * global queue does increase by one).
 */
template<typename ProcessGroup, typename OwnerMap, typename Buffer,
         typename UnaryPredicate = always_push>
class distributed_queue
{
  typedef distributed_queue self_type;

  enum {
    /** Message indicating a remote push. The message contains a
     * single value x of type value_type that is to be pushed on the
     * receiver's queue.
     */
    msg_push,
    /** Push many elements at once. */
    msg_multipush
  };

 public:
  typedef ProcessGroup                     process_group_type;
  typedef Buffer                           buffer_type;
  typedef typename buffer_type::value_type value_type;
  typedef typename buffer_type::size_type  size_type;

  /** Construct a new distributed queue.
   *
   * Build a new distributed queue that communicates over the given @p
   * process_group, whose local queue is initialized via @p buffer and
   * which may or may not poll for messages.
   */
  explicit
  distributed_queue(const ProcessGroup& process_group,
                    const OwnerMap& owner,
                    const Buffer& buffer,
                    bool polling = false);

  /** Construct a new distributed queue.
   *
   * Build a new distributed queue that communicates over the given @p
   * process_group, whose local queue is initialized via @p buffer and
   * which may or may not poll for messages.
   */
  explicit
  distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
                    const OwnerMap& owner = OwnerMap(),
                    const Buffer& buffer = Buffer(),
                    const UnaryPredicate& pred = UnaryPredicate(),
                    bool polling = false);

  /** Construct a new distributed queue.
   *
   * Build a new distributed queue that communicates over the given @p
   * process_group, whose local queue is default-initialized and which
   * may or may not poll for messages.
   */
  distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
                    const UnaryPredicate& pred, bool polling = false);
  /** Virtual destructor, required because this class declares virtual
   * functions.
   */
  virtual ~distributed_queue() {}

  /** Push an element onto the distributed queue.
   *
   * The element will be sent to its owner process to be added to that
   * process's local queue. If polling is enabled for this queue and
   * the owner process is the current process, the value will be
   * immediately pushed onto the local queue.
   *
   * Complexity: O(1) messages of size O(sizeof(value_type)) will be
   * transmitted.
   */
  void push(const value_type& x);

  /** Pop an element off the local queue.
   *
   * @pre @c !empty()
   */
  void pop() { buffer.pop(); }

  /**
   * Return the element at the top of the local queue.
   *
   * @pre @c !empty()
   */
  value_type& top() { return buffer.top(); }

  /**
   * \overload
   */
  const value_type& top() const { return buffer.top(); }

  /** Determine if the queue is empty.
   *
   * When the local queue is nonempty, returns @c false. If the local
   * queue is empty, synchronizes with all other processes in the
   * process group until either (1) the local queue is nonempty
   * (returns @c false) or (2) the entire distributed queue is empty
   * (returns @c true).
   */
  bool empty() const;

  /** Determine the size of the local queue.
   *
   * The behavior of this routine is equivalent to the behavior of
   * @ref empty, except that when @ref empty returns false this
   * function returns the size of the local queue and when @ref empty
   * returns true this function returns zero.
   */
  size_type size() const;

  // private:
  /** Synchronize the distributed queue and determine if all queues
   * are empty.
   *
   * \returns \c true when all local queues are empty, or \c false if at
   * least one of the local queues is nonempty.
   * Defined as virtual for derived classes like
   * depth_limited_distributed_queue.
   */
  virtual bool do_synchronize() const;

 private:
  // Setup triggers
  void setup_triggers();

  // Message handlers
  void
  handle_push(int source, int tag, const value_type& value,
              trigger_receive_context);

  void
  handle_multipush(int source, int tag,
                   const std::vector<value_type>& values,
                   trigger_receive_context);

  mutable ProcessGroup process_group;
  OwnerMap owner;
  mutable Buffer buffer;
  UnaryPredicate pred;
  bool polling;

  typedef std::vector<value_type> outgoing_buffer_t;
  typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
  shared_ptr<outgoing_buffers_t> outgoing_buffers;
};
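
/* The virtual do_synchronize hook exists so that derived queues (such as
 * the depth_limited_distributed_queue mentioned in its documentation) can
 * layer extra behavior onto each global synchronization step. A minimal,
 * purely illustrative sketch of such a derived class (not part of this
 * header) might look like:
 *
 *   template<typename ProcessGroup, typename OwnerMap, typename Buffer>
 *   class counting_distributed_queue
 *     : public distributed_queue<ProcessGroup, OwnerMap, Buffer>
 *   {
 *     typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> inherited;
 *
 *   public:
 *     counting_distributed_queue(const ProcessGroup& pg, const OwnerMap& own,
 *                                const Buffer& buf, bool polling = false)
 *       : inherited(pg, own, buf, polling), num_syncs(0) { }
 *
 *     // Count synchronization steps, then defer to the base class to
 *     // perform the actual synchronization.
 *     virtual bool do_synchronize() const
 *     {
 *       ++num_syncs;
 *       return inherited::do_synchronize();
 *     }
 *
 *     mutable std::size_t num_syncs;
 *   };
 */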

/// Helper macro containing the normal names for the template
/// parameters to distributed_queue.
#define BOOST_DISTRIBUTED_QUEUE_PARMS                          \
  typename ProcessGroup, typename OwnerMap, typename Buffer,   \
  typename UnaryPredicate

/// Helper macro containing the normal template-id for
/// distributed_queue.
#define BOOST_DISTRIBUTED_QUEUE_TYPE                           \
  distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>

/** Synchronize all processes involved with the given distributed queue.
 *
 * This function will synchronize all of the local queues for a given
 * distributed queue, by ensuring that no additional messages are in
 * transit. It is rarely required by the user, because most
 * synchronization of distributed queues occurs via the @c empty or @c
 * size methods.
 */
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
inline void
synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
{ Q.do_synchronize(); }

/// Construct a new distributed queue.
template<typename ProcessGroup, typename OwnerMap, typename Buffer>
inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
make_distributed_queue(const ProcessGroup& process_group,
                       const OwnerMap& owner,
                       const Buffer& buffer,
                       bool polling = false)
{
  typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
  return result_type(process_group, owner, buffer, polling);
}

} } } // end namespace boost::graph::distributed

#include <boost/graph/distributed/detail/queue.ipp>

#undef BOOST_DISTRIBUTED_QUEUE_TYPE
#undef BOOST_DISTRIBUTED_QUEUE_PARMS

#endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP