Chris@16
|
1 // Copyright (C) 2004-2008 The Trustees of Indiana University.
|
Chris@16
|
2 // Copyright (C) 2007 Douglas Gregor
|
Chris@16
|
3 // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com>
|
Chris@16
|
4
|
Chris@16
|
5 // Use, modification and distribution is subject to the Boost Software
|
Chris@16
|
6 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
|
Chris@16
|
7 // http://www.boost.org/LICENSE_1_0.txt)
|
Chris@16
|
8
|
Chris@16
|
9 // Authors: Douglas Gregor
|
Chris@16
|
10 // Matthias Troyer
|
Chris@16
|
11 // Andrew Lumsdaine
|
Chris@16
|
12 #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
|
Chris@16
|
13 #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
|
Chris@16
|
14
|
Chris@16
|
15 #ifndef BOOST_GRAPH_USE_MPI
|
Chris@16
|
16 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
|
Chris@16
|
17 #endif
|
Chris@16
|
18
|
Chris@16
|
19 //#define NO_SPLIT_BATCHES
|
Chris@16
|
20 #define SEND_OOB_BSEND
|
Chris@16
|
21
|
Chris@16
|
22 #include <boost/optional.hpp>
|
Chris@16
|
23 #include <boost/shared_ptr.hpp>
|
Chris@16
|
24 #include <boost/weak_ptr.hpp>
|
Chris@16
|
25 #include <utility>
|
Chris@16
|
26 #include <memory>
|
Chris@16
|
27 #include <boost/function/function1.hpp>
|
Chris@16
|
28 #include <boost/function/function2.hpp>
|
Chris@16
|
29 #include <boost/function/function0.hpp>
|
Chris@16
|
30 #include <boost/mpi.hpp>
|
Chris@101
|
31 #include <boost/property_map/parallel/process_group.hpp>
|
Chris@16
|
32 #include <boost/utility/enable_if.hpp>
|
Chris@16
|
33
|
Chris@16
|
34 namespace boost { namespace graph { namespace distributed {
|
Chris@16
|
35
|
Chris@16
|
36 // Process group tags
|
Chris@101
|
37 struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
|
Chris@16
|
38
|
Chris@16
|
39 class mpi_process_group
|
Chris@16
|
40 {
|
Chris@16
|
41 struct impl;
|
Chris@16
|
42
|
Chris@16
|
43 public:
|
Chris@16
|
44 /// Number of tags available to each data structure.
|
Chris@16
|
45 static const int max_tags = 256;
|
Chris@16
|
46
|
Chris@16
|
47 /**
|
Chris@16
|
48 * The type of a "receive" handler, that will be provided with
|
Chris@16
|
49 * (source, tag) pairs when a message is received. Users can provide a
|
Chris@16
|
50 * receive handler for a distributed data structure, for example, to
|
Chris@16
|
51 * automatically pick up and respond to messages as needed.
|
Chris@16
|
52 */
|
Chris@16
|
53 typedef function<void(int source, int tag)> receiver_type;
|
Chris@16
|
54
|
Chris@16
|
55 /**
|
Chris@16
|
56 * The type of a handler for the on-synchronize event, which will be
|
Chris@16
|
57 * executed at the beginning of synchronize().
|
Chris@16
|
58 */
|
Chris@16
|
59 typedef function0<void> on_synchronize_event_type;
|
Chris@16
|
60
|
Chris@16
|
61 /// Used as a tag to help create an "empty" process group.
|
Chris@16
|
62 struct create_empty {};
|
Chris@16
|
63
|
Chris@16
|
64 /// The type used to buffer message data
|
Chris@16
|
65 typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
|
Chris@16
|
66
|
Chris@16
|
67 /// The type used to identify a process
|
Chris@16
|
68 typedef int process_id_type;
|
Chris@16
|
69
|
Chris@16
|
70 /// The type used to count the number of processes
|
Chris@16
|
71 typedef int process_size_type;
|
Chris@16
|
72
|
Chris@16
|
73 /// The type of communicator used to transmit data via MPI
|
Chris@16
|
74 typedef boost::mpi::communicator communicator_type;
|
Chris@16
|
75
|
Chris@16
|
76 /// Classification of the capabilities of this process group
|
Chris@16
|
77 struct communication_category
|
Chris@101
|
78 : virtual boost::parallel::bsp_process_group_tag,
|
Chris@16
|
79 virtual mpi_process_group_tag { };
|
Chris@16
|
80
|
Chris@16
|
81 // TBD: We can eliminate the "source" field and possibly the
|
Chris@16
|
82 // "offset" field.
|
Chris@16
|
83 struct message_header {
|
Chris@16
|
84 /// The process that sent the message
|
Chris@16
|
85 process_id_type source;
|
Chris@16
|
86
|
Chris@16
|
87 /// The message tag
|
Chris@16
|
88 int tag;
|
Chris@16
|
89
|
Chris@16
|
90 /// The offset of the message into the buffer
|
Chris@16
|
91 std::size_t offset;
|
Chris@16
|
92
|
Chris@16
|
93 /// The length of the message in the buffer, in bytes
|
Chris@16
|
94 std::size_t bytes;
|
Chris@16
|
95
|
Chris@16
|
96 template <class Archive>
|
Chris@16
|
97 void serialize(Archive& ar, int)
|
Chris@16
|
98 {
|
Chris@16
|
99 ar & source & tag & offset & bytes;
|
Chris@16
|
100 }
|
Chris@16
|
101 };
|
Chris@16
|
102
|
Chris@16
|
103 /**
|
Chris@16
|
104 * Stores the outgoing messages for a particular processor.
|
Chris@16
|
105 *
|
Chris@16
|
106 * @todo Evaluate whether we should use a deque instance, which
|
Chris@16
|
107 * would reduce could reduce the cost of "sending" messages but
|
Chris@16
|
108 * increases the time spent in the synchronization step.
|
Chris@16
|
109 */
|
Chris@16
|
110 struct outgoing_messages {
|
Chris@16
|
111 outgoing_messages() {}
|
Chris@16
|
112 ~outgoing_messages() {}
|
Chris@16
|
113
|
Chris@16
|
114 std::vector<message_header> headers;
|
Chris@16
|
115 buffer_type buffer;
|
Chris@16
|
116
|
Chris@16
|
117 template <class Archive>
|
Chris@16
|
118 void serialize(Archive& ar, int)
|
Chris@16
|
119 {
|
Chris@16
|
120 ar & headers & buffer;
|
Chris@16
|
121 }
|
Chris@16
|
122
|
Chris@16
|
123 void swap(outgoing_messages& x)
|
Chris@16
|
124 {
|
Chris@16
|
125 headers.swap(x.headers);
|
Chris@16
|
126 buffer.swap(x.buffer);
|
Chris@16
|
127 }
|
Chris@16
|
128 };
|
Chris@16
|
129
|
Chris@16
|
130 private:
|
Chris@16
|
131 /**
|
Chris@16
|
132 * Virtual base from which every trigger will be launched. See @c
|
Chris@16
|
133 * trigger_launcher for more information.
|
Chris@16
|
134 */
|
Chris@16
|
135 class trigger_base : boost::noncopyable
|
Chris@16
|
136 {
|
Chris@16
|
137 public:
|
Chris@16
|
138 explicit trigger_base(int tag) : tag_(tag) { }
|
Chris@16
|
139
|
Chris@16
|
140 /// Retrieve the tag associated with this trigger
|
Chris@16
|
141 int tag() const { return tag_; }
|
Chris@16
|
142
|
Chris@16
|
143 virtual ~trigger_base() { }
|
Chris@16
|
144
|
Chris@16
|
145 /**
|
Chris@16
|
146 * Invoked to receive a message that matches a particular trigger.
|
Chris@16
|
147 *
|
Chris@16
|
148 * @param source the source of the message
|
Chris@16
|
149 * @param tag the (local) tag of the message
|
Chris@16
|
150 * @param context the context under which the trigger is being
|
Chris@16
|
151 * invoked
|
Chris@16
|
152 */
|
Chris@16
|
153 virtual void
|
Chris@16
|
154 receive(mpi_process_group const& pg, int source, int tag,
|
Chris@16
|
155 trigger_receive_context context, int block=-1) const = 0;
|
Chris@16
|
156
|
Chris@16
|
157 protected:
|
Chris@16
|
158 // The message tag associated with this trigger
|
Chris@16
|
159 int tag_;
|
Chris@16
|
160 };
|
Chris@16
|
161
|
Chris@16
|
162 /**
|
Chris@16
|
163 * Launches a specific handler in response to a trigger. This
|
Chris@16
|
164 * function object wraps up the handler function object and a buffer
|
Chris@16
|
165 * for incoming data.
|
Chris@16
|
166 */
|
Chris@16
|
167 template<typename Type, typename Handler>
|
Chris@16
|
168 class trigger_launcher : public trigger_base
|
Chris@16
|
169 {
|
Chris@16
|
170 public:
|
Chris@16
|
171 explicit trigger_launcher(mpi_process_group& self, int tag,
|
Chris@16
|
172 const Handler& handler)
|
Chris@16
|
173 : trigger_base(tag), self(self), handler(handler)
|
Chris@16
|
174 {}
|
Chris@16
|
175
|
Chris@16
|
176 void
|
Chris@16
|
177 receive(mpi_process_group const& pg, int source, int tag,
|
Chris@16
|
178 trigger_receive_context context, int block=-1) const;
|
Chris@16
|
179
|
Chris@16
|
180 private:
|
Chris@16
|
181 mpi_process_group& self;
|
Chris@16
|
182 mutable Handler handler;
|
Chris@16
|
183 };
|
Chris@16
|
184
|
Chris@16
|
185 /**
|
Chris@16
|
186 * Launches a specific handler with a message reply in response to a
|
Chris@16
|
187 * trigger. This function object wraps up the handler function
|
Chris@16
|
188 * object and a buffer for incoming data.
|
Chris@16
|
189 */
|
Chris@16
|
190 template<typename Type, typename Handler>
|
Chris@16
|
191 class reply_trigger_launcher : public trigger_base
|
Chris@16
|
192 {
|
Chris@16
|
193 public:
|
Chris@16
|
194 explicit reply_trigger_launcher(mpi_process_group& self, int tag,
|
Chris@16
|
195 const Handler& handler)
|
Chris@16
|
196 : trigger_base(tag), self(self), handler(handler)
|
Chris@16
|
197 {}
|
Chris@16
|
198
|
Chris@16
|
199 void
|
Chris@16
|
200 receive(mpi_process_group const& pg, int source, int tag,
|
Chris@16
|
201 trigger_receive_context context, int block=-1) const;
|
Chris@16
|
202
|
Chris@16
|
203 private:
|
Chris@16
|
204 mpi_process_group& self;
|
Chris@16
|
205 mutable Handler handler;
|
Chris@16
|
206 };
|
Chris@16
|
207
|
Chris@16
|
208 template<typename Type, typename Handler>
|
Chris@16
|
209 class global_trigger_launcher : public trigger_base
|
Chris@16
|
210 {
|
Chris@16
|
211 public:
|
Chris@16
|
212 explicit global_trigger_launcher(mpi_process_group& self, int tag,
|
Chris@16
|
213 const Handler& handler)
|
Chris@16
|
214 : trigger_base(tag), handler(handler)
|
Chris@16
|
215 {
|
Chris@16
|
216 }
|
Chris@16
|
217
|
Chris@16
|
218 void
|
Chris@16
|
219 receive(mpi_process_group const& pg, int source, int tag,
|
Chris@16
|
220 trigger_receive_context context, int block=-1) const;
|
Chris@16
|
221
|
Chris@16
|
222 private:
|
Chris@16
|
223 mutable Handler handler;
|
Chris@16
|
224 // TBD: do not forget to cancel any outstanding Irecv when deleted,
|
Chris@16
|
225 // if we decide to use Irecv
|
Chris@16
|
226 };
|
Chris@16
|
227
|
Chris@16
|
228 template<typename Type, typename Handler>
|
Chris@16
|
229 class global_irecv_trigger_launcher : public trigger_base
|
Chris@16
|
230 {
|
Chris@16
|
231 public:
|
Chris@16
|
232 explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
|
Chris@16
|
233 const Handler& handler, int sz)
|
Chris@16
|
234 : trigger_base(tag), handler(handler), buffer_size(sz)
|
Chris@16
|
235 {
|
Chris@16
|
236 prepare_receive(self,tag);
|
Chris@16
|
237 }
|
Chris@16
|
238
|
Chris@16
|
239 void
|
Chris@16
|
240 receive(mpi_process_group const& pg, int source, int tag,
|
Chris@16
|
241 trigger_receive_context context, int block=-1) const;
|
Chris@16
|
242
|
Chris@16
|
243 private:
|
Chris@16
|
244 void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
|
Chris@16
|
245 Handler handler;
|
Chris@16
|
246 int buffer_size;
|
Chris@16
|
247 // TBD: do not forget to cancel any outstanding Irecv when deleted,
|
Chris@16
|
248 // if we decide to use Irecv
|
Chris@16
|
249 };
|
Chris@16
|
250
|
Chris@16
|
251 public:
|
Chris@16
|
252 /**
|
Chris@16
|
253 * Construct a new BSP process group from an MPI communicator. The
|
Chris@16
|
254 * MPI communicator will be duplicated to create a new communicator
|
Chris@16
|
255 * for this process group to use.
|
Chris@16
|
256 */
|
Chris@16
|
257 mpi_process_group(communicator_type parent_comm = communicator_type());
|
Chris@16
|
258
|
Chris@16
|
259 /**
|
Chris@16
|
260 * Construct a new BSP process group from an MPI communicator. The
|
Chris@16
|
261 * MPI communicator will be duplicated to create a new communicator
|
Chris@16
|
262 * for this process group to use. This constructor allows to tune the
|
Chris@16
|
263 * size of message batches.
|
Chris@16
|
264 *
|
Chris@16
|
265 * @param num_headers The maximum number of headers in a message batch
|
Chris@16
|
266 *
|
Chris@16
|
267 * @param buffer_size The maximum size of the message buffer in a batch.
|
Chris@16
|
268 *
|
Chris@16
|
269 */
|
Chris@16
|
270 mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
|
Chris@16
|
271 communicator_type parent_comm = communicator_type());
|
Chris@16
|
272
|
Chris@16
|
273 /**
|
Chris@16
|
274 * Construct a copy of the BSP process group for a new distributed
|
Chris@16
|
275 * data structure. This data structure will synchronize with all
|
Chris@16
|
276 * other members of the process group's equivalence class (including
|
Chris@16
|
277 * @p other), but will have its own set of tags.
|
Chris@16
|
278 *
|
Chris@16
|
279 * @param other The process group that this new process group will
|
Chris@16
|
280 * be based on, using a different set of tags within the same
|
Chris@16
|
281 * communication and synchronization space.
|
Chris@16
|
282 *
|
Chris@16
|
283 * @param handler A message handler that will be passed (source,
|
Chris@16
|
284 * tag) pairs for each message received by this data
|
Chris@16
|
285 * structure. The handler is expected to receive the messages
|
Chris@16
|
286 * immediately. The handler can be changed after-the-fact by
|
Chris@16
|
287 * calling @c replace_handler.
|
Chris@16
|
288 *
|
Chris@16
|
289 * @param out_of_band_receive An anachronism. TODO: remove this.
|
Chris@16
|
290 */
|
Chris@16
|
291 mpi_process_group(const mpi_process_group& other,
|
Chris@16
|
292 const receiver_type& handler,
|
Chris@16
|
293 bool out_of_band_receive = false);
|
Chris@16
|
294
|
Chris@16
|
295 /**
|
Chris@16
|
296 * Construct a copy of the BSP process group for a new distributed
|
Chris@16
|
297 * data structure. This data structure will synchronize with all
|
Chris@16
|
298 * other members of the process group's equivalence class (including
|
Chris@16
|
299 * @p other), but will have its own set of tags.
|
Chris@16
|
300 */
|
Chris@16
|
301 mpi_process_group(const mpi_process_group& other,
|
Chris@16
|
302 attach_distributed_object,
|
Chris@16
|
303 bool out_of_band_receive = false);
|
Chris@16
|
304
|
Chris@16
|
305 /**
|
Chris@16
|
306 * Create an "empty" process group, with no information. This is an
|
Chris@16
|
307 * internal routine that users should never need.
|
Chris@16
|
308 */
|
Chris@16
|
309 explicit mpi_process_group(create_empty) {}
|
Chris@16
|
310
|
Chris@16
|
311 /**
|
Chris@16
|
312 * Destroys this copy of the process group.
|
Chris@16
|
313 */
|
Chris@16
|
314 ~mpi_process_group();
|
Chris@16
|
315
|
Chris@16
|
316 /**
|
Chris@16
|
317 * Replace the current message handler with a new message handler.
|
Chris@16
|
318 *
|
Chris@16
|
319 * @param handle The new message handler.
|
Chris@16
|
320 * @param out_of_band_receive An anachronism: remove this
|
Chris@16
|
321 */
|
Chris@16
|
322 void replace_handler(const receiver_type& handler,
|
Chris@16
|
323 bool out_of_band_receive = false);
|
Chris@16
|
324
|
Chris@16
|
325 /**
|
Chris@16
|
326 * Turns this process group into the process group for a new
|
Chris@16
|
327 * distributed data structure or object, allocating its own tag
|
Chris@16
|
328 * block.
|
Chris@16
|
329 */
|
Chris@16
|
330 void make_distributed_object();
|
Chris@16
|
331
|
Chris@16
|
332 /**
|
Chris@16
|
333 * Replace the handler to be invoked at the beginning of synchronize.
|
Chris@16
|
334 */
|
Chris@16
|
335 void
|
Chris@16
|
336 replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
|
Chris@16
|
337
|
Chris@16
|
338 /**
|
Chris@16
|
339 * Return the block number of the current data structure. A value of
|
Chris@16
|
340 * 0 indicates that this particular instance of the process group is
|
Chris@16
|
341 * not associated with any distributed data structure.
|
Chris@16
|
342 */
|
Chris@16
|
343 int my_block_number() const { return block_num? *block_num : 0; }
|
Chris@16
|
344
|
Chris@16
|
345 /**
|
Chris@16
|
346 * Encode a block number/tag pair into a single encoded tag for
|
Chris@16
|
347 * transmission.
|
Chris@16
|
348 */
|
Chris@16
|
349 int encode_tag(int block_num, int tag) const
|
Chris@16
|
350 { return block_num * max_tags + tag; }
|
Chris@16
|
351
|
Chris@16
|
352 /**
|
Chris@16
|
353 * Decode an encoded tag into a block number/tag pair.
|
Chris@16
|
354 */
|
Chris@16
|
355 std::pair<int, int> decode_tag(int encoded_tag) const
|
Chris@16
|
356 { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
|
Chris@16
|
357
|
Chris@16
|
358 // @todo Actually write up the friend declarations so these could be
|
Chris@16
|
359 // private.
|
Chris@16
|
360
|
Chris@16
|
361 // private:
|
Chris@16
|
362
|
Chris@16
|
363 /** Allocate a block of tags for this instance. The block should not
|
Chris@16
|
364 * have been allocated already, e.g., my_block_number() ==
|
Chris@16
|
365 * 0. Returns the newly-allocated block number.
|
Chris@16
|
366 */
|
Chris@16
|
367 int allocate_block(bool out_of_band_receive = false);
|
Chris@16
|
368
|
Chris@16
|
369 /** Potentially emit a receive event out of band. Returns true if an event
|
Chris@16
|
370 * was actually sent, false otherwise.
|
Chris@16
|
371 */
|
Chris@16
|
372 bool maybe_emit_receive(int process, int encoded_tag) const;
|
Chris@16
|
373
|
Chris@16
|
374 /** Emit a receive event. Returns true if an event was actually
|
Chris@16
|
375 * sent, false otherwise.
|
Chris@16
|
376 */
|
Chris@16
|
377 bool emit_receive(int process, int encoded_tag) const;
|
Chris@16
|
378
|
Chris@16
|
379 /** Emit an on-synchronize event to all block handlers. */
|
Chris@16
|
380 void emit_on_synchronize() const;
|
Chris@16
|
381
|
Chris@16
|
382 /** Retrieve a reference to the stored receiver in this block. */
|
Chris@16
|
383 template<typename Receiver>
|
Chris@16
|
384 Receiver* get_receiver();
|
Chris@16
|
385
|
Chris@16
|
386 template<typename T>
|
Chris@16
|
387 void
|
Chris@16
|
388 send_impl(int dest, int tag, const T& value,
|
Chris@16
|
389 mpl::true_ /*is_mpi_datatype*/) const;
|
Chris@16
|
390
|
Chris@16
|
391 template<typename T>
|
Chris@16
|
392 void
|
Chris@16
|
393 send_impl(int dest, int tag, const T& value,
|
Chris@16
|
394 mpl::false_ /*is_mpi_datatype*/) const;
|
Chris@16
|
395
|
Chris@16
|
396 template<typename T>
|
Chris@16
|
397 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
|
Chris@16
|
398 array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
|
Chris@16
|
399
|
Chris@16
|
400 template<typename T>
|
Chris@16
|
401 bool
|
Chris@16
|
402 receive_impl(int source, int tag, T& value,
|
Chris@16
|
403 mpl::true_ /*is_mpi_datatype*/) const;
|
Chris@16
|
404
|
Chris@16
|
405 template<typename T>
|
Chris@16
|
406 bool
|
Chris@16
|
407 receive_impl(int source, int tag, T& value,
|
Chris@16
|
408 mpl::false_ /*is_mpi_datatype*/) const;
|
Chris@16
|
409
|
Chris@16
|
410 // Receive an array of values
|
Chris@16
|
411 template<typename T>
|
Chris@16
|
412 typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
|
Chris@16
|
413 array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
|
Chris@16
|
414
|
Chris@16
|
415 optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;
|
Chris@16
|
416
|
Chris@16
|
417 void synchronize() const;
|
Chris@16
|
418
|
Chris@16
|
419 operator bool() { return bool(impl_); }
|
Chris@16
|
420
|
Chris@16
|
421 mpi_process_group base() const;
|
Chris@16
|
422
|
Chris@16
|
423 /**
|
Chris@16
|
424 * Create a new trigger for a specific message tag. Triggers handle
|
Chris@16
|
425 * out-of-band messaging, and the handler itself will be called
|
Chris@16
|
426 * whenever a message is available. The handler itself accepts four
|
Chris@16
|
427 * arguments: the source of the message, the message tag (which will
|
Chris@16
|
428 * be the same as @p tag), the message data (of type @c Type), and a
|
Chris@16
|
429 * boolean flag that states whether the message was received
|
Chris@16
|
430 * out-of-band. The last will be @c true for out-of-band receives,
|
Chris@16
|
431 * or @c false for receives at the end of a synchronization step.
|
Chris@16
|
432 */
|
Chris@16
|
433 template<typename Type, typename Handler>
|
Chris@16
|
434 void trigger(int tag, const Handler& handler);
|
Chris@16
|
435
|
Chris@16
|
436 /**
|
Chris@16
|
437 * Create a new trigger for a specific message tag, along with a way
|
Chris@16
|
438 * to send a reply with data back to the sender. Triggers handle
|
Chris@16
|
439 * out-of-band messaging, and the handler itself will be called
|
Chris@16
|
440 * whenever a message is available. The handler itself accepts four
|
Chris@16
|
441 * arguments: the source of the message, the message tag (which will
|
Chris@16
|
442 * be the same as @p tag), the message data (of type @c Type), and a
|
Chris@16
|
443 * boolean flag that states whether the message was received
|
Chris@16
|
444 * out-of-band. The last will be @c true for out-of-band receives,
|
Chris@16
|
445 * or @c false for receives at the end of a synchronization
|
Chris@16
|
446 * step. The handler also returns a value, which will be routed back
|
Chris@16
|
447 * to the sender.
|
Chris@16
|
448 */
|
Chris@16
|
449 template<typename Type, typename Handler>
|
Chris@16
|
450 void trigger_with_reply(int tag, const Handler& handler);
|
Chris@16
|
451
|
Chris@16
|
452 template<typename Type, typename Handler>
|
Chris@16
|
453 void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);
|
Chris@16
|
454
|
Chris@16
|
455
|
Chris@16
|
456
|
Chris@16
|
457 /**
|
Chris@16
|
458 * Poll for any out-of-band messages. This routine will check if any
|
Chris@16
|
459 * out-of-band messages are available. Those that are available will
|
Chris@16
|
460 * be handled immediately, if possible.
|
Chris@16
|
461 *
|
Chris@16
|
462 * @returns if an out-of-band message has been received, but we are
|
Chris@16
|
463 * unable to actually receive the message, a (source, tag) pair will
|
Chris@16
|
464 * be returned. Otherwise, returns an empty optional.
|
Chris@16
|
465 *
|
Chris@16
|
466 * @param wait When true, we should block until a message comes in.
|
Chris@16
|
467 *
|
Chris@16
|
468 * @param synchronizing whether we are currently synchronizing the
|
Chris@16
|
469 * process group
|
Chris@16
|
470 */
|
Chris@16
|
471 optional<std::pair<int, int> >
|
Chris@16
|
472 poll(bool wait = false, int block = -1, bool synchronizing = false) const;
|
Chris@16
|
473
|
Chris@16
|
474 /**
|
Chris@16
|
475 * Determines the context of the trigger currently executing. If
|
Chris@16
|
476 * multiple triggers are executing (recursively), then the context
|
Chris@16
|
477 * for the most deeply nested trigger will be returned. If no
|
Chris@16
|
478 * triggers are executing, returns @c trc_none. This might be used,
|
Chris@16
|
479 * for example, to determine whether a reply to a message should
|
Chris@16
|
480 * itself be sent out-of-band or whether it can go via the normal,
|
Chris@16
|
481 * slower communication route.
|
Chris@16
|
482 */
|
Chris@16
|
483 trigger_receive_context trigger_context() const;
|
Chris@16
|
484
|
Chris@16
|
485 /// INTERNAL ONLY
|
Chris@16
|
486 void receive_batch(process_id_type source, outgoing_messages& batch) const;
|
Chris@16
|
487
|
Chris@16
|
488 /// INTERNAL ONLY
|
Chris@16
|
489 ///
|
Chris@16
|
490 /// Determine the actual communicator and tag will be used for a
|
Chris@16
|
491 /// transmission with the given tag.
|
Chris@16
|
492 std::pair<boost::mpi::communicator, int>
|
Chris@16
|
493 actual_communicator_and_tag(int tag, int block) const;
|
Chris@16
|
494
|
Chris@16
|
495 /// set the size of the message buffer used for buffered oob sends
|
Chris@16
|
496
|
Chris@16
|
497 static void set_message_buffer_size(std::size_t s);
|
Chris@16
|
498
|
Chris@16
|
499 /// get the size of the message buffer used for buffered oob sends
|
Chris@16
|
500
|
Chris@16
|
501 static std::size_t message_buffer_size();
|
Chris@16
|
502 static int old_buffer_size;
|
Chris@16
|
503 static void* old_buffer;
|
Chris@16
|
504 private:
|
Chris@16
|
505
|
Chris@16
|
506 void install_trigger(int tag, int block,
|
Chris@16
|
507 shared_ptr<trigger_base> const& launcher);
|
Chris@16
|
508
|
Chris@16
|
509 void poll_requests(int block=-1) const;
|
Chris@16
|
510
|
Chris@16
|
511
|
Chris@16
|
512 // send a batch if the buffer is full now or would get full
|
Chris@16
|
513 void maybe_send_batch(process_id_type dest) const;
|
Chris@16
|
514
|
Chris@16
|
515 // actually send a batch
|
Chris@16
|
516 void send_batch(process_id_type dest, outgoing_messages& batch) const;
|
Chris@16
|
517 void send_batch(process_id_type dest) const;
|
Chris@16
|
518
|
Chris@16
|
519 void pack_headers() const;
|
Chris@16
|
520
|
Chris@16
|
521 /**
|
Chris@16
|
522 * Process a batch of incoming messages immediately.
|
Chris@16
|
523 *
|
Chris@16
|
524 * @param source the source of these messages
|
Chris@16
|
525 */
|
Chris@16
|
526 void process_batch(process_id_type source) const;
|
Chris@16
|
527 void receive_batch(boost::mpi::status& status) const;
|
Chris@16
|
528
|
Chris@16
|
529 //void free_finished_sends() const;
|
Chris@16
|
530
|
Chris@16
|
531 /// Status messages used internally by the process group
|
Chris@16
|
532 enum status_messages {
|
Chris@16
|
533 /// the first of the reserved message tags
|
Chris@16
|
534 msg_reserved_first = 126,
|
Chris@16
|
535 /// Sent from a processor when sending batched messages
|
Chris@16
|
536 msg_batch = 126,
|
Chris@16
|
537 /// Sent from a processor when sending large batched messages, larger than
|
Chris@16
|
538 /// the maximum buffer size for messages to be received by MPI_Irecv
|
Chris@16
|
539 msg_large_batch = 127,
|
Chris@16
|
540 /// Sent from a source processor to everyone else when that
|
Chris@16
|
541 /// processor has entered the synchronize() function.
|
Chris@16
|
542 msg_synchronizing = 128,
|
Chris@16
|
543 /// the last of the reserved message tags
|
Chris@16
|
544 msg_reserved_last = 128
|
Chris@16
|
545 };
|
Chris@16
|
546
|
Chris@16
|
547 /**
|
Chris@16
|
548 * Description of a block of tags associated to a particular
|
Chris@16
|
549 * distributed data structure. This structure will live as long as
|
Chris@16
|
550 * the distributed data structure is around, and will be used to
|
Chris@16
|
551 * help send messages to the data structure.
|
Chris@16
|
552 */
|
Chris@16
|
553 struct block_type
|
Chris@16
|
554 {
|
Chris@16
|
555 block_type() { }
|
Chris@16
|
556
|
Chris@16
|
557 /// Handler for receive events
|
Chris@16
|
558 receiver_type on_receive;
|
Chris@16
|
559
|
Chris@16
|
560 /// Handler executed at the start of synchronization
|
Chris@16
|
561 on_synchronize_event_type on_synchronize;
|
Chris@16
|
562
|
Chris@16
|
563 /// Individual message triggers. Note: at present, this vector is
|
Chris@16
|
564 /// indexed by the (local) tag of the trigger. Any tags that
|
Chris@16
|
565 /// don't have triggers will have NULL pointers in that spot.
|
Chris@16
|
566 std::vector<shared_ptr<trigger_base> > triggers;
|
Chris@16
|
567 };
|
Chris@16
|
568
|
Chris@16
|
569 /**
|
Chris@16
|
570 * Data structure containing all of the blocks for the distributed
|
Chris@16
|
571 * data structures attached to a process group.
|
Chris@16
|
572 */
|
Chris@16
|
573 typedef std::vector<block_type*> blocks_type;
|
Chris@16
|
574
|
Chris@16
|
575 /// Iterator into @c blocks_type.
|
Chris@16
|
576 typedef blocks_type::iterator block_iterator;
|
Chris@16
|
577
|
Chris@16
|
578 /**
|
Chris@16
|
579 * Deleter used to deallocate a block when its distributed data
|
Chris@16
|
580 * structure is destroyed. This type will be used as the deleter for
|
Chris@16
|
581 * @c block_num.
|
Chris@16
|
582 */
|
Chris@16
|
583 struct deallocate_block;
|
Chris@16
|
584
|
Chris@16
|
585 static std::vector<char> message_buffer;
|
Chris@16
|
586
|
Chris@16
|
587 public:
|
Chris@16
|
588 /**
|
Chris@16
|
589 * Data associated with the process group and all of its attached
|
Chris@16
|
590 * distributed data structures.
|
Chris@16
|
591 */
|
Chris@16
|
592 shared_ptr<impl> impl_;
|
Chris@16
|
593
|
Chris@16
|
594 /**
|
Chris@16
|
595 * When non-null, indicates that this copy of the process group is
|
Chris@16
|
596 * associated with a particular distributed data structure. The
|
Chris@16
|
597 * integer value contains the block number (a value > 0) associated
|
Chris@16
|
598 * with that data structure. The deleter for this @c shared_ptr is a
|
Chris@16
|
599 * @c deallocate_block object that will deallocate the associated
|
Chris@16
|
600 * block in @c impl_->blocks.
|
Chris@16
|
601 */
|
Chris@16
|
602 shared_ptr<int> block_num;
|
Chris@16
|
603
|
Chris@16
|
604 /**
|
Chris@16
|
605 * Rank of this process, to avoid having to call rank() repeatedly.
|
Chris@16
|
606 */
|
Chris@16
|
607 int rank;
|
Chris@16
|
608
|
Chris@16
|
609 /**
|
Chris@16
|
610 * Number of processes in this process group, to avoid having to
|
Chris@16
|
611 * call communicator::size() repeatedly.
|
Chris@16
|
612 */
|
Chris@16
|
613 int size;
|
Chris@16
|
614 };
|
Chris@16
|
615
|
Chris@16
|
616
|
Chris@16
|
617
|
Chris@16
|
618 inline mpi_process_group::process_id_type
|
Chris@16
|
619 process_id(const mpi_process_group& pg)
|
Chris@16
|
620 { return pg.rank; }
|
Chris@16
|
621
|
Chris@16
|
622 inline mpi_process_group::process_size_type
|
Chris@16
|
623 num_processes(const mpi_process_group& pg)
|
Chris@16
|
624 { return pg.size; }
|
Chris@16
|
625
|
Chris@16
|
626 mpi_process_group::communicator_type communicator(const mpi_process_group& pg);
|
Chris@16
|
627
|
Chris@16
|
628 template<typename T>
|
Chris@16
|
629 void
|
Chris@16
|
630 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
631 int tag, const T& value);
|
Chris@16
|
632
|
Chris@16
|
633 template<typename InputIterator>
|
Chris@16
|
634 void
|
Chris@16
|
635 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
636 int tag, InputIterator first, InputIterator last);
|
Chris@16
|
637
|
Chris@16
|
638 template<typename T>
|
Chris@16
|
639 inline void
|
Chris@16
|
640 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
641 int tag, T* first, T* last)
|
Chris@16
|
642 { send(pg, dest, tag, first, last - first); }
|
Chris@16
|
643
|
Chris@16
|
644 template<typename T>
|
Chris@16
|
645 inline void
|
Chris@16
|
646 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
647 int tag, const T* first, const T* last)
|
Chris@16
|
648 { send(pg, dest, tag, first, last - first); }
|
Chris@16
|
649
|
Chris@16
|
650 template<typename T>
|
Chris@16
|
651 mpi_process_group::process_id_type
|
Chris@16
|
652 receive(const mpi_process_group& pg, int tag, T& value);
|
Chris@16
|
653
|
Chris@16
|
654 template<typename T>
|
Chris@16
|
655 mpi_process_group::process_id_type
|
Chris@16
|
656 receive(const mpi_process_group& pg,
|
Chris@16
|
657 mpi_process_group::process_id_type source, int tag, T& value);
|
Chris@16
|
658
|
Chris@16
|
659 optional<std::pair<mpi_process_group::process_id_type, int> >
|
Chris@16
|
660 probe(const mpi_process_group& pg);
|
Chris@16
|
661
|
Chris@16
|
662 void synchronize(const mpi_process_group& pg);
|
Chris@16
|
663
|
Chris@16
|
664 template<typename T, typename BinaryOperation>
|
Chris@16
|
665 T*
|
Chris@16
|
666 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
|
Chris@16
|
667 BinaryOperation bin_op);
|
Chris@16
|
668
|
Chris@16
|
669 template<typename T, typename BinaryOperation>
|
Chris@16
|
670 T*
|
Chris@16
|
671 scan(const mpi_process_group& pg, T* first, T* last, T* out,
|
Chris@16
|
672 BinaryOperation bin_op);
|
Chris@16
|
673
|
Chris@16
|
674 template<typename InputIterator, typename T>
|
Chris@16
|
675 void
|
Chris@16
|
676 all_gather(const mpi_process_group& pg,
|
Chris@16
|
677 InputIterator first, InputIterator last, std::vector<T>& out);
|
Chris@16
|
678
|
Chris@16
|
679 template<typename InputIterator>
|
Chris@16
|
680 mpi_process_group
|
Chris@16
|
681 process_subgroup(const mpi_process_group& pg,
|
Chris@16
|
682 InputIterator first, InputIterator last);
|
Chris@16
|
683
|
Chris@16
|
684 template<typename T>
|
Chris@16
|
685 void
|
Chris@16
|
686 broadcast(const mpi_process_group& pg, T& val,
|
Chris@16
|
687 mpi_process_group::process_id_type root);
|
Chris@16
|
688
|
Chris@16
|
689
|
Chris@16
|
690 /*******************************************************************
|
Chris@16
|
691 * Out-of-band communication *
|
Chris@16
|
692 *******************************************************************/
|
Chris@16
|
693
|
Chris@16
|
694 template<typename T>
|
Chris@16
|
695 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
|
Chris@16
|
696 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
697 int tag, const T& value, int block=-1)
|
Chris@16
|
698 {
|
Chris@16
|
699 using boost::mpi::get_mpi_datatype;
|
Chris@16
|
700
|
Chris@16
|
701 // Determine the actual message tag we will use for the send, and which
|
Chris@16
|
702 // communicator we will use.
|
Chris@16
|
703 std::pair<boost::mpi::communicator, int> actual
|
Chris@16
|
704 = pg.actual_communicator_and_tag(tag, block);
|
Chris@16
|
705
|
Chris@16
|
706 #ifdef SEND_OOB_BSEND
|
Chris@16
|
707 if (mpi_process_group::message_buffer_size()) {
|
Chris@16
|
708 MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
|
Chris@16
|
709 actual.second, actual.first);
|
Chris@16
|
710 return;
|
Chris@16
|
711 }
|
Chris@16
|
712 #endif
|
Chris@16
|
713 MPI_Request request;
|
Chris@16
|
714 MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
|
Chris@16
|
715 actual.second, actual.first, &request);
|
Chris@16
|
716
|
Chris@16
|
717 int done=0;
|
Chris@16
|
718 do {
|
Chris@16
|
719 pg.poll();
|
Chris@16
|
720 MPI_Test(&request,&done,MPI_STATUS_IGNORE);
|
Chris@16
|
721 } while (!done);
|
Chris@16
|
722 }
|
Chris@16
|
723
|
Chris@16
|
724 template<typename T>
|
Chris@16
|
725 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
|
Chris@16
|
726 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
727 int tag, const T& value, int block=-1)
|
Chris@16
|
728 {
|
Chris@16
|
729 using boost::mpi::packed_oarchive;
|
Chris@16
|
730
|
Chris@16
|
731 // Determine the actual message tag we will use for the send, and which
|
Chris@16
|
732 // communicator we will use.
|
Chris@16
|
733 std::pair<boost::mpi::communicator, int> actual
|
Chris@16
|
734 = pg.actual_communicator_and_tag(tag, block);
|
Chris@16
|
735
|
Chris@16
|
736 // Serialize the data into a buffer
|
Chris@16
|
737 packed_oarchive out(actual.first);
|
Chris@16
|
738 out << value;
|
Chris@16
|
739 std::size_t size = out.size();
|
Chris@16
|
740
|
Chris@16
|
741 // Send the actual message data
|
Chris@16
|
742 #ifdef SEND_OOB_BSEND
|
Chris@16
|
743 if (mpi_process_group::message_buffer_size()) {
|
Chris@16
|
744 MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
|
Chris@16
|
745 dest, actual.second, actual.first);
|
Chris@16
|
746 return;
|
Chris@16
|
747 }
|
Chris@16
|
748 #endif
|
Chris@16
|
749 MPI_Request request;
|
Chris@16
|
750 MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
|
Chris@16
|
751 dest, actual.second, actual.first, &request);
|
Chris@16
|
752
|
Chris@16
|
753 int done=0;
|
Chris@16
|
754 do {
|
Chris@16
|
755 pg.poll();
|
Chris@16
|
756 MPI_Test(&request,&done,MPI_STATUS_IGNORE);
|
Chris@16
|
757 } while (!done);
|
Chris@16
|
758 }
|
Chris@16
|
759
|
Chris@16
|
760 template<typename T>
|
Chris@16
|
761 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
|
Chris@16
|
762 receive_oob(const mpi_process_group& pg,
|
Chris@16
|
763 mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
|
Chris@16
|
764
|
Chris@16
|
765 template<typename T>
|
Chris@16
|
766 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
|
Chris@16
|
767 receive_oob(const mpi_process_group& pg,
|
Chris@16
|
768 mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
|
Chris@16
|
769
|
Chris@16
|
770 template<typename SendT, typename ReplyT>
|
Chris@16
|
771 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
|
Chris@16
|
772 send_oob_with_reply(const mpi_process_group& pg,
|
Chris@16
|
773 mpi_process_group::process_id_type dest,
|
Chris@16
|
774 int tag, const SendT& send_value, ReplyT& reply_value,
|
Chris@16
|
775 int block = -1);
|
Chris@16
|
776
|
Chris@16
|
777 template<typename SendT, typename ReplyT>
|
Chris@16
|
778 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
|
Chris@16
|
779 send_oob_with_reply(const mpi_process_group& pg,
|
Chris@16
|
780 mpi_process_group::process_id_type dest,
|
Chris@16
|
781 int tag, const SendT& send_value, ReplyT& reply_value,
|
Chris@16
|
782 int block = -1);
|
Chris@16
|
783
|
Chris@16
|
784 } } } // end namespace boost::graph::distributed
|
Chris@16
|
785
|
Chris@16
|
786 BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
|
Chris@16
|
787 namespace boost { namespace mpi {
|
Chris@16
|
788 template<>
|
Chris@16
|
789 struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
|
Chris@16
|
790 } } // end namespace boost::mpi
|
Chris@16
|
791
|
Chris@16
|
792 namespace std {
|
Chris@16
|
793 /// optimized swap for outgoing messages
|
Chris@16
|
794 inline void
|
Chris@16
|
795 swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x,
|
Chris@16
|
796 boost::graph::distributed::mpi_process_group::outgoing_messages& y)
|
Chris@16
|
797 {
|
Chris@16
|
798 x.swap(y);
|
Chris@16
|
799 }
|
Chris@16
|
800
|
Chris@16
|
801
|
Chris@16
|
802 }
|
Chris@16
|
803
|
Chris@16
|
804 BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
|
Chris@16
|
805 BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
|
Chris@16
|
806
|
Chris@16
|
807 #include <boost/graph/distributed/detail/mpi_process_group.ipp>
|
Chris@16
|
808
|
Chris@16
|
809 #endif // BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
|