Mercurial > hg > vamp-build-and-test
diff DEPENDENCIES/generic/include/boost/graph/distributed/queue.hpp @ 16:2665513ce2d3
Add boost headers
author | Chris Cannam |
---|---|
date | Tue, 05 Aug 2014 11:11:38 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/DEPENDENCIES/generic/include/boost/graph/distributed/queue.hpp Tue Aug 05 11:11:38 2014 +0100 @@ -0,0 +1,278 @@ +// Copyright (C) 2004-2006 The Trustees of Indiana University. + +// Use, modification and distribution is subject to the Boost Software +// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// Authors: Douglas Gregor +// Andrew Lumsdaine +#ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP +#define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP + +#ifndef BOOST_GRAPH_USE_MPI +#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" +#endif + +#include <boost/graph/parallel/process_group.hpp> +#include <boost/optional.hpp> +#include <boost/shared_ptr.hpp> +#include <vector> + +namespace boost { namespace graph { namespace distributed { + +/// A unary predicate that always returns "true". +struct always_push +{ + template<typename T> bool operator()(const T&) const { return true; } +}; + + + +/** A distributed queue adaptor. + * + * Class template @c distributed_queue implements a distributed queue + * across a process group. The distributed queue is an adaptor over an + * existing (local) queue, which must model the @ref Buffer + * concept. Each process stores a distinct copy of the local queue, + * from which it draws or removes elements via the @ref pop and @ref + * top members. + * + * The value type of the local queue must be a model of the @ref + * GlobalDescriptor concept. The @ref push operation of the + * distributed queue passes (via a message) the value to its owning + * processor. Thus, the elements within a particular local queue are + * guaranteed to have the process owning that local queue as an owner. + * + * Synchronization of distributed queues occurs in the @ref empty and + * @ref size functions, which will only return "empty" values (true or + * 0, respectively) when the entire distributed queue is empty. If the + * local queue is empty but the distributed queue is not, the + * operation will block until either condition changes. When the @ref + * size function of a nonempty queue returns, it returns the size of + * the local queue. These semantics were selected so that sequential + * code that processes elements in the queue via the following idiom + * can be parallelized via introduction of a distributed queue: + * + * distributed_queue<...> Q; + * Q.push(x); + * while (!Q.empty()) { + * // do something, that may push a value onto Q + * } + * + * In the parallel version, the initial @ref push operation will place + * the value @c x onto its owner's queue. All processes will + * synchronize at the call to empty, and only the process owning @c x + * will be allowed to execute the loop (@ref Q.empty() returns + * false). This iteration may in turn push values onto other remote + * queues, so when that process finishes execution of the loop body + * and all processes synchronize again in @ref empty, more processes + * may have nonempty local queues to execute. Once all local queues + * are empty, @ref Q.empty() returns @c false for all processes. + * + * The distributed queue can receive messages at two different times: + * during synchronization and when polling @ref empty. Messages are + * always received during synchronization, to ensure that accurate + * local queue sizes can be determines. However, whether @ref empty + * should poll for messages is specified as an option to the + * constructor. Polling may be desired when the order in which + * elements in the queue are processed is not important, because it + * permits fewer synchronization steps and less communication + * overhead. However, when more strict ordering guarantees are + * required, polling may be semantically incorrect. By disabling + * polling, one ensures that parallel execution using the idiom above + * will not process an element at a later "level" before an earlier + * "level". + * + * The distributed queue nearly models the @ref Buffer + * concept. However, the @ref push routine does not necessarily + * increase the result of @c size() by one (although the size of the + * global queue does increase by one). + */ +template<typename ProcessGroup, typename OwnerMap, typename Buffer, + typename UnaryPredicate = always_push> +class distributed_queue +{ + typedef distributed_queue self_type; + + enum { + /** Message indicating a remote push. The message contains a + * single value x of type value_type that is to be pushed on the + * receiver's queue. + */ + msg_push, + /** Push many elements at once. */ + msg_multipush + }; + + public: + typedef ProcessGroup process_group_type; + typedef Buffer buffer_type; + typedef typename buffer_type::value_type value_type; + typedef typename buffer_type::size_type size_type; + + /** Construct a new distributed queue. + * + * Build a new distributed queue that communicates over the given @p + * process_group, whose local queue is initialized via @p buffer and + * which may or may not poll for messages. + */ + explicit + distributed_queue(const ProcessGroup& process_group, + const OwnerMap& owner, + const Buffer& buffer, + bool polling = false); + + /** Construct a new distributed queue. + * + * Build a new distributed queue that communicates over the given @p + * process_group, whose local queue is initialized via @p buffer and + * which may or may not poll for messages. + */ + explicit + distributed_queue(const ProcessGroup& process_group = ProcessGroup(), + const OwnerMap& owner = OwnerMap(), + const Buffer& buffer = Buffer(), + const UnaryPredicate& pred = UnaryPredicate(), + bool polling = false); + + /** Construct a new distributed queue. + * + * Build a new distributed queue that communicates over the given @p + * process_group, whose local queue is default-initalized and which + * may or may not poll for messages. + */ + distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, + const UnaryPredicate& pred, bool polling = false); + + /** Virtual destructor required with virtual functions. + * + */ + virtual ~distributed_queue() {} + + /** Push an element onto the distributed queue. + * + * The element will be sent to its owner process to be added to that + * process's local queue. If polling is enabled for this queue and + * the owner process is the current process, the value will be + * immediately pushed onto the local queue. + * + * Complexity: O(1) messages of size O(sizeof(value_type)) will be + * transmitted. + */ + void push(const value_type& x); + + /** Pop an element off the local queue. + * + * @p @c !empty() + */ + void pop() { buffer.pop(); } + + /** + * Return the element at the top of the local queue. + * + * @p @c !empty() + */ + value_type& top() { return buffer.top(); } + + /** + * \overload + */ + const value_type& top() const { return buffer.top(); } + + /** Determine if the queue is empty. + * + * When the local queue is nonempty, returns @c true. If the local + * queue is empty, synchronizes with all other processes in the + * process group until either (1) the local queue is nonempty + * (returns @c true) (2) the entire distributed queue is empty + * (returns @c false). + */ + bool empty() const; + + /** Determine the size of the local queue. + * + * The behavior of this routine is equivalent to the behavior of + * @ref empty, except that when @ref empty returns true this + * function returns the size of the local queue and when @ref empty + * returns false this function returns zero. + */ + size_type size() const; + + // private: + /** Synchronize the distributed queue and determine if all queues + * are empty. + * + * \returns \c true when all local queues are empty, or false if at least + * one of the local queues is nonempty. + * Defined as virtual for derived classes like depth_limited_distributed_queue. + */ + virtual bool do_synchronize() const; + + private: + // Setup triggers + void setup_triggers(); + + // Message handlers + void + handle_push(int source, int tag, const value_type& value, + trigger_receive_context); + + void + handle_multipush(int source, int tag, const std::vector<value_type>& values, + trigger_receive_context); + + mutable ProcessGroup process_group; + OwnerMap owner; + mutable Buffer buffer; + UnaryPredicate pred; + bool polling; + + typedef std::vector<value_type> outgoing_buffer_t; + typedef std::vector<outgoing_buffer_t> outgoing_buffers_t; + shared_ptr<outgoing_buffers_t> outgoing_buffers; +}; + +/// Helper macro containing the normal names for the template +/// parameters to distributed_queue. +#define BOOST_DISTRIBUTED_QUEUE_PARMS \ + typename ProcessGroup, typename OwnerMap, typename Buffer, \ + typename UnaryPredicate + +/// Helper macro containing the normal template-id for +/// distributed_queue. +#define BOOST_DISTRIBUTED_QUEUE_TYPE \ + distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate> + +/** Synchronize all processes involved with the given distributed queue. + * + * This function will synchronize all of the local queues for a given + * distributed queue, by ensuring that no additional messages are in + * transit. It is rarely required by the user, because most + * synchronization of distributed queues occurs via the @c empty or @c + * size methods. + */ +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +inline void +synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q) +{ Q.do_synchronize(); } + +/// Construct a new distributed queue. +template<typename ProcessGroup, typename OwnerMap, typename Buffer> +inline distributed_queue<ProcessGroup, OwnerMap, Buffer> +make_distributed_queue(const ProcessGroup& process_group, + const OwnerMap& owner, + const Buffer& buffer, + bool polling = false) +{ + typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type; + return result_type(process_group, owner, buffer, polling); +} + +} } } // end namespace boost::graph::distributed + +#include <boost/graph/distributed/detail/queue.ipp> + +#undef BOOST_DISTRIBUTED_QUEUE_TYPE +#undef BOOST_DISTRIBUTED_QUEUE_PARMS + +#endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP