annotate DEPENDENCIES/generic/include/boost/graph/distributed/queue.hpp @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents 2665513ce2d3
children
rev   line source
Chris@16 1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
Chris@16 2
Chris@16 3 // Use, modification and distribution is subject to the Boost Software
Chris@16 4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 5 // http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6
Chris@16 7 // Authors: Douglas Gregor
Chris@16 8 // Andrew Lumsdaine
Chris@16 9 #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
Chris@16 10 #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
Chris@16 11
Chris@16 12 #ifndef BOOST_GRAPH_USE_MPI
Chris@16 13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
Chris@16 14 #endif
Chris@16 15
Chris@16 16 #include <boost/graph/parallel/process_group.hpp>
Chris@16 17 #include <boost/optional.hpp>
Chris@16 18 #include <boost/shared_ptr.hpp>
Chris@16 19 #include <vector>
Chris@16 20
Chris@16 21 namespace boost { namespace graph { namespace distributed {
Chris@16 22
Chris@16 23 /// A unary predicate that accepts a value of any type and always returns "true"; it is the default UnaryPredicate of distributed_queue.
Chris@16 24 struct always_push
Chris@16 25 {
Chris@16 26 template<typename T> bool operator()(const T&) const { return true; }
Chris@16 27 };
Chris@16 28
Chris@16 29
Chris@16 30
Chris@16 31 /** A distributed queue adaptor.
Chris@16 32 *
Chris@16 33 * Class template @c distributed_queue implements a distributed queue
Chris@16 34 * across a process group. The distributed queue is an adaptor over an
Chris@16 35 * existing (local) queue, which must model the @ref Buffer
Chris@16 36 * concept. Each process stores a distinct copy of the local queue,
Chris@16 37 * from which it draws or removes elements via the @ref pop and @ref
Chris@16 38 * top members.
Chris@16 39 *
Chris@16 40 * The value type of the local queue must be a model of the @ref
Chris@16 41 * GlobalDescriptor concept. The @ref push operation of the
Chris@16 42 * distributed queue passes (via a message) the value to its owning
Chris@16 43 * processor. Thus, the elements within a particular local queue are
Chris@16 44 * guaranteed to have the process owning that local queue as an owner.
Chris@16 45 *
Chris@16 46 * Synchronization of distributed queues occurs in the @ref empty and
Chris@16 47 * @ref size functions, which will only return "empty" values (true or
Chris@16 48 * 0, respectively) when the entire distributed queue is empty. If the
Chris@16 49 * local queue is empty but the distributed queue is not, the
Chris@16 50 * operation will block until either condition changes. When the @ref
Chris@16 51 * size function of a nonempty queue returns, it returns the size of
Chris@16 52 * the local queue. These semantics were selected so that sequential
Chris@16 53 * code that processes elements in the queue via the following idiom
Chris@16 54 * can be parallelized via introduction of a distributed queue:
Chris@16 55 *
Chris@16 56 * distributed_queue<...> Q;
Chris@16 57 * Q.push(x);
Chris@16 58 * while (!Q.empty()) {
Chris@16 59 * // do something, that may push a value onto Q
Chris@16 60 * }
Chris@16 61 *
Chris@16 62 * In the parallel version, the initial @ref push operation will place
Chris@16 63 * the value @c x onto its owner's queue. All processes will
Chris@16 64 * synchronize at the call to empty, and only the process owning @c x
Chris@16 65 * will be allowed to execute the loop (@ref Q.empty() returns
Chris@16 66 * false). This iteration may in turn push values onto other remote
Chris@16 67 * queues, so when that process finishes execution of the loop body
Chris@16 68 * and all processes synchronize again in @ref empty, more processes
Chris@16 69 * may have nonempty local queues to execute. Once all local queues
Chris@16 70 * are empty, @ref Q.empty() returns @c true for all processes.
Chris@16 71 *
Chris@16 72 * The distributed queue can receive messages at two different times:
Chris@16 73 * during synchronization and when polling @ref empty. Messages are
Chris@16 74 * always received during synchronization, to ensure that accurate
Chris@16 75 * local queue sizes can be determined. However, whether @ref empty
Chris@16 76 * should poll for messages is specified as an option to the
Chris@16 77 * constructor. Polling may be desired when the order in which
Chris@16 78 * elements in the queue are processed is not important, because it
Chris@16 79 * permits fewer synchronization steps and less communication
Chris@16 80 * overhead. However, when more strict ordering guarantees are
Chris@16 81 * required, polling may be semantically incorrect. By disabling
Chris@16 82 * polling, one ensures that parallel execution using the idiom above
Chris@16 83 * will not process an element at a later "level" before an earlier
Chris@16 84 * "level".
Chris@16 85 *
Chris@16 86 * The distributed queue nearly models the @ref Buffer
Chris@16 87 * concept. However, the @ref push routine does not necessarily
Chris@16 88 * increase the result of @c size() by one (although the size of the
Chris@16 89 * global queue does increase by one).
Chris@16 90 */
Chris@16 91 template<typename ProcessGroup, typename OwnerMap, typename Buffer,
Chris@16 92 typename UnaryPredicate = always_push>
Chris@16 93 class distributed_queue
Chris@16 94 {
Chris@16 95 typedef distributed_queue self_type;
Chris@16 96
Chris@16 97 enum {
Chris@16 98 /** Message indicating a remote push. The message contains a
Chris@16 99 * single value x of type value_type that is to be pushed on the
Chris@16 100 * receiver's queue.
Chris@16 101 */
Chris@16 102 msg_push,
Chris@16 103 /** Push many elements at once. */
Chris@16 104 msg_multipush
Chris@16 105 };
Chris@16 106
Chris@16 107 public:
Chris@16 108 typedef ProcessGroup process_group_type;
Chris@16 109 typedef Buffer buffer_type;
Chris@16 110 typedef typename buffer_type::value_type value_type;
Chris@16 111 typedef typename buffer_type::size_type size_type;
Chris@16 112
Chris@16 113 /** Construct a new distributed queue.
Chris@16 114 *
Chris@16 115 * Build a new distributed queue that communicates over the given @p
Chris@16 116 * process_group, whose local queue is initialized via @p buffer and
Chris@16 117 * which may or may not poll for messages.
Chris@16 118 */
Chris@16 119 explicit
Chris@16 120 distributed_queue(const ProcessGroup& process_group,
Chris@16 121 const OwnerMap& owner,
Chris@16 122 const Buffer& buffer,
Chris@16 123 bool polling = false);
Chris@16 124
Chris@16 125 /** Construct a new distributed queue with an explicit predicate.
Chris@16 126 *
Chris@16 127 * Build a new distributed queue that communicates over the given @p
Chris@16 128 * process_group, whose local queue is initialized via @p buffer, which
Chris@16 129 * takes the unary predicate @p pred, and which may or may not poll for messages.
Chris@16 130 */
Chris@16 131 explicit
Chris@16 132 distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
Chris@16 133 const OwnerMap& owner = OwnerMap(),
Chris@16 134 const Buffer& buffer = Buffer(),
Chris@16 135 const UnaryPredicate& pred = UnaryPredicate(),
Chris@16 136 bool polling = false);
Chris@16 137
Chris@16 138 /** Construct a new distributed queue.
Chris@16 139 *
Chris@16 140 * Build a new distributed queue that communicates over the given @p
Chris@16 141 * process_group, whose local queue is default-initialized, which takes the
Chris@16 142 * unary predicate @p pred, and which may or may not poll for messages.
Chris@16 143 */
Chris@16 144 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
Chris@16 145 const UnaryPredicate& pred, bool polling = false);
Chris@16 146
Chris@16 147 /** Virtual destructor required with virtual functions.
Chris@16 148 *
Chris@16 149 */
Chris@16 150 virtual ~distributed_queue() {}
Chris@16 151
Chris@16 152 /** Push an element onto the distributed queue.
Chris@16 153 *
Chris@16 154 * The element will be sent to its owner process to be added to that
Chris@16 155 * process's local queue. If polling is enabled for this queue and
Chris@16 156 * the owner process is the current process, the value will be
Chris@16 157 * immediately pushed onto the local queue.
Chris@16 158 *
Chris@16 159 * Complexity: O(1) messages of size O(sizeof(value_type)) will be
Chris@16 160 * transmitted.
Chris@16 161 */
Chris@16 162 void push(const value_type& x);
Chris@16 163
Chris@16 164 /** Pop an element off the local queue.
Chris@16 165 *
Chris@16 166 * @pre @c !empty()
Chris@16 167 */
Chris@16 168 void pop() { buffer.pop(); }
Chris@16 169
Chris@16 170 /**
Chris@16 171 * Return the element at the top of the local queue.
Chris@16 172 *
Chris@16 173 * @pre @c !empty()
Chris@16 174 */
Chris@16 175 value_type& top() { return buffer.top(); }
Chris@16 176
Chris@16 177 /**
Chris@16 178 * \overload
Chris@16 179 */
Chris@16 180 const value_type& top() const { return buffer.top(); }
Chris@16 181
Chris@16 182 /** Determine if the queue is empty.
Chris@16 183 *
Chris@16 184 * When the local queue is nonempty, returns @c false. If the local
Chris@16 185 * queue is empty, synchronizes with all other processes in the
Chris@16 186 * process group until either (1) the local queue is nonempty
Chris@16 187 * (returns @c false) (2) the entire distributed queue is empty
Chris@16 188 * (returns @c true).
Chris@16 189 */
Chris@16 190 bool empty() const;
Chris@16 191
Chris@16 192 /** Determine the size of the local queue.
Chris@16 193 *
Chris@16 194 * The behavior of this routine is equivalent to the behavior of
Chris@16 195 * @ref empty, except that when @ref empty returns false this
Chris@16 196 * function returns the size of the local queue and when @ref empty
Chris@16 197 * returns true this function returns zero.
Chris@16 198 */
Chris@16 199 size_type size() const;
Chris@16 200
Chris@16 201 // private: (commented out, so do_synchronize() stays public; the free function synchronize() calls it)
Chris@16 202 /** Synchronize the distributed queue and determine if all queues
Chris@16 203 * are empty.
Chris@16 204 *
Chris@16 205 * \returns \c true when all local queues are empty, or false if at least
Chris@16 206 * one of the local queues is nonempty.
Chris@16 207 * Defined as virtual for derived classes like depth_limited_distributed_queue.
Chris@16 208 */
Chris@16 209 virtual bool do_synchronize() const;
Chris@16 210
Chris@16 211 private:
Chris@16 212 // Setup triggers
Chris@16 213 void setup_triggers();
Chris@16 214
Chris@16 215 // Message handlers
Chris@16 216 void
Chris@16 217 handle_push(int source, int tag, const value_type& value,
Chris@16 218 trigger_receive_context);
Chris@16 219
Chris@16 220 void
Chris@16 221 handle_multipush(int source, int tag, const std::vector<value_type>& values,
Chris@16 222 trigger_receive_context);
Chris@16 223
Chris@16 224 mutable ProcessGroup process_group;
Chris@16 225 OwnerMap owner;
Chris@16 226 mutable Buffer buffer;
Chris@16 227 UnaryPredicate pred;
Chris@16 228 bool polling;
Chris@16 229
Chris@16 230 typedef std::vector<value_type> outgoing_buffer_t;
Chris@16 231 typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
Chris@16 232 shared_ptr<outgoing_buffers_t> outgoing_buffers;
Chris@16 233 };
Chris@16 234
Chris@16 235 /// Helper macro containing the normal names for the template
Chris@16 236 /// parameters to distributed_queue (undefined again at the end of this header).
Chris@16 237 #define BOOST_DISTRIBUTED_QUEUE_PARMS \
Chris@16 238 typename ProcessGroup, typename OwnerMap, typename Buffer, \
Chris@16 239 typename UnaryPredicate
Chris@16 240
Chris@16 241 /// Helper macro containing the normal template-id for
Chris@16 242 /// distributed_queue (undefined again at the end of this header).
Chris@16 243 #define BOOST_DISTRIBUTED_QUEUE_TYPE \
Chris@16 244 distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
Chris@16 245
Chris@16 246 /** Synchronize all processes involved with the given distributed queue.
Chris@16 247 *
Chris@16 248 * This function will synchronize all of the local queues for a given
Chris@16 249 * distributed queue, by ensuring that no additional messages are in
Chris@16 250 * transit. It is rarely required by the user, because most
Chris@16 251 * synchronization of distributed queues occurs via the @c empty or @c
Chris@16 252 * size methods. Forwards to @c Q.do_synchronize() and discards its boolean result.
Chris@16 253 */
Chris@16 254 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 255 inline void
Chris@16 256 synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
Chris@16 257 { Q.do_synchronize(); }
Chris@16 258
Chris@16 259 /// Construct a new distributed queue, deducing ProcessGroup, OwnerMap and Buffer from the arguments and leaving UnaryPredicate at its default.
Chris@16 260 template<typename ProcessGroup, typename OwnerMap, typename Buffer>
Chris@16 261 inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
Chris@16 262 make_distributed_queue(const ProcessGroup& process_group,
Chris@16 263 const OwnerMap& owner,
Chris@16 264 const Buffer& buffer,
Chris@16 265 bool polling = false)
Chris@16 266 {
Chris@16 267 typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
Chris@16 268 return result_type(process_group, owner, buffer, polling);
Chris@16 269 }
Chris@16 270
Chris@16 271 } } } // end namespace boost::graph::distributed
Chris@16 272
Chris@16 273 #include <boost/graph/distributed/detail/queue.ipp>
Chris@16 274
Chris@16 275 #undef BOOST_DISTRIBUTED_QUEUE_TYPE
Chris@16 276 #undef BOOST_DISTRIBUTED_QUEUE_PARMS
Chris@16 277
Chris@16 278 #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP