Chris@16
|
1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
|
Chris@16
|
2
|
Chris@16
|
3 // Use, modification and distribution is subject to the Boost Software
|
Chris@16
|
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
|
Chris@16
|
5 // http://www.boost.org/LICENSE_1_0.txt)
|
Chris@16
|
6
|
Chris@16
|
7 // Authors: Douglas Gregor
|
Chris@16
|
8 // Andrew Lumsdaine
|
Chris@16
|
9 #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
|
Chris@16
|
10 #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
|
Chris@16
|
11
|
Chris@16
|
12 #ifndef BOOST_GRAPH_USE_MPI
|
Chris@16
|
13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
|
Chris@16
|
14 #endif
|
Chris@16
|
15
|
Chris@16
|
16 #include <boost/graph/parallel/process_group.hpp>
|
Chris@16
|
17 #include <boost/optional.hpp>
|
Chris@16
|
18 #include <boost/shared_ptr.hpp>
|
Chris@16
|
19 #include <vector>
|
Chris@16
|
20
|
Chris@16
|
21 namespace boost { namespace graph { namespace distributed {
|
Chris@16
|
22
|
Chris@16
|
23 /// A unary predicate that always returns "true".
|
Chris@16
|
24 struct always_push
|
Chris@16
|
25 {
|
Chris@16
|
26 template<typename T> bool operator()(const T&) const { return true; }
|
Chris@16
|
27 };
|
Chris@16
|
28
|
Chris@16
|
29
|
Chris@16
|
30
|
Chris@16
|
31 /** A distributed queue adaptor.
|
Chris@16
|
32 *
|
Chris@16
|
33 * Class template @c distributed_queue implements a distributed queue
|
Chris@16
|
34 * across a process group. The distributed queue is an adaptor over an
|
Chris@16
|
35 * existing (local) queue, which must model the @ref Buffer
|
Chris@16
|
36 * concept. Each process stores a distinct copy of the local queue,
|
Chris@16
|
37 * from which it draws or removes elements via the @ref pop and @ref
|
Chris@16
|
38 * top members.
|
Chris@16
|
39 *
|
Chris@16
|
40 * The value type of the local queue must be a model of the @ref
|
Chris@16
|
41 * GlobalDescriptor concept. The @ref push operation of the
|
Chris@16
|
42 * distributed queue passes (via a message) the value to its owning
|
Chris@16
|
43 * processor. Thus, the elements within a particular local queue are
|
Chris@16
|
44 * guaranteed to have the process owning that local queue as an owner.
|
Chris@16
|
45 *
|
Chris@16
|
46 * Synchronization of distributed queues occurs in the @ref empty and
|
Chris@16
|
47 * @ref size functions, which will only return "empty" values (true or
|
Chris@16
|
48 * 0, respectively) when the entire distributed queue is empty. If the
|
Chris@16
|
49 * local queue is empty but the distributed queue is not, the
|
Chris@16
|
50 * operation will block until either condition changes. When the @ref
|
Chris@16
|
51 * size function of a nonempty queue returns, it returns the size of
|
Chris@16
|
52 * the local queue. These semantics were selected so that sequential
|
Chris@16
|
53 * code that processes elements in the queue via the following idiom
|
Chris@16
|
54 * can be parallelized via introduction of a distributed queue:
|
Chris@16
|
55 *
|
Chris@16
|
56 * distributed_queue<...> Q;
|
Chris@16
|
57 * Q.push(x);
|
Chris@16
|
58 * while (!Q.empty()) {
|
Chris@16
|
59 * // do something, that may push a value onto Q
|
Chris@16
|
60 * }
|
Chris@16
|
61 *
|
Chris@16
|
62 * In the parallel version, the initial @ref push operation will place
|
Chris@16
|
63 * the value @c x onto its owner's queue. All processes will
|
Chris@16
|
64 * synchronize at the call to empty, and only the process owning @c x
|
Chris@16
|
65 * will be allowed to execute the loop (@ref Q.empty() returns
|
Chris@16
|
66 * false). This iteration may in turn push values onto other remote
|
Chris@16
|
67 * queues, so when that process finishes execution of the loop body
|
Chris@16
|
68 * and all processes synchronize again in @ref empty, more processes
|
Chris@16
|
69 * may have nonempty local queues to execute. Once all local queues
|
Chris@16
|
70 * are empty, @ref Q.empty() returns @c true for all processes.
|
Chris@16
|
71 *
|
Chris@16
|
72 * The distributed queue can receive messages at two different times:
|
Chris@16
|
73 * during synchronization and when polling @ref empty. Messages are
|
Chris@16
|
74 * always received during synchronization, to ensure that accurate
|
Chris@16
|
75 * local queue sizes can be determined. However, whether @ref empty
|
Chris@16
|
76 * should poll for messages is specified as an option to the
|
Chris@16
|
77 * constructor. Polling may be desired when the order in which
|
Chris@16
|
78 * elements in the queue are processed is not important, because it
|
Chris@16
|
79 * permits fewer synchronization steps and less communication
|
Chris@16
|
80 * overhead. However, when more strict ordering guarantees are
|
Chris@16
|
81 * required, polling may be semantically incorrect. By disabling
|
Chris@16
|
82 * polling, one ensures that parallel execution using the idiom above
|
Chris@16
|
83 * will not process an element at a later "level" before an earlier
|
Chris@16
|
84 * "level".
|
Chris@16
|
85 *
|
Chris@16
|
86 * The distributed queue nearly models the @ref Buffer
|
Chris@16
|
87 * concept. However, the @ref push routine does not necessarily
|
Chris@16
|
88 * increase the result of @c size() by one (although the size of the
|
Chris@16
|
89 * global queue does increase by one).
|
Chris@16
|
90 */
|
Chris@16
|
91 template<typename ProcessGroup, typename OwnerMap, typename Buffer,
|
Chris@16
|
92 typename UnaryPredicate = always_push>
|
Chris@16
|
93 class distributed_queue
|
Chris@16
|
94 {
|
Chris@16
|
95 typedef distributed_queue self_type;
|
Chris@16
|
96
|
Chris@16
|
97 enum {
|
Chris@16
|
98 /** Message indicating a remote push. The message contains a
|
Chris@16
|
99 * single value x of type value_type that is to be pushed on the
|
Chris@16
|
100 * receiver's queue.
|
Chris@16
|
101 */
|
Chris@16
|
102 msg_push,
|
Chris@16
|
103 /** Push many elements at once. */
|
Chris@16
|
104 msg_multipush
|
Chris@16
|
105 };
|
Chris@16
|
106
|
Chris@16
|
107 public:
|
Chris@16
|
108 typedef ProcessGroup process_group_type;
|
Chris@16
|
109 typedef Buffer buffer_type;
|
Chris@16
|
110 typedef typename buffer_type::value_type value_type;
|
Chris@16
|
111 typedef typename buffer_type::size_type size_type;
|
Chris@16
|
112
|
Chris@16
|
113 /** Construct a new distributed queue.
|
Chris@16
|
114 *
|
Chris@16
|
115 * Build a new distributed queue that communicates over the given @p
|
Chris@16
|
116 * process_group, whose local queue is initialized via @p buffer and
|
Chris@16
|
117 * which may or may not poll for messages.
|
Chris@16
|
118 */
|
Chris@16
|
119 explicit
|
Chris@16
|
120 distributed_queue(const ProcessGroup& process_group,
|
Chris@16
|
121 const OwnerMap& owner,
|
Chris@16
|
122 const Buffer& buffer,
|
Chris@16
|
123 bool polling = false);
|
Chris@16
|
124
|
Chris@16
|
125 /** Construct a new distributed queue.
|
Chris@16
|
126 *
|
Chris@16
|
127 * Build a new distributed queue that communicates over the given @p
|
Chris@16
|
128 * process_group, whose local queue is initialized via @p buffer and
|
Chris@16
|
129 * which may or may not poll for messages.
|
Chris@16
|
130 */
|
Chris@16
|
131 explicit
|
Chris@16
|
132 distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
|
Chris@16
|
133 const OwnerMap& owner = OwnerMap(),
|
Chris@16
|
134 const Buffer& buffer = Buffer(),
|
Chris@16
|
135 const UnaryPredicate& pred = UnaryPredicate(),
|
Chris@16
|
136 bool polling = false);
|
Chris@16
|
137
|
Chris@16
|
138 /** Construct a new distributed queue.
|
Chris@16
|
139 *
|
Chris@16
|
140 * Build a new distributed queue that communicates over the given @p
|
Chris@16
|
141 * process_group, whose local queue is default-initialized and which
|
Chris@16
|
142 * may or may not poll for messages.
|
Chris@16
|
143 */
|
Chris@16
|
144 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
|
Chris@16
|
145 const UnaryPredicate& pred, bool polling = false);
|
Chris@16
|
146
|
Chris@16
|
147 /** Virtual destructor required with virtual functions.
|
Chris@16
|
148 *
|
Chris@16
|
149 */
|
Chris@16
|
150 virtual ~distributed_queue() {}
|
Chris@16
|
151
|
Chris@16
|
152 /** Push an element onto the distributed queue.
|
Chris@16
|
153 *
|
Chris@16
|
154 * The element will be sent to its owner process to be added to that
|
Chris@16
|
155 * process's local queue. If polling is enabled for this queue and
|
Chris@16
|
156 * the owner process is the current process, the value will be
|
Chris@16
|
157 * immediately pushed onto the local queue.
|
Chris@16
|
158 *
|
Chris@16
|
159 * Complexity: O(1) messages of size O(sizeof(value_type)) will be
|
Chris@16
|
160 * transmitted.
|
Chris@16
|
161 */
|
Chris@16
|
162 void push(const value_type& x);
|
Chris@16
|
163
|
Chris@16
|
164 /** Pop an element off the local queue.
|
Chris@16
|
165 *
|
Chris@16
|
166 * @pre @c !empty()
|
Chris@16
|
167 */
|
Chris@16
|
168 void pop() { buffer.pop(); }
|
Chris@16
|
169
|
Chris@16
|
170 /**
|
Chris@16
|
171 * Return the element at the top of the local queue.
|
Chris@16
|
172 *
|
Chris@16
|
173 * @pre @c !empty()
|
Chris@16
|
174 */
|
Chris@16
|
175 value_type& top() { return buffer.top(); }
|
Chris@16
|
176
|
Chris@16
|
177 /**
|
Chris@16
|
178 * \overload
|
Chris@16
|
179 */
|
Chris@16
|
180 const value_type& top() const { return buffer.top(); }
|
Chris@16
|
181
|
Chris@16
|
182 /** Determine if the queue is empty.
|
Chris@16
|
183 *
|
Chris@16
|
184 * When the local queue is nonempty, returns @c false. If the local
|
Chris@16
|
185 * queue is empty, synchronizes with all other processes in the
|
Chris@16
|
186 * process group until either (1) the local queue is nonempty
|
Chris@16
|
187 * (returns @c false) or (2) the entire distributed queue is empty
|
Chris@16
|
188 * (returns @c true).
|
Chris@16
|
189 */
|
Chris@16
|
190 bool empty() const;
|
Chris@16
|
191
|
Chris@16
|
192 /** Determine the size of the local queue.
|
Chris@16
|
193 *
|
Chris@16
|
194 * The behavior of this routine is equivalent to the behavior of
|
Chris@16
|
195 * @ref empty, except that when @ref empty returns false this
|
Chris@16
|
196 * function returns the size of the local queue and when @ref empty
|
Chris@16
|
197 * returns true this function returns zero.
|
Chris@16
|
198 */
|
Chris@16
|
199 size_type size() const;
|
Chris@16
|
200
|
Chris@16
|
201 // private:
|
Chris@16
|
202 /** Synchronize the distributed queue and determine if all queues
|
Chris@16
|
203 * are empty.
|
Chris@16
|
204 *
|
Chris@16
|
205 * \returns \c true when all local queues are empty, or false if at least
|
Chris@16
|
206 * one of the local queues is nonempty.
|
Chris@16
|
207 * Defined as virtual for derived classes like depth_limited_distributed_queue.
|
Chris@16
|
208 */
|
Chris@16
|
209 virtual bool do_synchronize() const;
|
Chris@16
|
210
|
Chris@16
|
211 private:
|
Chris@16
|
212 // Setup triggers
|
Chris@16
|
213 void setup_triggers();
|
Chris@16
|
214
|
Chris@16
|
215 // Message handlers
|
Chris@16
|
216 void
|
Chris@16
|
217 handle_push(int source, int tag, const value_type& value,
|
Chris@16
|
218 trigger_receive_context);
|
Chris@16
|
219
|
Chris@16
|
220 void
|
Chris@16
|
221 handle_multipush(int source, int tag, const std::vector<value_type>& values,
|
Chris@16
|
222 trigger_receive_context);
|
Chris@16
|
223
|
Chris@16
|
224 mutable ProcessGroup process_group;
|
Chris@16
|
225 OwnerMap owner;
|
Chris@16
|
226 mutable Buffer buffer;
|
Chris@16
|
227 UnaryPredicate pred;
|
Chris@16
|
228 bool polling;
|
Chris@16
|
229
|
Chris@16
|
230 typedef std::vector<value_type> outgoing_buffer_t;
|
Chris@16
|
231 typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
|
Chris@16
|
232 shared_ptr<outgoing_buffers_t> outgoing_buffers;
|
Chris@16
|
233 };
|
Chris@16
|
234
|
Chris@16
|
235 /// Helper macro containing the normal names for the template
|
Chris@16
|
236 /// parameters to distributed_queue.
|
Chris@16
|
237 #define BOOST_DISTRIBUTED_QUEUE_PARMS \
|
Chris@16
|
238 typename ProcessGroup, typename OwnerMap, typename Buffer, \
|
Chris@16
|
239 typename UnaryPredicate
|
Chris@16
|
240
|
Chris@16
|
241 /// Helper macro containing the normal template-id for
|
Chris@16
|
242 /// distributed_queue.
|
Chris@16
|
243 #define BOOST_DISTRIBUTED_QUEUE_TYPE \
|
Chris@16
|
244 distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
|
Chris@16
|
245
|
Chris@16
|
246 /** Synchronize all processes involved with the given distributed queue.
|
Chris@16
|
247 *
|
Chris@16
|
248 * This function will synchronize all of the local queues for a given
|
Chris@16
|
249 * distributed queue, by ensuring that no additional messages are in
|
Chris@16
|
250 * transit. It is rarely required by the user, because most
|
Chris@16
|
251 * synchronization of distributed queues occurs via the @c empty or @c
|
Chris@16
|
252 * size methods.
|
Chris@16
|
253 */
|
Chris@16
|
254 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
255 inline void
|
Chris@16
|
256 synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
|
Chris@16
|
257 { Q.do_synchronize(); }
|
Chris@16
|
258
|
Chris@16
|
259 /// Construct a new distributed queue.
|
Chris@16
|
260 template<typename ProcessGroup, typename OwnerMap, typename Buffer>
|
Chris@16
|
261 inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
|
Chris@16
|
262 make_distributed_queue(const ProcessGroup& process_group,
|
Chris@16
|
263 const OwnerMap& owner,
|
Chris@16
|
264 const Buffer& buffer,
|
Chris@16
|
265 bool polling = false)
|
Chris@16
|
266 {
|
Chris@16
|
267 typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
|
Chris@16
|
268 return result_type(process_group, owner, buffer, polling);
|
Chris@16
|
269 }
|
Chris@16
|
270
|
Chris@16
|
271 } } } // end namespace boost::graph::distributed
|
Chris@16
|
272
|
Chris@16
|
273 #include <boost/graph/distributed/detail/queue.ipp>
|
Chris@16
|
274
|
Chris@16
|
275 #undef BOOST_DISTRIBUTED_QUEUE_TYPE
|
Chris@16
|
276 #undef BOOST_DISTRIBUTED_QUEUE_PARMS
|
Chris@16
|
277
|
Chris@16
|
278 #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
|