Chris@16
|
1 // -*- C++ -*-
|
Chris@16
|
2
|
Chris@16
|
3 // Copyright (C) 2004-2008 The Trustees of Indiana University.
|
Chris@16
|
4 // Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com>
|
Chris@16
|
5 // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com>
|
Chris@16
|
6
|
Chris@16
|
7 // Use, modification and distribution is subject to the Boost Software
|
Chris@16
|
8 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
|
Chris@16
|
9 // http://www.boost.org/LICENSE_1_0.txt)
|
Chris@16
|
10
|
Chris@16
|
11 // Authors: Douglas Gregor
|
Chris@16
|
12 // Andrew Lumsdaine
|
Chris@16
|
13 // Matthias Troyer
|
Chris@16
|
14
|
Chris@16
|
15 //#define PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
16
|
Chris@16
|
17 #ifndef BOOST_GRAPH_USE_MPI
|
Chris@16
|
18 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
|
Chris@16
|
19 #endif
|
Chris@16
|
20
|
Chris@16
|
21 #include <boost/assert.hpp>
|
Chris@16
|
22 #include <algorithm>
|
Chris@16
|
23 #include <boost/graph/parallel/detail/untracked_pair.hpp>
|
Chris@16
|
24 #include <numeric>
|
Chris@16
|
25 #include <iterator>
|
Chris@16
|
26 #include <functional>
|
Chris@16
|
27 #include <vector>
|
Chris@16
|
28 #include <queue>
|
Chris@16
|
29 #include <stack>
|
Chris@16
|
30 #include <boost/graph/distributed/detail/tag_allocator.hpp>
|
Chris@16
|
31 #include <stdio.h>
|
Chris@16
|
32
|
Chris@16
|
33 // #define PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
34
|
Chris@16
|
35 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
36 # include <iostream>
|
Chris@16
|
37 #endif
|
Chris@16
|
38
|
Chris@16
|
39 namespace boost { namespace graph { namespace distributed {
|
Chris@16
|
40
|
Chris@16
|
41 struct mpi_process_group::impl
|
Chris@16
|
42 {
|
Chris@16
|
43
|
Chris@16
|
44 typedef mpi_process_group::message_header message_header;
|
Chris@16
|
45 typedef mpi_process_group::outgoing_messages outgoing_messages;
|
Chris@16
|
46
|
Chris@16
|
47 /**
|
Chris@16
|
48 * Stores the incoming messages from a particular processor.
|
Chris@16
|
49 *
|
Chris@16
|
50 * @todo Evaluate whether we should use a deque instance, which
|
Chris@16
|
51 * would reduce could reduce the cost of "receiving" messages and
|
Chris@16
|
52 allow us to deallocate memory earlier, but increases the time
|
Chris@16
|
53 spent in the synchronization step.
|
Chris@16
|
54 */
|
Chris@16
|
55 struct incoming_messages {
|
Chris@16
|
56 incoming_messages();
|
Chris@16
|
57 ~incoming_messages() {}
|
Chris@16
|
58
|
Chris@16
|
59 std::vector<message_header> headers;
|
Chris@16
|
60 buffer_type buffer;
|
Chris@16
|
61 std::vector<std::vector<message_header>::iterator> next_header;
|
Chris@16
|
62 };
|
Chris@16
|
63
|
Chris@16
|
64 struct batch_request {
|
Chris@16
|
65 MPI_Request request;
|
Chris@16
|
66 buffer_type buffer;
|
Chris@16
|
67 };
|
Chris@16
|
68
|
Chris@16
|
69 // send once we have a certain number of messages or bytes in the buffer
|
Chris@16
|
70 // these numbers need to be tuned, we keep them small at first for testing
|
Chris@16
|
71 std::size_t batch_header_number;
|
Chris@16
|
72 std::size_t batch_buffer_size;
|
Chris@16
|
73 std::size_t batch_message_size;
|
Chris@16
|
74
|
Chris@16
|
75 /**
|
Chris@16
|
76 * The actual MPI communicator used to transmit data.
|
Chris@16
|
77 */
|
Chris@16
|
78 boost::mpi::communicator comm;
|
Chris@16
|
79
|
Chris@16
|
80 /**
|
Chris@16
|
81 * The MPI communicator used to transmit out-of-band replies.
|
Chris@16
|
82 */
|
Chris@16
|
83 boost::mpi::communicator oob_reply_comm;
|
Chris@16
|
84
|
Chris@16
|
85 /// Outgoing message information, indexed by destination processor.
|
Chris@16
|
86 std::vector<outgoing_messages> outgoing;
|
Chris@16
|
87
|
Chris@16
|
88 /// Incoming message information, indexed by source processor.
|
Chris@16
|
89 std::vector<incoming_messages> incoming;
|
Chris@16
|
90
|
Chris@16
|
91 /// The numbers of processors that have entered a synchronization stage
|
Chris@16
|
92 std::vector<int> processors_synchronizing_stage;
|
Chris@16
|
93
|
Chris@16
|
94 /// The synchronization stage of a processor
|
Chris@16
|
95 std::vector<int> synchronizing_stage;
|
Chris@16
|
96
|
Chris@16
|
97 /// Number of processors still sending messages
|
Chris@16
|
98 std::vector<int> synchronizing_unfinished;
|
Chris@16
|
99
|
Chris@16
|
100 /// Number of batches sent since last synchronization stage
|
Chris@16
|
101 std::vector<int> number_sent_batches;
|
Chris@16
|
102
|
Chris@16
|
103 /// Number of batches received minus number of expected batches
|
Chris@16
|
104 std::vector<int> number_received_batches;
|
Chris@16
|
105
|
Chris@16
|
106
|
Chris@16
|
107 /// The context of the currently-executing trigger, or @c trc_none
|
Chris@16
|
108 /// if no trigger is executing.
|
Chris@16
|
109 trigger_receive_context trigger_context;
|
Chris@16
|
110
|
Chris@16
|
111 /// Non-zero indicates that we're processing batches
|
Chris@16
|
112 /// Increment this when processing patches,
|
Chris@16
|
113 /// decrement it when you're done.
|
Chris@16
|
114 int processing_batches;
|
Chris@16
|
115
|
Chris@16
|
116 /**
|
Chris@16
|
117 * Contains all of the active blocks corresponding to attached
|
Chris@16
|
118 * distributed data structures.
|
Chris@16
|
119 */
|
Chris@16
|
120 blocks_type blocks;
|
Chris@16
|
121
|
Chris@16
|
122 /// Whether we are currently synchronizing
|
Chris@16
|
123 bool synchronizing;
|
Chris@16
|
124
|
Chris@16
|
125 /// The MPI requests for posted sends of oob messages
|
Chris@16
|
126 std::vector<MPI_Request> requests;
|
Chris@16
|
127
|
Chris@16
|
128 /// The MPI buffers for posted irecvs of oob messages
|
Chris@16
|
129 std::map<int,buffer_type> buffers;
|
Chris@16
|
130
|
Chris@16
|
131 /// Queue for message batches received while already processing messages
|
Chris@16
|
132 std::queue<std::pair<int,outgoing_messages> > new_batches;
|
Chris@16
|
133 /// Maximum encountered size of the new_batches queue
|
Chris@16
|
134 std::size_t max_received;
|
Chris@16
|
135
|
Chris@16
|
136 /// The MPI requests and buffers for batchess being sent
|
Chris@16
|
137 std::list<batch_request> sent_batches;
|
Chris@16
|
138 /// Maximum encountered size of the sent_batches list
|
Chris@16
|
139 std::size_t max_sent;
|
Chris@16
|
140
|
Chris@16
|
141 /// Pre-allocated requests in a pool
|
Chris@16
|
142 std::vector<batch_request> batch_pool;
|
Chris@16
|
143 /// A stack controlling which batches are available
|
Chris@16
|
144 std::stack<std::size_t> free_batches;
|
Chris@16
|
145
|
Chris@16
|
146 void free_sent_batches();
|
Chris@16
|
147
|
Chris@16
|
148 // Tag allocator
|
Chris@16
|
149 detail::tag_allocator allocated_tags;
|
Chris@16
|
150
|
Chris@16
|
151 impl(std::size_t num_headers, std::size_t buffers_size,
|
Chris@16
|
152 communicator_type parent_comm);
|
Chris@16
|
153 ~impl();
|
Chris@16
|
154
|
Chris@16
|
155 private:
|
Chris@16
|
156 void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
|
Chris@16
|
157 };
|
Chris@16
|
158
|
Chris@16
|
159 inline trigger_receive_context mpi_process_group::trigger_context() const
|
Chris@16
|
160 {
|
Chris@16
|
161 return impl_->trigger_context;
|
Chris@16
|
162 }
|
Chris@16
|
163
|
Chris@16
|
164 template<typename T>
|
Chris@16
|
165 void
|
Chris@16
|
166 mpi_process_group::send_impl(int dest, int tag, const T& value,
|
Chris@16
|
167 mpl::true_ /*is_mpi_datatype*/) const
|
Chris@16
|
168 {
|
Chris@16
|
169 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
|
Chris@16
|
170
|
Chris@16
|
171 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
|
Chris@16
|
172
|
Chris@16
|
173 // Start constructing the message header
|
Chris@16
|
174 impl::message_header header;
|
Chris@16
|
175 header.source = process_id(*this);
|
Chris@16
|
176 header.tag = tag;
|
Chris@16
|
177 header.offset = outgoing.buffer.size();
|
Chris@16
|
178
|
Chris@16
|
179 boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
|
Chris@16
|
180 oa << value;
|
Chris@16
|
181
|
Chris@16
|
182 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
183 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
|
Chris@16
|
184 << tag << ", bytes = " << packed_size << std::endl;
|
Chris@16
|
185 #endif
|
Chris@16
|
186
|
Chris@16
|
187 // Store the header
|
Chris@16
|
188 header.bytes = outgoing.buffer.size() - header.offset;
|
Chris@16
|
189 outgoing.headers.push_back(header);
|
Chris@16
|
190
|
Chris@16
|
191 maybe_send_batch(dest);
|
Chris@16
|
192 }
|
Chris@16
|
193
|
Chris@16
|
194
|
Chris@16
|
195 template<typename T>
|
Chris@16
|
196 void
|
Chris@16
|
197 mpi_process_group::send_impl(int dest, int tag, const T& value,
|
Chris@16
|
198 mpl::false_ /*is_mpi_datatype*/) const
|
Chris@16
|
199 {
|
Chris@16
|
200 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
|
Chris@16
|
201
|
Chris@16
|
202 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
|
Chris@16
|
203
|
Chris@16
|
204 // Start constructing the message header
|
Chris@16
|
205 impl::message_header header;
|
Chris@16
|
206 header.source = process_id(*this);
|
Chris@16
|
207 header.tag = tag;
|
Chris@16
|
208 header.offset = outgoing.buffer.size();
|
Chris@16
|
209
|
Chris@16
|
210 // Serialize into the buffer
|
Chris@16
|
211 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
|
Chris@16
|
212 out << value;
|
Chris@16
|
213
|
Chris@16
|
214 // Store the header
|
Chris@16
|
215 header.bytes = outgoing.buffer.size() - header.offset;
|
Chris@16
|
216 outgoing.headers.push_back(header);
|
Chris@16
|
217 maybe_send_batch(dest);
|
Chris@16
|
218
|
Chris@16
|
219 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
220 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
|
Chris@16
|
221 << tag << ", bytes = " << header.bytes << std::endl;
|
Chris@16
|
222 #endif
|
Chris@16
|
223 }
|
Chris@16
|
224
|
Chris@16
|
225 template<typename T>
|
Chris@16
|
226 inline void
|
Chris@16
|
227 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
228 int tag, const T& value)
|
Chris@16
|
229 {
|
Chris@16
|
230 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
|
Chris@16
|
231 boost::mpi::is_mpi_datatype<T>());
|
Chris@16
|
232 }
|
Chris@16
|
233
|
Chris@16
|
234 template<typename T>
|
Chris@16
|
235 typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
|
Chris@16
|
236 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
237 int tag, const T values[], std::size_t n)
|
Chris@16
|
238 {
|
Chris@16
|
239 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
|
Chris@16
|
240 boost::serialization::make_array(values,n),
|
Chris@16
|
241 boost::mpl::true_());
|
Chris@16
|
242 }
|
Chris@16
|
243
|
Chris@16
|
244 template<typename T>
|
Chris@16
|
245 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
|
Chris@16
|
246 mpi_process_group::
|
Chris@16
|
247 array_send_impl(int dest, int tag, const T values[], std::size_t n) const
|
Chris@16
|
248 {
|
Chris@16
|
249 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
|
Chris@16
|
250
|
Chris@16
|
251 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
|
Chris@16
|
252
|
Chris@16
|
253 // Start constructing the message header
|
Chris@16
|
254 impl::message_header header;
|
Chris@16
|
255 header.source = process_id(*this);
|
Chris@16
|
256 header.tag = tag;
|
Chris@16
|
257 header.offset = outgoing.buffer.size();
|
Chris@16
|
258
|
Chris@16
|
259 // Serialize into the buffer
|
Chris@16
|
260 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
|
Chris@16
|
261 out << n;
|
Chris@16
|
262
|
Chris@16
|
263 for (std::size_t i = 0; i < n; ++i)
|
Chris@16
|
264 out << values[i];
|
Chris@16
|
265
|
Chris@16
|
266 // Store the header
|
Chris@16
|
267 header.bytes = outgoing.buffer.size() - header.offset;
|
Chris@16
|
268 outgoing.headers.push_back(header);
|
Chris@16
|
269 maybe_send_batch(dest);
|
Chris@16
|
270
|
Chris@16
|
271 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
272 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
|
Chris@16
|
273 << tag << ", bytes = " << header.bytes << std::endl;
|
Chris@16
|
274 #endif
|
Chris@16
|
275 }
|
Chris@16
|
276
|
Chris@16
|
277 template<typename T>
|
Chris@16
|
278 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
|
Chris@16
|
279 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
280 int tag, const T values[], std::size_t n)
|
Chris@16
|
281 {
|
Chris@16
|
282 pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
|
Chris@16
|
283 values, n);
|
Chris@16
|
284 }
|
Chris@16
|
285
|
Chris@16
|
286 template<typename InputIterator>
|
Chris@16
|
287 void
|
Chris@16
|
288 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
|
Chris@16
|
289 int tag, InputIterator first, InputIterator last)
|
Chris@16
|
290 {
|
Chris@16
|
291 typedef typename std::iterator_traits<InputIterator>::value_type value_type;
|
Chris@16
|
292 std::vector<value_type> values(first, last);
|
Chris@16
|
293 if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
|
Chris@16
|
294 else send(pg, dest, tag, &values[0], values.size());
|
Chris@16
|
295 }
|
Chris@16
|
296
|
Chris@16
|
297 template<typename T>
|
Chris@16
|
298 bool
|
Chris@16
|
299 mpi_process_group::receive_impl(int source, int tag, T& value,
|
Chris@16
|
300 mpl::true_ /*is_mpi_datatype*/) const
|
Chris@16
|
301 {
|
Chris@16
|
302 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
303 std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
|
Chris@16
|
304 << tag << std::endl;
|
Chris@16
|
305 #endif
|
Chris@16
|
306
|
Chris@16
|
307 impl::incoming_messages& incoming = impl_->incoming[source];
|
Chris@16
|
308
|
Chris@16
|
309 // Find the next header with the right tag
|
Chris@16
|
310 std::vector<impl::message_header>::iterator header =
|
Chris@16
|
311 incoming.next_header[my_block_number()];
|
Chris@16
|
312 while (header != incoming.headers.end() && header->tag != tag) ++header;
|
Chris@16
|
313
|
Chris@16
|
314 // If no header is found, notify the caller
|
Chris@16
|
315 if (header == incoming.headers.end()) return false;
|
Chris@16
|
316
|
Chris@16
|
317 // Unpack the data
|
Chris@16
|
318 if (header->bytes > 0) {
|
Chris@16
|
319 boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
|
Chris@16
|
320 archive::no_header, header->offset);
|
Chris@16
|
321 ia >> value;
|
Chris@16
|
322 }
|
Chris@16
|
323
|
Chris@16
|
324 // Mark this message as received
|
Chris@16
|
325 header->tag = -1;
|
Chris@16
|
326
|
Chris@16
|
327 // Move the "next header" indicator to the next unreceived message
|
Chris@16
|
328 while (incoming.next_header[my_block_number()] != incoming.headers.end()
|
Chris@16
|
329 && incoming.next_header[my_block_number()]->tag == -1)
|
Chris@16
|
330 ++incoming.next_header[my_block_number()];
|
Chris@16
|
331
|
Chris@16
|
332 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
|
Chris@16
|
333 bool finished = true;
|
Chris@16
|
334 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
|
Chris@16
|
335 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
|
Chris@16
|
336 }
|
Chris@16
|
337
|
Chris@16
|
338 if (finished) {
|
Chris@16
|
339 std::vector<impl::message_header> no_headers;
|
Chris@16
|
340 incoming.headers.swap(no_headers);
|
Chris@16
|
341 buffer_type empty_buffer;
|
Chris@16
|
342 incoming.buffer.swap(empty_buffer);
|
Chris@16
|
343 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
|
Chris@16
|
344 incoming.next_header[i] = incoming.headers.end();
|
Chris@16
|
345 }
|
Chris@16
|
346 }
|
Chris@16
|
347
|
Chris@16
|
348 return true;
|
Chris@16
|
349 }
|
Chris@16
|
350
|
Chris@16
|
351 template<typename T>
|
Chris@16
|
352 bool
|
Chris@16
|
353 mpi_process_group::receive_impl(int source, int tag, T& value,
|
Chris@16
|
354 mpl::false_ /*is_mpi_datatype*/) const
|
Chris@16
|
355 {
|
Chris@16
|
356 impl::incoming_messages& incoming = impl_->incoming[source];
|
Chris@16
|
357
|
Chris@16
|
358 // Find the next header with the right tag
|
Chris@16
|
359 std::vector<impl::message_header>::iterator header =
|
Chris@16
|
360 incoming.next_header[my_block_number()];
|
Chris@16
|
361 while (header != incoming.headers.end() && header->tag != tag) ++header;
|
Chris@16
|
362
|
Chris@16
|
363 // If no header is found, notify the caller
|
Chris@16
|
364 if (header == incoming.headers.end()) return false;
|
Chris@16
|
365
|
Chris@16
|
366 // Deserialize the data
|
Chris@16
|
367 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
|
Chris@16
|
368 archive::no_header, header->offset);
|
Chris@16
|
369 in >> value;
|
Chris@16
|
370
|
Chris@16
|
371 // Mark this message as received
|
Chris@16
|
372 header->tag = -1;
|
Chris@16
|
373
|
Chris@16
|
374 // Move the "next header" indicator to the next unreceived message
|
Chris@16
|
375 while (incoming.next_header[my_block_number()] != incoming.headers.end()
|
Chris@16
|
376 && incoming.next_header[my_block_number()]->tag == -1)
|
Chris@16
|
377 ++incoming.next_header[my_block_number()];
|
Chris@16
|
378
|
Chris@16
|
379 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
|
Chris@16
|
380 bool finished = true;
|
Chris@16
|
381 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
|
Chris@16
|
382 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
|
Chris@16
|
383 }
|
Chris@16
|
384
|
Chris@16
|
385 if (finished) {
|
Chris@16
|
386 std::vector<impl::message_header> no_headers;
|
Chris@16
|
387 incoming.headers.swap(no_headers);
|
Chris@16
|
388 buffer_type empty_buffer;
|
Chris@16
|
389 incoming.buffer.swap(empty_buffer);
|
Chris@16
|
390 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
|
Chris@16
|
391 incoming.next_header[i] = incoming.headers.end();
|
Chris@16
|
392 }
|
Chris@16
|
393 }
|
Chris@16
|
394
|
Chris@16
|
395 return true;
|
Chris@16
|
396 }
|
Chris@16
|
397
|
Chris@16
|
398 template<typename T>
|
Chris@16
|
399 typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
|
Chris@16
|
400 mpi_process_group::
|
Chris@16
|
401 array_receive_impl(int source, int tag, T* values, std::size_t& n) const
|
Chris@16
|
402 {
|
Chris@16
|
403 impl::incoming_messages& incoming = impl_->incoming[source];
|
Chris@16
|
404
|
Chris@16
|
405 // Find the next header with the right tag
|
Chris@16
|
406 std::vector<impl::message_header>::iterator header =
|
Chris@16
|
407 incoming.next_header[my_block_number()];
|
Chris@16
|
408 while (header != incoming.headers.end() && header->tag != tag) ++header;
|
Chris@16
|
409
|
Chris@16
|
410 // If no header is found, notify the caller
|
Chris@16
|
411 if (header == incoming.headers.end()) return false;
|
Chris@16
|
412
|
Chris@16
|
413 // Deserialize the data
|
Chris@16
|
414 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
|
Chris@16
|
415 archive::no_header, header->offset);
|
Chris@16
|
416 std::size_t num_sent;
|
Chris@16
|
417 in >> num_sent;
|
Chris@16
|
418 if (num_sent > n)
|
Chris@16
|
419 std::cerr << "ERROR: Have " << num_sent << " items but only space for "
|
Chris@16
|
420 << n << " items\n";
|
Chris@16
|
421
|
Chris@16
|
422 for (std::size_t i = 0; i < num_sent; ++i)
|
Chris@16
|
423 in >> values[i];
|
Chris@16
|
424 n = num_sent;
|
Chris@16
|
425
|
Chris@16
|
426 // Mark this message as received
|
Chris@16
|
427 header->tag = -1;
|
Chris@16
|
428
|
Chris@16
|
429 // Move the "next header" indicator to the next unreceived message
|
Chris@16
|
430 while (incoming.next_header[my_block_number()] != incoming.headers.end()
|
Chris@16
|
431 && incoming.next_header[my_block_number()]->tag == -1)
|
Chris@16
|
432 ++incoming.next_header[my_block_number()];
|
Chris@16
|
433
|
Chris@16
|
434 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
|
Chris@16
|
435 bool finished = true;
|
Chris@16
|
436 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
|
Chris@16
|
437 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
|
Chris@16
|
438 }
|
Chris@16
|
439
|
Chris@16
|
440 if (finished) {
|
Chris@16
|
441 std::vector<impl::message_header> no_headers;
|
Chris@16
|
442 incoming.headers.swap(no_headers);
|
Chris@16
|
443 buffer_type empty_buffer;
|
Chris@16
|
444 incoming.buffer.swap(empty_buffer);
|
Chris@16
|
445 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
|
Chris@16
|
446 incoming.next_header[i] = incoming.headers.end();
|
Chris@16
|
447 }
|
Chris@16
|
448 }
|
Chris@16
|
449
|
Chris@16
|
450 return true;
|
Chris@16
|
451 }
|
Chris@16
|
452
|
Chris@16
|
453 // Construct triggers
|
Chris@16
|
454 template<typename Type, typename Handler>
|
Chris@16
|
455 void mpi_process_group::trigger(int tag, const Handler& handler)
|
Chris@16
|
456 {
|
Chris@16
|
457 BOOST_ASSERT(block_num);
|
Chris@16
|
458 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
|
Chris@16
|
459 new trigger_launcher<Type, Handler>(*this, tag, handler)));
|
Chris@16
|
460 }
|
Chris@16
|
461
|
Chris@16
|
462 template<typename Type, typename Handler>
|
Chris@16
|
463 void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
|
Chris@16
|
464 {
|
Chris@16
|
465 BOOST_ASSERT(block_num);
|
Chris@16
|
466 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
|
Chris@16
|
467 new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
|
Chris@16
|
468 }
|
Chris@16
|
469
|
Chris@16
|
470 template<typename Type, typename Handler>
|
Chris@16
|
471 void mpi_process_group::global_trigger(int tag, const Handler& handler,
|
Chris@16
|
472 std::size_t sz)
|
Chris@16
|
473 {
|
Chris@16
|
474 if (sz==0) // normal trigger
|
Chris@16
|
475 install_trigger(tag,0,shared_ptr<trigger_base>(
|
Chris@16
|
476 new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
|
Chris@16
|
477 else // trigger with irecv
|
Chris@16
|
478 install_trigger(tag,0,shared_ptr<trigger_base>(
|
Chris@16
|
479 new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));
|
Chris@16
|
480
|
Chris@16
|
481 }
|
Chris@16
|
482
|
Chris@16
|
483 namespace detail {
|
Chris@16
|
484
|
Chris@16
|
485 template<typename Type>
|
Chris@16
|
486 void do_oob_receive(mpi_process_group const& self,
|
Chris@16
|
487 int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/)
|
Chris@16
|
488 {
|
Chris@16
|
489 using boost::mpi::get_mpi_datatype;
|
Chris@16
|
490
|
Chris@16
|
491 //self.impl_->comm.recv(source,tag,data);
|
Chris@16
|
492 MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
|
Chris@16
|
493 MPI_STATUS_IGNORE);
|
Chris@16
|
494 }
|
Chris@16
|
495
|
Chris@16
|
496 template<typename Type>
|
Chris@16
|
497 void do_oob_receive(mpi_process_group const& self,
|
Chris@16
|
498 int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/)
|
Chris@16
|
499 {
|
Chris@16
|
500 // self.impl_->comm.recv(source,tag,data);
|
Chris@16
|
501 // Receive the size of the data packet
|
Chris@16
|
502 boost::mpi::status status;
|
Chris@16
|
503 status = self.impl_->comm.probe(source, tag);
|
Chris@16
|
504
|
Chris@16
|
505 #if BOOST_VERSION >= 103600
|
Chris@16
|
506 int size = status.count<boost::mpi::packed>().get();
|
Chris@16
|
507 #else
|
Chris@16
|
508 int size;
|
Chris@16
|
509 MPI_Status& mpi_status = status;
|
Chris@16
|
510 MPI_Get_count(&mpi_status, MPI_PACKED, &size);
|
Chris@16
|
511 #endif
|
Chris@16
|
512
|
Chris@16
|
513 // Receive the data packed itself
|
Chris@16
|
514 boost::mpi::packed_iarchive in(self.impl_->comm);
|
Chris@16
|
515 in.resize(size);
|
Chris@16
|
516 MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
|
Chris@16
|
517 MPI_STATUS_IGNORE);
|
Chris@16
|
518
|
Chris@16
|
519 // Deserialize the data
|
Chris@16
|
520 in >> data;
|
Chris@16
|
521 }
|
Chris@16
|
522
|
Chris@16
|
523 template<typename Type>
|
Chris@16
|
524 void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data)
|
Chris@16
|
525 {
|
Chris@16
|
526 do_oob_receive(self, source, tag, data,
|
Chris@16
|
527 boost::mpi::is_mpi_datatype<Type>());
|
Chris@16
|
528 }
|
Chris@16
|
529
|
Chris@16
|
530
|
Chris@16
|
531 } // namespace detail
|
Chris@16
|
532
|
Chris@16
|
533
|
Chris@16
|
534 template<typename Type, typename Handler>
|
Chris@16
|
535 void
|
Chris@16
|
536 mpi_process_group::trigger_launcher<Type, Handler>::
|
Chris@16
|
537 receive(mpi_process_group const&, int source, int tag,
|
Chris@16
|
538 trigger_receive_context context, int block) const
|
Chris@16
|
539 {
|
Chris@16
|
540 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
541 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
|
Chris@16
|
542 << " receive from source " << source << " and tag " << tag
|
Chris@16
|
543 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
|
Chris@16
|
544 #endif
|
Chris@16
|
545
|
Chris@16
|
546 Type data;
|
Chris@16
|
547
|
Chris@16
|
548 if (context == trc_out_of_band) {
|
Chris@16
|
549 // Receive the message directly off the wire
|
Chris@16
|
550 int realtag = self.encode_tag(
|
Chris@16
|
551 block == -1 ? self.my_block_number() : block, tag);
|
Chris@16
|
552 detail::do_oob_receive(self,source,realtag,data);
|
Chris@16
|
553 }
|
Chris@16
|
554 else
|
Chris@16
|
555 // Receive the message out of the local buffer
|
Chris@16
|
556 boost::graph::distributed::receive(self, source, tag, data);
|
Chris@16
|
557
|
Chris@16
|
558 // Pass the message off to the handler
|
Chris@16
|
559 handler(source, tag, data, context);
|
Chris@16
|
560 }
|
Chris@16
|
561
|
Chris@16
|
562 template<typename Type, typename Handler>
|
Chris@16
|
563 void
|
Chris@16
|
564 mpi_process_group::reply_trigger_launcher<Type, Handler>::
|
Chris@16
|
565 receive(mpi_process_group const&, int source, int tag,
|
Chris@16
|
566 trigger_receive_context context, int block) const
|
Chris@16
|
567 {
|
Chris@16
|
568 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
569 std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")
|
Chris@16
|
570 << " receive from source " << source << " and tag " << tag
|
Chris@16
|
571 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
|
Chris@16
|
572 #endif
|
Chris@16
|
573 BOOST_ASSERT(context == trc_out_of_band);
|
Chris@16
|
574
|
Chris@16
|
575 boost::parallel::detail::untracked_pair<int, Type> data;
|
Chris@16
|
576
|
Chris@16
|
577 // Receive the message directly off the wire
|
Chris@16
|
578 int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
|
Chris@16
|
579 tag);
|
Chris@16
|
580 detail::do_oob_receive(self, source, realtag, data);
|
Chris@16
|
581
|
Chris@16
|
582 // Pass the message off to the handler and send the result back to
|
Chris@16
|
583 // the source.
|
Chris@16
|
584 send_oob(self, source, data.first,
|
Chris@16
|
585 handler(source, tag, data.second, context), -2);
|
Chris@16
|
586 }
|
Chris@16
|
587
|
Chris@16
|
588 template<typename Type, typename Handler>
|
Chris@16
|
589 void
|
Chris@16
|
590 mpi_process_group::global_trigger_launcher<Type, Handler>::
|
Chris@16
|
591 receive(mpi_process_group const& self, int source, int tag,
|
Chris@16
|
592 trigger_receive_context context, int block) const
|
Chris@16
|
593 {
|
Chris@16
|
594 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
595 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
|
Chris@16
|
596 << " receive from source " << source << " and tag " << tag
|
Chris@16
|
597 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
|
Chris@16
|
598 #endif
|
Chris@16
|
599
|
Chris@16
|
600 Type data;
|
Chris@16
|
601
|
Chris@16
|
602 if (context == trc_out_of_band) {
|
Chris@16
|
603 // Receive the message directly off the wire
|
Chris@16
|
604 int realtag = self.encode_tag(
|
Chris@16
|
605 block == -1 ? self.my_block_number() : block, tag);
|
Chris@16
|
606 detail::do_oob_receive(self,source,realtag,data);
|
Chris@16
|
607 }
|
Chris@16
|
608 else
|
Chris@16
|
609 // Receive the message out of the local buffer
|
Chris@16
|
610 boost::graph::distributed::receive(self, source, tag, data);
|
Chris@16
|
611
|
Chris@16
|
612 // Pass the message off to the handler
|
Chris@16
|
613 handler(self, source, tag, data, context);
|
Chris@16
|
614 }
|
Chris@16
|
615
|
Chris@16
|
616
|
Chris@16
|
617 template<typename Type, typename Handler>
|
Chris@16
|
618 void
|
Chris@16
|
619 mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
|
Chris@16
|
620 receive(mpi_process_group const& self, int source, int tag,
|
Chris@16
|
621 trigger_receive_context context, int block) const
|
Chris@16
|
622 {
|
Chris@16
|
623 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
624 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
|
Chris@16
|
625 << " receive from source " << source << " and tag " << tag
|
Chris@16
|
626 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
|
Chris@16
|
627 #endif
|
Chris@16
|
628
|
Chris@16
|
629 Type data;
|
Chris@16
|
630
|
Chris@16
|
631 if (context == trc_out_of_band) {
|
Chris@16
|
632 return;
|
Chris@16
|
633 }
|
Chris@16
|
634 BOOST_ASSERT (context == trc_irecv_out_of_band);
|
Chris@16
|
635
|
Chris@16
|
636 // force posting of new MPI_Irecv, even though buffer is already allocated
|
Chris@16
|
637 boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
|
Chris@16
|
638 ia >> data;
|
Chris@16
|
639 // Start a new receive
|
Chris@16
|
640 prepare_receive(self,tag,true);
|
Chris@16
|
641 // Pass the message off to the handler
|
Chris@16
|
642 handler(self, source, tag, data, context);
|
Chris@16
|
643 }
|
Chris@16
|
644
|
Chris@16
|
645
|
Chris@16
|
646 template<typename Type, typename Handler>
|
Chris@16
|
647 void
|
Chris@16
|
648 mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
|
Chris@16
|
649 prepare_receive(mpi_process_group const& self, int tag, bool force) const
|
Chris@16
|
650 {
|
Chris@16
|
651 #ifdef PBGL_PROCESS_GROUP_DEBUG
|
Chris@16
|
652 std::cerr << ("Posting Irecv for trigger")
|
Chris@16
|
653 << " receive with tag " << tag << std::endl;
|
Chris@16
|
654 #endif
|
Chris@16
|
655 if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
|
Chris@16
|
656 self.impl_->buffers[tag].resize(buffer_size);
|
Chris@16
|
657 force = true;
|
Chris@16
|
658 }
|
Chris@16
|
659 BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);
|
Chris@16
|
660
|
Chris@16
|
661 //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
|
Chris@16
|
662 if (force) {
|
Chris@16
|
663 self.impl_->requests.push_back(MPI_Request());
|
Chris@16
|
664 MPI_Request* request = &self.impl_->requests.back();
|
Chris@16
|
665 MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
|
Chris@16
|
666 MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
|
Chris@16
|
667 }
|
Chris@16
|
668 }
|
Chris@16
|
669
|
Chris@16
|
670
|
Chris@16
|
671 template<typename T>
|
Chris@16
|
672 inline mpi_process_group::process_id_type
|
Chris@16
|
673 receive(const mpi_process_group& pg, int tag, T& value)
|
Chris@16
|
674 {
|
Chris@16
|
675 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
|
Chris@16
|
676 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
|
Chris@16
|
677 value, boost::mpi::is_mpi_datatype<T>()))
|
Chris@16
|
678 return source;
|
Chris@16
|
679 }
|
Chris@16
|
680 BOOST_ASSERT (false);
|
Chris@16
|
681 }
|
Chris@16
|
682
|
Chris@16
|
683 template<typename T>
|
Chris@16
|
684 typename
|
Chris@16
|
685 enable_if<boost::mpi::is_mpi_datatype<T>,
|
Chris@16
|
686 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
|
Chris@16
|
687 receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
|
Chris@16
|
688 {
|
Chris@16
|
689 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
|
Chris@16
|
690 bool result =
|
Chris@16
|
691 pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
|
Chris@16
|
692 boost::serialization::make_array(values,n),
|
Chris@16
|
693 boost::mpl::true_());
|
Chris@16
|
694 if (result)
|
Chris@16
|
695 return std::make_pair(source, n);
|
Chris@16
|
696 }
|
Chris@16
|
697 BOOST_ASSERT(false);
|
Chris@16
|
698 }
|
Chris@16
|
699
|
Chris@16
|
700 template<typename T>
|
Chris@16
|
701 typename
|
Chris@16
|
702 disable_if<boost::mpi::is_mpi_datatype<T>,
|
Chris@16
|
703 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
|
Chris@16
|
704 receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
|
Chris@16
|
705 {
|
Chris@16
|
706 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
|
Chris@16
|
707 if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
|
Chris@16
|
708 values, n))
|
Chris@16
|
709 return std::make_pair(source, n);
|
Chris@16
|
710 }
|
Chris@16
|
711 BOOST_ASSERT(false);
|
Chris@16
|
712 }
|
Chris@16
|
713
|
Chris@16
|
714 template<typename T>
|
Chris@16
|
715 mpi_process_group::process_id_type
|
Chris@16
|
716 receive(const mpi_process_group& pg,
|
Chris@16
|
717 mpi_process_group::process_id_type source, int tag, T& value)
|
Chris@16
|
718 {
|
Chris@16
|
719 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
|
Chris@16
|
720 value, boost::mpi::is_mpi_datatype<T>()))
|
Chris@16
|
721 return source;
|
Chris@16
|
722 else {
|
Chris@16
|
723 fprintf(stderr,
|
Chris@16
|
724 "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
|
Chris@16
|
725 process_id(pg), source, tag, pg.my_block_number());
|
Chris@16
|
726
|
Chris@16
|
727 BOOST_ASSERT(false);
|
Chris@16
|
728 abort();
|
Chris@16
|
729 }
|
Chris@16
|
730 }
|
Chris@16
|
731
|
Chris@16
|
732 template<typename T>
|
Chris@16
|
733 typename
|
Chris@16
|
734 enable_if<boost::mpi::is_mpi_datatype<T>,
|
Chris@16
|
735 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
|
Chris@16
|
736 receive(const mpi_process_group& pg, int source, int tag, T values[],
|
Chris@16
|
737 std::size_t n)
|
Chris@16
|
738 {
|
Chris@16
|
739 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
|
Chris@16
|
740 boost::serialization::make_array(values,n),
|
Chris@16
|
741 boost::mpl::true_()))
|
Chris@16
|
742 return std::make_pair(source,n);
|
Chris@16
|
743 else {
|
Chris@16
|
744 fprintf(stderr,
|
Chris@16
|
745 "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
|
Chris@16
|
746 process_id(pg), source, tag, pg.my_block_number());
|
Chris@16
|
747
|
Chris@16
|
748 BOOST_ASSERT(false);
|
Chris@16
|
749 abort();
|
Chris@16
|
750 }
|
Chris@16
|
751 }
|
Chris@16
|
752
|
Chris@16
|
753 template<typename T>
|
Chris@16
|
754 typename
|
Chris@16
|
755 disable_if<boost::mpi::is_mpi_datatype<T>,
|
Chris@16
|
756 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
|
Chris@16
|
757 receive(const mpi_process_group& pg, int source, int tag, T values[],
|
Chris@16
|
758 std::size_t n)
|
Chris@16
|
759 {
|
Chris@16
|
760 pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
|
Chris@16
|
761 values, n);
|
Chris@16
|
762
|
Chris@16
|
763 return std::make_pair(source, n);
|
Chris@16
|
764 }
|
Chris@16
|
765
|
Chris@16
|
766 template<typename T, typename BinaryOperation>
|
Chris@16
|
767 T*
|
Chris@16
|
768 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
|
Chris@16
|
769 BinaryOperation bin_op)
|
Chris@16
|
770 {
|
Chris@16
|
771 synchronize(pg);
|
Chris@16
|
772
|
Chris@16
|
773 bool inplace = first == out;
|
Chris@16
|
774
|
Chris@16
|
775 if (inplace) out = new T [last-first];
|
Chris@16
|
776
|
Chris@16
|
777 boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
|
Chris@16
|
778 boost::mpi::comm_attach),
|
Chris@16
|
779 first, last-first, out, bin_op);
|
Chris@16
|
780
|
Chris@16
|
781 if (inplace) {
|
Chris@16
|
782 std::copy(out, out + (last-first), first);
|
Chris@16
|
783 delete [] out;
|
Chris@16
|
784 return last;
|
Chris@16
|
785 }
|
Chris@16
|
786
|
Chris@16
|
787 return out;
|
Chris@16
|
788 }
|
Chris@16
|
789
|
Chris@16
|
790 template<typename T>
|
Chris@16
|
791 void
|
Chris@16
|
792 broadcast(const mpi_process_group& pg, T& val,
|
Chris@16
|
793 mpi_process_group::process_id_type root)
|
Chris@16
|
794 {
|
Chris@16
|
795 // broadcast the seed
|
Chris@16
|
796 boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);
|
Chris@16
|
797 boost::mpi::broadcast(comm,val,root);
|
Chris@16
|
798 }
|
Chris@16
|
799
|
Chris@16
|
800
|
Chris@16
|
801 template<typename T, typename BinaryOperation>
|
Chris@16
|
802 T*
|
Chris@16
|
803 scan(const mpi_process_group& pg, T* first, T* last, T* out,
|
Chris@16
|
804 BinaryOperation bin_op)
|
Chris@16
|
805 {
|
Chris@16
|
806 synchronize(pg);
|
Chris@16
|
807
|
Chris@16
|
808 bool inplace = first == out;
|
Chris@16
|
809
|
Chris@16
|
810 if (inplace) out = new T [last-first];
|
Chris@16
|
811
|
Chris@16
|
812 boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);
|
Chris@16
|
813
|
Chris@16
|
814 if (inplace) {
|
Chris@16
|
815 std::copy(out, out + (last-first), first);
|
Chris@16
|
816 delete [] out;
|
Chris@16
|
817 return last;
|
Chris@16
|
818 }
|
Chris@16
|
819
|
Chris@16
|
820 return out;
|
Chris@16
|
821 }
|
Chris@16
|
822
|
Chris@16
|
823
|
Chris@16
|
824 template<typename InputIterator, typename T>
|
Chris@16
|
825 void
|
Chris@16
|
826 all_gather(const mpi_process_group& pg, InputIterator first,
|
Chris@16
|
827 InputIterator last, std::vector<T>& out)
|
Chris@16
|
828 {
|
Chris@16
|
829 synchronize(pg);
|
Chris@16
|
830
|
Chris@16
|
831 // Stick a copy of the local values into a vector, so we can broadcast it
|
Chris@16
|
832 std::vector<T> local_values(first, last);
|
Chris@16
|
833
|
Chris@16
|
834 // Collect the number of vertices stored in each process
|
Chris@16
|
835 int size = local_values.size();
|
Chris@16
|
836 std::vector<int> sizes(num_processes(pg));
|
Chris@16
|
837 int result = MPI_Allgather(&size, 1, MPI_INT,
|
Chris@16
|
838 &sizes[0], 1, MPI_INT,
|
Chris@16
|
839 communicator(pg));
|
Chris@16
|
840 BOOST_ASSERT(result == MPI_SUCCESS);
|
Chris@16
|
841
|
Chris@16
|
842 // Adjust sizes based on the number of bytes
|
Chris@16
|
843 std::transform(sizes.begin(), sizes.end(), sizes.begin(),
|
Chris@16
|
844 std::bind2nd(std::multiplies<int>(), sizeof(T)));
|
Chris@16
|
845
|
Chris@16
|
846 // Compute displacements
|
Chris@16
|
847 std::vector<int> displacements;
|
Chris@16
|
848 displacements.reserve(sizes.size() + 1);
|
Chris@16
|
849 displacements.push_back(0);
|
Chris@16
|
850 std::partial_sum(sizes.begin(), sizes.end(),
|
Chris@16
|
851 std::back_inserter(displacements));
|
Chris@16
|
852
|
Chris@16
|
853 // Gather all of the values
|
Chris@16
|
854 out.resize(displacements.back() / sizeof(T));
|
Chris@16
|
855 if (!out.empty()) {
|
Chris@16
|
856 result = MPI_Allgatherv(local_values.empty()? (void*)&local_values
|
Chris@16
|
857 /* local results */: (void*)&local_values[0],
|
Chris@16
|
858 local_values.size() * sizeof(T),
|
Chris@16
|
859 MPI_BYTE,
|
Chris@16
|
860 &out[0], &sizes[0], &displacements[0], MPI_BYTE,
|
Chris@16
|
861 communicator(pg));
|
Chris@16
|
862 }
|
Chris@16
|
863 BOOST_ASSERT(result == MPI_SUCCESS);
|
Chris@16
|
864 }
|
Chris@16
|
865
|
Chris@16
|
866 template<typename InputIterator>
|
Chris@16
|
867 mpi_process_group
|
Chris@16
|
868 process_subgroup(const mpi_process_group& pg,
|
Chris@16
|
869 InputIterator first, InputIterator last)
|
Chris@16
|
870 {
|
Chris@16
|
871 /*
|
Chris@16
|
872 boost::mpi::group current_group = communicator(pg).group();
|
Chris@16
|
873 boost::mpi::group new_group = current_group.include(first,last);
|
Chris@16
|
874 boost::mpi::communicator new_comm(communicator(pg),new_group);
|
Chris@16
|
875 return mpi_process_group(new_comm);
|
Chris@16
|
876 */
|
Chris@16
|
877 std::vector<int> ranks(first, last);
|
Chris@16
|
878
|
Chris@16
|
879 MPI_Group current_group;
|
Chris@16
|
880 int result = MPI_Comm_group(communicator(pg), ¤t_group);
|
Chris@16
|
881 BOOST_ASSERT(result == MPI_SUCCESS);
|
Chris@16
|
882
|
Chris@16
|
883 MPI_Group new_group;
|
Chris@16
|
884 result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
|
Chris@16
|
885 BOOST_ASSERT(result == MPI_SUCCESS);
|
Chris@16
|
886
|
Chris@16
|
887 MPI_Comm new_comm;
|
Chris@16
|
888 result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
|
Chris@16
|
889 BOOST_ASSERT(result == MPI_SUCCESS);
|
Chris@16
|
890
|
Chris@16
|
891 result = MPI_Group_free(&new_group);
|
Chris@16
|
892 BOOST_ASSERT(result == MPI_SUCCESS);
|
Chris@16
|
893 result = MPI_Group_free(¤t_group);
|
Chris@16
|
894 BOOST_ASSERT(result == MPI_SUCCESS);
|
Chris@16
|
895
|
Chris@16
|
896 if (new_comm != MPI_COMM_NULL) {
|
Chris@16
|
897 mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));
|
Chris@16
|
898 result = MPI_Comm_free(&new_comm);
|
Chris@16
|
899 BOOST_ASSERT(result == 0);
|
Chris@16
|
900 return result_pg;
|
Chris@16
|
901 } else {
|
Chris@16
|
902 return mpi_process_group(mpi_process_group::create_empty());
|
Chris@16
|
903 }
|
Chris@16
|
904
|
Chris@16
|
905 }
|
Chris@16
|
906
|
Chris@16
|
907
|
Chris@16
|
908 template<typename Receiver>
|
Chris@16
|
909 Receiver* mpi_process_group::get_receiver()
|
Chris@16
|
910 {
|
Chris@16
|
911 return impl_->blocks[my_block_number()]->on_receive
|
Chris@16
|
912 .template target<Receiver>();
|
Chris@16
|
913 }
|
Chris@16
|
914
|
Chris@16
|
915 template<typename T>
|
Chris@16
|
916 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
|
Chris@16
|
917 receive_oob(const mpi_process_group& pg,
|
Chris@16
|
918 mpi_process_group::process_id_type source, int tag, T& value, int block)
|
Chris@16
|
919 {
|
Chris@16
|
920 using boost::mpi::get_mpi_datatype;
|
Chris@16
|
921
|
Chris@16
|
922 // Determine the actual message we expect to receive, and which
|
Chris@16
|
923 // communicator it will come by.
|
Chris@16
|
924 std::pair<boost::mpi::communicator, int> actual
|
Chris@16
|
925 = pg.actual_communicator_and_tag(tag, block);
|
Chris@16
|
926
|
Chris@16
|
927 // Post a non-blocking receive that waits until we complete this request.
|
Chris@16
|
928 MPI_Request request;
|
Chris@16
|
929 MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
|
Chris@16
|
930 source, actual.second, actual.first, &request);
|
Chris@16
|
931
|
Chris@16
|
932 int done = 0;
|
Chris@16
|
933 do {
|
Chris@16
|
934 MPI_Test(&request, &done, MPI_STATUS_IGNORE);
|
Chris@16
|
935 if (!done)
|
Chris@16
|
936 pg.poll(/*wait=*/false, block);
|
Chris@16
|
937 } while (!done);
|
Chris@16
|
938 }
|
Chris@16
|
939
|
Chris@16
|
940 template<typename T>
|
Chris@16
|
941 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
|
Chris@16
|
942 receive_oob(const mpi_process_group& pg,
|
Chris@16
|
943 mpi_process_group::process_id_type source, int tag, T& value, int block)
|
Chris@16
|
944 {
|
Chris@16
|
945 // Determine the actual message we expect to receive, and which
|
Chris@16
|
946 // communicator it will come by.
|
Chris@16
|
947 std::pair<boost::mpi::communicator, int> actual
|
Chris@16
|
948 = pg.actual_communicator_and_tag(tag, block);
|
Chris@16
|
949
|
Chris@16
|
950 boost::optional<boost::mpi::status> status;
|
Chris@16
|
951 do {
|
Chris@16
|
952 status = actual.first.iprobe(source, actual.second);
|
Chris@16
|
953 if (!status)
|
Chris@16
|
954 pg.poll();
|
Chris@16
|
955 } while (!status);
|
Chris@16
|
956
|
Chris@16
|
957 //actual.first.recv(status->source(), status->tag(),value);
|
Chris@16
|
958
|
Chris@16
|
959 // Allocate the receive buffer
|
Chris@16
|
960 boost::mpi::packed_iarchive in(actual.first);
|
Chris@16
|
961
|
Chris@16
|
962 #if BOOST_VERSION >= 103600
|
Chris@16
|
963 in.resize(status->count<boost::mpi::packed>().get());
|
Chris@16
|
964 #else
|
Chris@16
|
965 int size;
|
Chris@16
|
966 MPI_Status mpi_status = *status;
|
Chris@16
|
967 MPI_Get_count(&mpi_status, MPI_PACKED, &size);
|
Chris@16
|
968 in.resize(size);
|
Chris@16
|
969 #endif
|
Chris@16
|
970
|
Chris@16
|
971 // Receive the message data
|
Chris@16
|
972 MPI_Recv(in.address(), in.size(), MPI_PACKED,
|
Chris@16
|
973 status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);
|
Chris@16
|
974
|
Chris@16
|
975 // Unpack the message data
|
Chris@16
|
976 in >> value;
|
Chris@16
|
977 }
|
Chris@16
|
978
|
Chris@16
|
979
|
Chris@16
|
980 template<typename SendT, typename ReplyT>
|
Chris@16
|
981 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
|
Chris@16
|
982 send_oob_with_reply(const mpi_process_group& pg,
|
Chris@16
|
983 mpi_process_group::process_id_type dest,
|
Chris@16
|
984 int tag, const SendT& send_value, ReplyT& reply_value,
|
Chris@16
|
985 int block)
|
Chris@16
|
986 {
|
Chris@16
|
987 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
|
Chris@16
|
988 send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
|
Chris@16
|
989 (int)reply_tag, send_value), block);
|
Chris@16
|
990 receive_oob(pg, dest, reply_tag, reply_value);
|
Chris@16
|
991 }
|
Chris@16
|
992
|
Chris@16
|
993 template<typename SendT, typename ReplyT>
|
Chris@16
|
994 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
|
Chris@16
|
995 send_oob_with_reply(const mpi_process_group& pg,
|
Chris@16
|
996 mpi_process_group::process_id_type dest,
|
Chris@16
|
997 int tag, const SendT& send_value, ReplyT& reply_value,
|
Chris@16
|
998 int block)
|
Chris@16
|
999 {
|
Chris@16
|
1000 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
|
Chris@16
|
1001 send_oob(pg, dest, tag,
|
Chris@16
|
1002 boost::parallel::detail::make_untracked_pair((int)reply_tag,
|
Chris@16
|
1003 send_value), block);
|
Chris@16
|
1004 receive_oob(pg, dest, reply_tag, reply_value);
|
Chris@16
|
1005 }
|
Chris@16
|
1006
|
Chris@16
|
1007 } } } // end namespace boost::graph::distributed
|