diff DEPENDENCIES/generic/include/boost/graph/distributed/queue.hpp @ 16:2665513ce2d3

Add boost headers
author Chris Cannam
date Tue, 05 Aug 2014 11:11:38 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/DEPENDENCIES/generic/include/boost/graph/distributed/queue.hpp	Tue Aug 05 11:11:38 2014 +0100
@@ -0,0 +1,278 @@
+// Copyright (C) 2004-2006 The Trustees of Indiana University.
+
+// Use, modification and distribution is subject to the Boost Software
+// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+//  Authors: Douglas Gregor
+//           Andrew Lumsdaine
+#ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
+#define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
+
+#ifndef BOOST_GRAPH_USE_MPI
+#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
+#endif
+
+#include <boost/graph/parallel/process_group.hpp>
+#include <boost/optional.hpp>
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace boost { namespace graph { namespace distributed {
+
+/// A unary predicate that always returns "true".
+struct always_push
+{
+  template<typename T> bool operator()(const T&) const { return true; }
+};
+
+
+
+/** A distributed queue adaptor.
+ *
+ * Class template @c distributed_queue implements a distributed queue
+ * across a process group. The distributed queue is an adaptor over an
+ * existing (local) queue, which must model the @ref Buffer
+ * concept. Each process stores a distinct copy of the local queue,
+ * from which it draws or removes elements via the @ref pop and @ref
+ * top members.
+ *
+ * The value type of the local queue must be a model of the @ref
+ * GlobalDescriptor concept. The @ref push operation of the
+ * distributed queue passes (via a message) the value to its owning
+ * processor. Thus, the elements within a particular local queue are
+ * guaranteed to have the process owning that local queue as an owner.
+ *
+ * Synchronization of distributed queues occurs in the @ref empty and
+ * @ref size functions, which will only return "empty" values (true or
+ * 0, respectively) when the entire distributed queue is empty. If the
+ * local queue is empty but the distributed queue is not, the
+ * operation will block until either condition changes. When the @ref
+ * size function of a nonempty queue returns, it returns the size of
+ * the local queue. These semantics were selected so that sequential
+ * code that processes elements in the queue via the following idiom
+ * can be parallelized via introduction of a distributed queue:
+ *
+ *   distributed_queue<...> Q;
+ *   Q.push(x);
+ *   while (!Q.empty()) {
+ *     // do something, that may push a value onto Q
+ *   }
+ *
+ * In the parallel version, the initial @ref push operation will place
+ * the value @c x onto its owner's queue. All processes will
+ * synchronize at the call to empty, and only the process owning @c x
+ * will be allowed to execute the loop (@ref Q.empty() returns
+ * false). This iteration may in turn push values onto other remote
+ * queues, so when that process finishes execution of the loop body
+ * and all processes synchronize again in @ref empty, more processes
+ * may have nonempty local queues to execute. Once all local queues
+ * are empty, @ref Q.empty() returns @c false for all processes.
+ *
+ * The distributed queue can receive messages at two different times:
+ * during synchronization and when polling @ref empty. Messages are
+ * always received during synchronization, to ensure that accurate
+ * local queue sizes can be determines. However, whether @ref empty
+ * should poll for messages is specified as an option to the
+ * constructor. Polling may be desired when the order in which
+ * elements in the queue are processed is not important, because it
+ * permits fewer synchronization steps and less communication
+ * overhead. However, when more strict ordering guarantees are
+ * required, polling may be semantically incorrect. By disabling
+ * polling, one ensures that parallel execution using the idiom above
+ * will not process an element at a later "level" before an earlier
+ * "level".
+ *
+ * The distributed queue nearly models the @ref Buffer
+ * concept. However, the @ref push routine does not necessarily
+ * increase the result of @c size() by one (although the size of the
+ * global queue does increase by one).
+ */
+template<typename ProcessGroup, typename OwnerMap, typename Buffer,
+         typename UnaryPredicate = always_push>
+class distributed_queue
+{
+  typedef distributed_queue self_type;
+
+  enum {
+    /** Message indicating a remote push. The message contains a
+     * single value x of type value_type that is to be pushed on the
+     * receiver's queue.
+     */
+    msg_push,
+    /** Push many elements at once. */
+    msg_multipush
+  };
+
+ public:
+  typedef ProcessGroup                     process_group_type;
+  typedef Buffer                           buffer_type;
+  typedef typename buffer_type::value_type value_type;
+  typedef typename buffer_type::size_type  size_type;
+
+  /** Construct a new distributed queue.
+   *
+   * Build a new distributed queue that communicates over the given @p
+   * process_group, whose local queue is initialized via @p buffer and
+   * which may or may not poll for messages.
+   */
+  explicit
+  distributed_queue(const ProcessGroup& process_group,
+                    const OwnerMap& owner,
+                    const Buffer& buffer,
+                    bool polling = false);
+
+  /** Construct a new distributed queue.
+   *
+   * Build a new distributed queue that communicates over the given @p
+   * process_group, whose local queue is initialized via @p buffer and
+   * which may or may not poll for messages.
+   */
+  explicit
+  distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
+                    const OwnerMap& owner = OwnerMap(),
+                    const Buffer& buffer = Buffer(),
+                    const UnaryPredicate& pred = UnaryPredicate(),
+                    bool polling = false);
+
+  /** Construct a new distributed queue.
+   *
+   * Build a new distributed queue that communicates over the given @p
+   * process_group, whose local queue is default-initalized and which
+   * may or may not poll for messages.
+   */
+  distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
+                    const UnaryPredicate& pred, bool polling = false);
+
+  /** Virtual destructor required with virtual functions.
+   *
+   */
+  virtual ~distributed_queue() {}
+
+  /** Push an element onto the distributed queue.
+   *
+   * The element will be sent to its owner process to be added to that
+   * process's local queue. If polling is enabled for this queue and
+   * the owner process is the current process, the value will be
+   * immediately pushed onto the local queue.
+   *
+   * Complexity: O(1) messages of size O(sizeof(value_type)) will be
+   * transmitted.
+   */
+  void push(const value_type& x);
+
+  /** Pop an element off the local queue.
+   *
+   * @p @c !empty()
+   */
+  void pop() { buffer.pop(); }
+
+  /**
+   * Return the element at the top of the local queue.
+   *
+   * @p @c !empty()
+   */
+  value_type& top() { return buffer.top(); }
+
+  /**
+   * \overload
+   */
+  const value_type& top() const { return buffer.top(); }
+
+  /** Determine if the queue is empty.
+   *
+   * When the local queue is nonempty, returns @c true. If the local
+   * queue is empty, synchronizes with all other processes in the
+   * process group until either (1) the local queue is nonempty
+   * (returns @c true) (2) the entire distributed queue is empty
+   * (returns @c false).
+   */
+  bool empty() const;
+
+  /** Determine the size of the local queue.
+   *
+   * The behavior of this routine is equivalent to the behavior of
+   * @ref empty, except that when @ref empty returns true this
+   * function returns the size of the local queue and when @ref empty
+   * returns false this function returns zero.
+   */
+  size_type size() const;
+
+  // private:
+  /** Synchronize the distributed queue and determine if all queues
+   * are empty.
+   *
+   * \returns \c true when all local queues are empty, or false if at least
+   * one of the local queues is nonempty.
+   * Defined as virtual for derived classes like depth_limited_distributed_queue.
+   */
+  virtual bool do_synchronize() const;
+
+ private:
+  // Setup triggers
+  void setup_triggers();
+
+  // Message handlers
+  void 
+  handle_push(int source, int tag, const value_type& value, 
+              trigger_receive_context);
+
+  void 
+  handle_multipush(int source, int tag, const std::vector<value_type>& values, 
+                   trigger_receive_context);
+
+  mutable ProcessGroup process_group;
+  OwnerMap owner;
+  mutable Buffer buffer;
+  UnaryPredicate pred;
+  bool polling;
+
+  typedef std::vector<value_type> outgoing_buffer_t;
+  typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
+  shared_ptr<outgoing_buffers_t> outgoing_buffers;
+};
+
+/// Helper macro containing the normal names for the template
+/// parameters to distributed_queue.
+#define BOOST_DISTRIBUTED_QUEUE_PARMS                           \
+  typename ProcessGroup, typename OwnerMap, typename Buffer,    \
+  typename UnaryPredicate
+
+/// Helper macro containing the normal template-id for
+/// distributed_queue.
+#define BOOST_DISTRIBUTED_QUEUE_TYPE                                    \
+  distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
+
+/** Synchronize all processes involved with the given distributed queue.
+ *
+ * This function will synchronize all of the local queues for a given
+ * distributed queue, by ensuring that no additional messages are in
+ * transit. It is rarely required by the user, because most
+ * synchronization of distributed queues occurs via the @c empty or @c
+ * size methods.
+ */
+template<BOOST_DISTRIBUTED_QUEUE_PARMS>
+inline void
+synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
+{ Q.do_synchronize(); }
+
+/// Construct a new distributed queue.
+template<typename ProcessGroup, typename OwnerMap, typename Buffer>
+inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
+make_distributed_queue(const ProcessGroup& process_group,
+                       const OwnerMap& owner,
+                       const Buffer& buffer,
+                       bool polling = false)
+{
+  typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
+  return result_type(process_group, owner, buffer, polling);
+}
+
+} } } // end namespace boost::graph::distributed
+
+#include <boost/graph/distributed/detail/queue.ipp>
+
+#undef BOOST_DISTRIBUTED_QUEUE_TYPE
+#undef BOOST_DISTRIBUTED_QUEUE_PARMS
+
+#endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP