annotate DEPENDENCIES/generic/include/boost/graph/distributed/detail/mpi_process_group.ipp @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents 2665513ce2d3
children
rev   line source
Chris@16 1 // -*- C++ -*-
Chris@16 2
Chris@16 3 // Copyright (C) 2004-2008 The Trustees of Indiana University.
Chris@16 4 // Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com>
Chris@16 5 // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com>
Chris@16 6
Chris@16 7 // Use, modification and distribution is subject to the Boost Software
Chris@16 8 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 9 // http://www.boost.org/LICENSE_1_0.txt)
Chris@16 10
Chris@16 11 // Authors: Douglas Gregor
Chris@16 12 // Andrew Lumsdaine
Chris@16 13 // Matthias Troyer
Chris@16 14
Chris@16 15 //#define PBGL_PROCESS_GROUP_DEBUG
Chris@16 16
Chris@16 17 #ifndef BOOST_GRAPH_USE_MPI
Chris@16 18 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
Chris@16 19 #endif
Chris@16 20
Chris@16 21 #include <boost/assert.hpp>
Chris@16 22 #include <algorithm>
Chris@16 23 #include <boost/graph/parallel/detail/untracked_pair.hpp>
Chris@16 24 #include <numeric>
Chris@16 25 #include <iterator>
Chris@16 26 #include <functional>
Chris@16 27 #include <vector>
Chris@16 28 #include <queue>
Chris@16 29 #include <stack>
Chris@16 30 #include <boost/graph/distributed/detail/tag_allocator.hpp>
Chris@16 31 #include <stdio.h>
Chris@16 32
Chris@16 33 // #define PBGL_PROCESS_GROUP_DEBUG
Chris@16 34
Chris@16 35 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 36 # include <iostream>
Chris@16 37 #endif
Chris@16 38
Chris@16 39 namespace boost { namespace graph { namespace distributed {
Chris@16 40
// Implementation state behind mpi_process_group: the communicators,
// the per-process outgoing/incoming message stores, the counters used
// during synchronization, and the bookkeeping for batches in flight.
Chris@16 41 struct mpi_process_group::impl
Chris@16 42 {
Chris@16 43
Chris@16 44 typedef mpi_process_group::message_header message_header;
Chris@16 45 typedef mpi_process_group::outgoing_messages outgoing_messages;
Chris@16 46
Chris@16 47 /**
Chris@16 48 * Stores the incoming messages from a particular processor.
Chris@16 49 *
Chris@16 50 * @todo Evaluate whether we should use a deque instance, which
Chris@16 51 * could reduce the cost of "receiving" messages and
Chris@16 52 * allow us to deallocate memory earlier, but increases the time
Chris@16 53 * spent in the synchronization step.
Chris@16 54 */
Chris@16 55 struct incoming_messages {
Chris@16 56 incoming_messages();
Chris@16 57 ~incoming_messages() {}
Chris@16 58
Chris@16 59 std::vector<message_header> headers; // one header per buffered message
Chris@16 60 buffer_type buffer; // packed payloads, addressed through header offsets
Chris@16 61 std::vector<std::vector<message_header>::iterator> next_header; // per block: next unreceived header
Chris@16 62 };
Chris@16 63
Chris@16 64 struct batch_request {
Chris@16 65 MPI_Request request; // presumably the request of the send that owns this batch (confirm in the .cpp)
Chris@16 66 buffer_type buffer; // storage belonging to that batch
Chris@16 67 };
Chris@16 68
Chris@16 69 // Send once we have a certain number of messages or bytes in the buffer.
Chris@16 70 // These numbers need to be tuned; we keep them small at first for testing.
Chris@16 71 std::size_t batch_header_number;
Chris@16 72 std::size_t batch_buffer_size;
Chris@16 73 std::size_t batch_message_size;
Chris@16 74
Chris@16 75 /**
Chris@16 76 * The actual MPI communicator used to transmit data.
Chris@16 77 */
Chris@16 78 boost::mpi::communicator comm;
Chris@16 79
Chris@16 80 /**
Chris@16 81 * The MPI communicator used to transmit out-of-band replies.
Chris@16 82 */
Chris@16 83 boost::mpi::communicator oob_reply_comm;
Chris@16 84
Chris@16 85 /// Outgoing message information, indexed by destination processor.
Chris@16 86 std::vector<outgoing_messages> outgoing;
Chris@16 87
Chris@16 88 /// Incoming message information, indexed by source processor.
Chris@16 89 std::vector<incoming_messages> incoming;
Chris@16 90
Chris@16 91 /// The numbers of processors that have entered a synchronization stage
Chris@16 92 std::vector<int> processors_synchronizing_stage;
Chris@16 93
Chris@16 94 /// The synchronization stage of a processor
Chris@16 95 std::vector<int> synchronizing_stage;
Chris@16 96
Chris@16 97 /// Number of processors still sending messages
Chris@16 98 std::vector<int> synchronizing_unfinished;
Chris@16 99
Chris@16 100 /// Number of batches sent since last synchronization stage
Chris@16 101 std::vector<int> number_sent_batches;
Chris@16 102
Chris@16 103 /// Number of batches received minus number of expected batches
Chris@16 104 std::vector<int> number_received_batches;
Chris@16 105
Chris@16 106
Chris@16 107 /// The context of the currently-executing trigger, or @c trc_none
Chris@16 108 /// if no trigger is executing.
Chris@16 109 trigger_receive_context trigger_context;
Chris@16 110
Chris@16 111 /// Non-zero indicates that we are processing batches.
Chris@16 112 /// Increment this when processing batches,
Chris@16 113 /// decrement it when you are done.
Chris@16 114 int processing_batches;
Chris@16 115
Chris@16 116 /**
Chris@16 117 * Contains all of the active blocks corresponding to attached
Chris@16 118 * distributed data structures.
Chris@16 119 */
Chris@16 120 blocks_type blocks;
Chris@16 121
Chris@16 122 /// Whether we are currently synchronizing
Chris@16 123 bool synchronizing;
Chris@16 124
Chris@16 125 /// The MPI requests for posted sends of oob messages (the irecv triggers also append the requests of their posted MPI_Irecv calls here)
Chris@16 126 std::vector<MPI_Request> requests;
Chris@16 127
Chris@16 128 /// The MPI buffers for posted irecvs of oob messages, keyed by tag
Chris@16 129 std::map<int,buffer_type> buffers;
Chris@16 130
Chris@16 131 /// Queue for message batches received while already processing messages
Chris@16 132 std::queue<std::pair<int,outgoing_messages> > new_batches;
Chris@16 133 /// Maximum encountered size of the new_batches queue
Chris@16 134 std::size_t max_received;
Chris@16 135
Chris@16 136 /// The MPI requests and buffers for batches being sent
Chris@16 137 std::list<batch_request> sent_batches;
Chris@16 138 /// Maximum encountered size of the sent_batches list
Chris@16 139 std::size_t max_sent;
Chris@16 140
Chris@16 141 /// Pre-allocated requests in a pool
Chris@16 142 std::vector<batch_request> batch_pool;
Chris@16 143 /// A stack controlling which batches are available
Chris@16 144 std::stack<std::size_t> free_batches;
Chris@16 145
Chris@16 146 void free_sent_batches();
Chris@16 147
Chris@16 148 // Tag allocator
Chris@16 149 detail::tag_allocator allocated_tags;
Chris@16 150
Chris@16 151 impl(std::size_t num_headers, std::size_t buffers_size,
Chris@16 152 communicator_type parent_comm);
Chris@16 153 ~impl();
Chris@16 154
Chris@16 155 private:
Chris@16 156 void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
Chris@16 157 };
Chris@16 158
Chris@16 159 inline trigger_receive_context mpi_process_group::trigger_context() const
Chris@16 160 {
Chris@16 161 return impl_->trigger_context;
Chris@16 162 }
Chris@16 163
Chris@16 164 template<typename T>
Chris@16 165 void
Chris@16 166 mpi_process_group::send_impl(int dest, int tag, const T& value,
Chris@16 167 mpl::true_ /*is_mpi_datatype*/) const
Chris@16 168 {
Chris@16 169 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
Chris@16 170
Chris@16 171 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
Chris@16 172
Chris@16 173 // Start constructing the message header
Chris@16 174 impl::message_header header;
Chris@16 175 header.source = process_id(*this);
Chris@16 176 header.tag = tag;
Chris@16 177 header.offset = outgoing.buffer.size();
Chris@16 178
Chris@16 179 boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
Chris@16 180 oa << value;
Chris@16 181
Chris@16 182 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 183 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
Chris@16 184 << tag << ", bytes = " << packed_size << std::endl;
Chris@16 185 #endif
Chris@16 186
Chris@16 187 // Store the header
Chris@16 188 header.bytes = outgoing.buffer.size() - header.offset;
Chris@16 189 outgoing.headers.push_back(header);
Chris@16 190
Chris@16 191 maybe_send_batch(dest);
Chris@16 192 }
Chris@16 193
Chris@16 194
Chris@16 195 template<typename T>
Chris@16 196 void
Chris@16 197 mpi_process_group::send_impl(int dest, int tag, const T& value,
Chris@16 198 mpl::false_ /*is_mpi_datatype*/) const
Chris@16 199 {
Chris@16 200 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
Chris@16 201
Chris@16 202 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
Chris@16 203
Chris@16 204 // Start constructing the message header
Chris@16 205 impl::message_header header;
Chris@16 206 header.source = process_id(*this);
Chris@16 207 header.tag = tag;
Chris@16 208 header.offset = outgoing.buffer.size();
Chris@16 209
Chris@16 210 // Serialize into the buffer
Chris@16 211 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
Chris@16 212 out << value;
Chris@16 213
Chris@16 214 // Store the header
Chris@16 215 header.bytes = outgoing.buffer.size() - header.offset;
Chris@16 216 outgoing.headers.push_back(header);
Chris@16 217 maybe_send_batch(dest);
Chris@16 218
Chris@16 219 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 220 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
Chris@16 221 << tag << ", bytes = " << header.bytes << std::endl;
Chris@16 222 #endif
Chris@16 223 }
Chris@16 224
Chris@16 225 template<typename T>
Chris@16 226 inline void
Chris@16 227 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 228 int tag, const T& value)
Chris@16 229 {
Chris@16 230 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
Chris@16 231 boost::mpi::is_mpi_datatype<T>());
Chris@16 232 }
Chris@16 233
Chris@16 234 template<typename T>
Chris@16 235 typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
Chris@16 236 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 237 int tag, const T values[], std::size_t n)
Chris@16 238 {
Chris@16 239 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
Chris@16 240 boost::serialization::make_array(values,n),
Chris@16 241 boost::mpl::true_());
Chris@16 242 }
Chris@16 243
Chris@16 244 template<typename T>
Chris@16 245 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
Chris@16 246 mpi_process_group::
Chris@16 247 array_send_impl(int dest, int tag, const T values[], std::size_t n) const
Chris@16 248 {
Chris@16 249 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
Chris@16 250
Chris@16 251 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
Chris@16 252
Chris@16 253 // Start constructing the message header
Chris@16 254 impl::message_header header;
Chris@16 255 header.source = process_id(*this);
Chris@16 256 header.tag = tag;
Chris@16 257 header.offset = outgoing.buffer.size();
Chris@16 258
Chris@16 259 // Serialize into the buffer
Chris@16 260 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
Chris@16 261 out << n;
Chris@16 262
Chris@16 263 for (std::size_t i = 0; i < n; ++i)
Chris@16 264 out << values[i];
Chris@16 265
Chris@16 266 // Store the header
Chris@16 267 header.bytes = outgoing.buffer.size() - header.offset;
Chris@16 268 outgoing.headers.push_back(header);
Chris@16 269 maybe_send_batch(dest);
Chris@16 270
Chris@16 271 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 272 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
Chris@16 273 << tag << ", bytes = " << header.bytes << std::endl;
Chris@16 274 #endif
Chris@16 275 }
Chris@16 276
Chris@16 277 template<typename T>
Chris@16 278 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
Chris@16 279 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 280 int tag, const T values[], std::size_t n)
Chris@16 281 {
Chris@16 282 pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
Chris@16 283 values, n);
Chris@16 284 }
Chris@16 285
Chris@16 286 template<typename InputIterator>
Chris@16 287 void
Chris@16 288 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 289 int tag, InputIterator first, InputIterator last)
Chris@16 290 {
Chris@16 291 typedef typename std::iterator_traits<InputIterator>::value_type value_type;
Chris@16 292 std::vector<value_type> values(first, last);
Chris@16 293 if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
Chris@16 294 else send(pg, dest, tag, &values[0], values.size());
Chris@16 295 }
Chris@16 296
Chris@16 297 template<typename T>
Chris@16 298 bool
Chris@16 299 mpi_process_group::receive_impl(int source, int tag, T& value,
Chris@16 300 mpl::true_ /*is_mpi_datatype*/) const
Chris@16 301 {
Chris@16 302 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 303 std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
Chris@16 304 << tag << std::endl;
Chris@16 305 #endif
Chris@16 306
Chris@16 307 impl::incoming_messages& incoming = impl_->incoming[source];
Chris@16 308
Chris@16 309 // Find the next header with the right tag
Chris@16 310 std::vector<impl::message_header>::iterator header =
Chris@16 311 incoming.next_header[my_block_number()];
Chris@16 312 while (header != incoming.headers.end() && header->tag != tag) ++header;
Chris@16 313
Chris@16 314 // If no header is found, notify the caller
Chris@16 315 if (header == incoming.headers.end()) return false;
Chris@16 316
Chris@16 317 // Unpack the data
Chris@16 318 if (header->bytes > 0) {
Chris@16 319 boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
Chris@16 320 archive::no_header, header->offset);
Chris@16 321 ia >> value;
Chris@16 322 }
Chris@16 323
Chris@16 324 // Mark this message as received
Chris@16 325 header->tag = -1;
Chris@16 326
Chris@16 327 // Move the "next header" indicator to the next unreceived message
Chris@16 328 while (incoming.next_header[my_block_number()] != incoming.headers.end()
Chris@16 329 && incoming.next_header[my_block_number()]->tag == -1)
Chris@16 330 ++incoming.next_header[my_block_number()];
Chris@16 331
Chris@16 332 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
Chris@16 333 bool finished = true;
Chris@16 334 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
Chris@16 335 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
Chris@16 336 }
Chris@16 337
Chris@16 338 if (finished) {
Chris@16 339 std::vector<impl::message_header> no_headers;
Chris@16 340 incoming.headers.swap(no_headers);
Chris@16 341 buffer_type empty_buffer;
Chris@16 342 incoming.buffer.swap(empty_buffer);
Chris@16 343 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
Chris@16 344 incoming.next_header[i] = incoming.headers.end();
Chris@16 345 }
Chris@16 346 }
Chris@16 347
Chris@16 348 return true;
Chris@16 349 }
Chris@16 350
Chris@16 351 template<typename T>
Chris@16 352 bool
Chris@16 353 mpi_process_group::receive_impl(int source, int tag, T& value,
Chris@16 354 mpl::false_ /*is_mpi_datatype*/) const
Chris@16 355 {
Chris@16 356 impl::incoming_messages& incoming = impl_->incoming[source];
Chris@16 357
Chris@16 358 // Find the next header with the right tag
Chris@16 359 std::vector<impl::message_header>::iterator header =
Chris@16 360 incoming.next_header[my_block_number()];
Chris@16 361 while (header != incoming.headers.end() && header->tag != tag) ++header;
Chris@16 362
Chris@16 363 // If no header is found, notify the caller
Chris@16 364 if (header == incoming.headers.end()) return false;
Chris@16 365
Chris@16 366 // Deserialize the data
Chris@16 367 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
Chris@16 368 archive::no_header, header->offset);
Chris@16 369 in >> value;
Chris@16 370
Chris@16 371 // Mark this message as received
Chris@16 372 header->tag = -1;
Chris@16 373
Chris@16 374 // Move the "next header" indicator to the next unreceived message
Chris@16 375 while (incoming.next_header[my_block_number()] != incoming.headers.end()
Chris@16 376 && incoming.next_header[my_block_number()]->tag == -1)
Chris@16 377 ++incoming.next_header[my_block_number()];
Chris@16 378
Chris@16 379 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
Chris@16 380 bool finished = true;
Chris@16 381 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
Chris@16 382 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
Chris@16 383 }
Chris@16 384
Chris@16 385 if (finished) {
Chris@16 386 std::vector<impl::message_header> no_headers;
Chris@16 387 incoming.headers.swap(no_headers);
Chris@16 388 buffer_type empty_buffer;
Chris@16 389 incoming.buffer.swap(empty_buffer);
Chris@16 390 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
Chris@16 391 incoming.next_header[i] = incoming.headers.end();
Chris@16 392 }
Chris@16 393 }
Chris@16 394
Chris@16 395 return true;
Chris@16 396 }
Chris@16 397
Chris@16 398 template<typename T>
Chris@16 399 typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
Chris@16 400 mpi_process_group::
Chris@16 401 array_receive_impl(int source, int tag, T* values, std::size_t& n) const
Chris@16 402 {
Chris@16 403 impl::incoming_messages& incoming = impl_->incoming[source];
Chris@16 404
Chris@16 405 // Find the next header with the right tag
Chris@16 406 std::vector<impl::message_header>::iterator header =
Chris@16 407 incoming.next_header[my_block_number()];
Chris@16 408 while (header != incoming.headers.end() && header->tag != tag) ++header;
Chris@16 409
Chris@16 410 // If no header is found, notify the caller
Chris@16 411 if (header == incoming.headers.end()) return false;
Chris@16 412
Chris@16 413 // Deserialize the data
Chris@16 414 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
Chris@16 415 archive::no_header, header->offset);
Chris@16 416 std::size_t num_sent;
Chris@16 417 in >> num_sent;
Chris@16 418 if (num_sent > n)
Chris@16 419 std::cerr << "ERROR: Have " << num_sent << " items but only space for "
Chris@16 420 << n << " items\n";
Chris@16 421
Chris@16 422 for (std::size_t i = 0; i < num_sent; ++i)
Chris@16 423 in >> values[i];
Chris@16 424 n = num_sent;
Chris@16 425
Chris@16 426 // Mark this message as received
Chris@16 427 header->tag = -1;
Chris@16 428
Chris@16 429 // Move the "next header" indicator to the next unreceived message
Chris@16 430 while (incoming.next_header[my_block_number()] != incoming.headers.end()
Chris@16 431 && incoming.next_header[my_block_number()]->tag == -1)
Chris@16 432 ++incoming.next_header[my_block_number()];
Chris@16 433
Chris@16 434 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
Chris@16 435 bool finished = true;
Chris@16 436 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
Chris@16 437 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
Chris@16 438 }
Chris@16 439
Chris@16 440 if (finished) {
Chris@16 441 std::vector<impl::message_header> no_headers;
Chris@16 442 incoming.headers.swap(no_headers);
Chris@16 443 buffer_type empty_buffer;
Chris@16 444 incoming.buffer.swap(empty_buffer);
Chris@16 445 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
Chris@16 446 incoming.next_header[i] = incoming.headers.end();
Chris@16 447 }
Chris@16 448 }
Chris@16 449
Chris@16 450 return true;
Chris@16 451 }
Chris@16 452
Chris@16 453 // Construct triggers
Chris@16 454 template<typename Type, typename Handler>
Chris@16 455 void mpi_process_group::trigger(int tag, const Handler& handler)
Chris@16 456 {
Chris@16 457 BOOST_ASSERT(block_num);
Chris@16 458 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
Chris@16 459 new trigger_launcher<Type, Handler>(*this, tag, handler)));
Chris@16 460 }
Chris@16 461
Chris@16 462 template<typename Type, typename Handler>
Chris@16 463 void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
Chris@16 464 {
Chris@16 465 BOOST_ASSERT(block_num);
Chris@16 466 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
Chris@16 467 new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
Chris@16 468 }
Chris@16 469
Chris@16 470 template<typename Type, typename Handler>
Chris@16 471 void mpi_process_group::global_trigger(int tag, const Handler& handler,
Chris@16 472 std::size_t sz)
Chris@16 473 {
Chris@16 474 if (sz==0) // normal trigger
Chris@16 475 install_trigger(tag,0,shared_ptr<trigger_base>(
Chris@16 476 new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
Chris@16 477 else // trigger with irecv
Chris@16 478 install_trigger(tag,0,shared_ptr<trigger_base>(
Chris@16 479 new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));
Chris@16 480
Chris@16 481 }
Chris@16 482
Chris@16 483 namespace detail {
Chris@16 484
Chris@16 485 template<typename Type>
Chris@16 486 void do_oob_receive(mpi_process_group const& self,
Chris@16 487 int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/)
Chris@16 488 {
Chris@16 489 using boost::mpi::get_mpi_datatype;
Chris@16 490
Chris@16 491 //self.impl_->comm.recv(source,tag,data);
Chris@16 492 MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
Chris@16 493 MPI_STATUS_IGNORE);
Chris@16 494 }
Chris@16 495
Chris@16 496 template<typename Type>
Chris@16 497 void do_oob_receive(mpi_process_group const& self,
Chris@16 498 int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/)
Chris@16 499 {
Chris@16 500 // self.impl_->comm.recv(source,tag,data);
Chris@16 501 // Receive the size of the data packet
Chris@16 502 boost::mpi::status status;
Chris@16 503 status = self.impl_->comm.probe(source, tag);
Chris@16 504
Chris@16 505 #if BOOST_VERSION >= 103600
Chris@16 506 int size = status.count<boost::mpi::packed>().get();
Chris@16 507 #else
Chris@16 508 int size;
Chris@16 509 MPI_Status& mpi_status = status;
Chris@16 510 MPI_Get_count(&mpi_status, MPI_PACKED, &size);
Chris@16 511 #endif
Chris@16 512
Chris@16 513 // Receive the data packed itself
Chris@16 514 boost::mpi::packed_iarchive in(self.impl_->comm);
Chris@16 515 in.resize(size);
Chris@16 516 MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
Chris@16 517 MPI_STATUS_IGNORE);
Chris@16 518
Chris@16 519 // Deserialize the data
Chris@16 520 in >> data;
Chris@16 521 }
Chris@16 522
Chris@16 523 template<typename Type>
Chris@16 524 void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data)
Chris@16 525 {
Chris@16 526 do_oob_receive(self, source, tag, data,
Chris@16 527 boost::mpi::is_mpi_datatype<Type>());
Chris@16 528 }
Chris@16 529
Chris@16 530
Chris@16 531 } // namespace detail
Chris@16 532
Chris@16 533
Chris@16 534 template<typename Type, typename Handler>
Chris@16 535 void
Chris@16 536 mpi_process_group::trigger_launcher<Type, Handler>::
Chris@16 537 receive(mpi_process_group const&, int source, int tag,
Chris@16 538 trigger_receive_context context, int block) const
Chris@16 539 {
Chris@16 540 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 541 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
Chris@16 542 << " receive from source " << source << " and tag " << tag
Chris@16 543 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
Chris@16 544 #endif
Chris@16 545
Chris@16 546 Type data;
Chris@16 547
Chris@16 548 if (context == trc_out_of_band) {
Chris@16 549 // Receive the message directly off the wire
Chris@16 550 int realtag = self.encode_tag(
Chris@16 551 block == -1 ? self.my_block_number() : block, tag);
Chris@16 552 detail::do_oob_receive(self,source,realtag,data);
Chris@16 553 }
Chris@16 554 else
Chris@16 555 // Receive the message out of the local buffer
Chris@16 556 boost::graph::distributed::receive(self, source, tag, data);
Chris@16 557
Chris@16 558 // Pass the message off to the handler
Chris@16 559 handler(source, tag, data, context);
Chris@16 560 }
Chris@16 561
Chris@16 562 template<typename Type, typename Handler>
Chris@16 563 void
Chris@16 564 mpi_process_group::reply_trigger_launcher<Type, Handler>::
Chris@16 565 receive(mpi_process_group const&, int source, int tag,
Chris@16 566 trigger_receive_context context, int block) const
Chris@16 567 {
Chris@16 568 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 569 std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")
Chris@16 570 << " receive from source " << source << " and tag " << tag
Chris@16 571 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
Chris@16 572 #endif
Chris@16 573 BOOST_ASSERT(context == trc_out_of_band);
Chris@16 574
Chris@16 575 boost::parallel::detail::untracked_pair<int, Type> data;
Chris@16 576
Chris@16 577 // Receive the message directly off the wire
Chris@16 578 int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
Chris@16 579 tag);
Chris@16 580 detail::do_oob_receive(self, source, realtag, data);
Chris@16 581
Chris@16 582 // Pass the message off to the handler and send the result back to
Chris@16 583 // the source.
Chris@16 584 send_oob(self, source, data.first,
Chris@16 585 handler(source, tag, data.second, context), -2);
Chris@16 586 }
Chris@16 587
Chris@16 588 template<typename Type, typename Handler>
Chris@16 589 void
Chris@16 590 mpi_process_group::global_trigger_launcher<Type, Handler>::
Chris@16 591 receive(mpi_process_group const& self, int source, int tag,
Chris@16 592 trigger_receive_context context, int block) const
Chris@16 593 {
Chris@16 594 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 595 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
Chris@16 596 << " receive from source " << source << " and tag " << tag
Chris@16 597 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
Chris@16 598 #endif
Chris@16 599
Chris@16 600 Type data;
Chris@16 601
Chris@16 602 if (context == trc_out_of_band) {
Chris@16 603 // Receive the message directly off the wire
Chris@16 604 int realtag = self.encode_tag(
Chris@16 605 block == -1 ? self.my_block_number() : block, tag);
Chris@16 606 detail::do_oob_receive(self,source,realtag,data);
Chris@16 607 }
Chris@16 608 else
Chris@16 609 // Receive the message out of the local buffer
Chris@16 610 boost::graph::distributed::receive(self, source, tag, data);
Chris@16 611
Chris@16 612 // Pass the message off to the handler
Chris@16 613 handler(self, source, tag, data, context);
Chris@16 614 }
Chris@16 615
Chris@16 616
Chris@16 617 template<typename Type, typename Handler>
Chris@16 618 void
Chris@16 619 mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
Chris@16 620 receive(mpi_process_group const& self, int source, int tag,
Chris@16 621 trigger_receive_context context, int block) const
Chris@16 622 {
Chris@16 623 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 624 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
Chris@16 625 << " receive from source " << source << " and tag " << tag
Chris@16 626 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
Chris@16 627 #endif
Chris@16 628
Chris@16 629 Type data;
Chris@16 630
Chris@16 631 if (context == trc_out_of_band) {
Chris@16 632 return;
Chris@16 633 }
Chris@16 634 BOOST_ASSERT (context == trc_irecv_out_of_band);
Chris@16 635
Chris@16 636 // force posting of new MPI_Irecv, even though buffer is already allocated
Chris@16 637 boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
Chris@16 638 ia >> data;
Chris@16 639 // Start a new receive
Chris@16 640 prepare_receive(self,tag,true);
Chris@16 641 // Pass the message off to the handler
Chris@16 642 handler(self, source, tag, data, context);
Chris@16 643 }
Chris@16 644
Chris@16 645
// Makes sure a receive buffer exists for `tag` and, when required,
// posts a non-blocking receive into it.
//
// If no buffer is registered for the tag yet, one is created with
// buffer_size bytes and posting is forced. When `force` is set, a new
// request is appended to impl_->requests and an MPI_Irecv for packed
// data from any source with this tag is posted on the group's
// communicator.
Chris@16 646 template<typename Type, typename Handler>
Chris@16 647 void
Chris@16 648 mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
Chris@16 649 prepare_receive(mpi_process_group const& self, int tag, bool force) const
Chris@16 650 {
Chris@16 651 #ifdef PBGL_PROCESS_GROUP_DEBUG
Chris@16 652 std::cerr << ("Posting Irecv for trigger")
Chris@16 653 << " receive with tag " << tag << std::endl;
Chris@16 654 #endif
Chris@16 655 if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) { // first use of this tag
Chris@16 656 self.impl_->buffers[tag].resize(buffer_size);
Chris@16 657 force = true; // a fresh buffer has no receive posted yet
Chris@16 658 }
Chris@16 659 BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);
Chris@16 660
Chris@16 661 //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
Chris@16 662 if (force) {
Chris@16 663 self.impl_->requests.push_back(MPI_Request()); // slot for the new request handle
Chris@16 664 MPI_Request* request = &self.impl_->requests.back();
Chris@16 665 MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
Chris@16 666 MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
Chris@16 667 }
Chris@16 668 }
Chris@16 669
Chris@16 670
Chris@16 671 template<typename T>
Chris@16 672 inline mpi_process_group::process_id_type
Chris@16 673 receive(const mpi_process_group& pg, int tag, T& value)
Chris@16 674 {
Chris@16 675 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
Chris@16 676 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
Chris@16 677 value, boost::mpi::is_mpi_datatype<T>()))
Chris@16 678 return source;
Chris@16 679 }
Chris@16 680 BOOST_ASSERT (false);
Chris@16 681 }
Chris@16 682
Chris@16 683 template<typename T>
Chris@16 684 typename
Chris@16 685 enable_if<boost::mpi::is_mpi_datatype<T>,
Chris@16 686 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
Chris@16 687 receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
Chris@16 688 {
Chris@16 689 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
Chris@16 690 bool result =
Chris@16 691 pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
Chris@16 692 boost::serialization::make_array(values,n),
Chris@16 693 boost::mpl::true_());
Chris@16 694 if (result)
Chris@16 695 return std::make_pair(source, n);
Chris@16 696 }
Chris@16 697 BOOST_ASSERT(false);
Chris@16 698 }
Chris@16 699
Chris@16 700 template<typename T>
Chris@16 701 typename
Chris@16 702 disable_if<boost::mpi::is_mpi_datatype<T>,
Chris@16 703 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
Chris@16 704 receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
Chris@16 705 {
Chris@16 706 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
Chris@16 707 if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
Chris@16 708 values, n))
Chris@16 709 return std::make_pair(source, n);
Chris@16 710 }
Chris@16 711 BOOST_ASSERT(false);
Chris@16 712 }
Chris@16 713
Chris@16 714 template<typename T>
Chris@16 715 mpi_process_group::process_id_type
Chris@16 716 receive(const mpi_process_group& pg,
Chris@16 717 mpi_process_group::process_id_type source, int tag, T& value)
Chris@16 718 {
Chris@16 719 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
Chris@16 720 value, boost::mpi::is_mpi_datatype<T>()))
Chris@16 721 return source;
Chris@16 722 else {
Chris@16 723 fprintf(stderr,
Chris@16 724 "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
Chris@16 725 process_id(pg), source, tag, pg.my_block_number());
Chris@16 726
Chris@16 727 BOOST_ASSERT(false);
Chris@16 728 abort();
Chris@16 729 }
Chris@16 730 }
Chris@16 731
Chris@16 732 template<typename T>
Chris@16 733 typename
Chris@16 734 enable_if<boost::mpi::is_mpi_datatype<T>,
Chris@16 735 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
Chris@16 736 receive(const mpi_process_group& pg, int source, int tag, T values[],
Chris@16 737 std::size_t n)
Chris@16 738 {
Chris@16 739 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
Chris@16 740 boost::serialization::make_array(values,n),
Chris@16 741 boost::mpl::true_()))
Chris@16 742 return std::make_pair(source,n);
Chris@16 743 else {
Chris@16 744 fprintf(stderr,
Chris@16 745 "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
Chris@16 746 process_id(pg), source, tag, pg.my_block_number());
Chris@16 747
Chris@16 748 BOOST_ASSERT(false);
Chris@16 749 abort();
Chris@16 750 }
Chris@16 751 }
Chris@16 752
Chris@16 753 template<typename T>
Chris@16 754 typename
Chris@16 755 disable_if<boost::mpi::is_mpi_datatype<T>,
Chris@16 756 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
Chris@16 757 receive(const mpi_process_group& pg, int source, int tag, T values[],
Chris@16 758 std::size_t n)
Chris@16 759 {
Chris@16 760 pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
Chris@16 761 values, n);
Chris@16 762
Chris@16 763 return std::make_pair(source, n);
Chris@16 764 }
Chris@16 765
Chris@16 766 template<typename T, typename BinaryOperation>
Chris@16 767 T*
Chris@16 768 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
Chris@16 769 BinaryOperation bin_op)
Chris@16 770 {
Chris@16 771 synchronize(pg);
Chris@16 772
Chris@16 773 bool inplace = first == out;
Chris@16 774
Chris@16 775 if (inplace) out = new T [last-first];
Chris@16 776
Chris@16 777 boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
Chris@16 778 boost::mpi::comm_attach),
Chris@16 779 first, last-first, out, bin_op);
Chris@16 780
Chris@16 781 if (inplace) {
Chris@16 782 std::copy(out, out + (last-first), first);
Chris@16 783 delete [] out;
Chris@16 784 return last;
Chris@16 785 }
Chris@16 786
Chris@16 787 return out;
Chris@16 788 }
Chris@16 789
Chris@16 790 template<typename T>
Chris@16 791 void
Chris@16 792 broadcast(const mpi_process_group& pg, T& val,
Chris@16 793 mpi_process_group::process_id_type root)
Chris@16 794 {
Chris@16 795 // broadcast the seed
Chris@16 796 boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);
Chris@16 797 boost::mpi::broadcast(comm,val,root);
Chris@16 798 }
Chris@16 799
Chris@16 800
Chris@16 801 template<typename T, typename BinaryOperation>
Chris@16 802 T*
Chris@16 803 scan(const mpi_process_group& pg, T* first, T* last, T* out,
Chris@16 804 BinaryOperation bin_op)
Chris@16 805 {
Chris@16 806 synchronize(pg);
Chris@16 807
Chris@16 808 bool inplace = first == out;
Chris@16 809
Chris@16 810 if (inplace) out = new T [last-first];
Chris@16 811
Chris@16 812 boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);
Chris@16 813
Chris@16 814 if (inplace) {
Chris@16 815 std::copy(out, out + (last-first), first);
Chris@16 816 delete [] out;
Chris@16 817 return last;
Chris@16 818 }
Chris@16 819
Chris@16 820 return out;
Chris@16 821 }
Chris@16 822
Chris@16 823
Chris@16 824 template<typename InputIterator, typename T>
Chris@16 825 void
Chris@16 826 all_gather(const mpi_process_group& pg, InputIterator first,
Chris@16 827 InputIterator last, std::vector<T>& out)
Chris@16 828 {
Chris@16 829 synchronize(pg);
Chris@16 830
Chris@16 831 // Stick a copy of the local values into a vector, so we can broadcast it
Chris@16 832 std::vector<T> local_values(first, last);
Chris@16 833
Chris@16 834 // Collect the number of vertices stored in each process
Chris@16 835 int size = local_values.size();
Chris@16 836 std::vector<int> sizes(num_processes(pg));
Chris@16 837 int result = MPI_Allgather(&size, 1, MPI_INT,
Chris@16 838 &sizes[0], 1, MPI_INT,
Chris@16 839 communicator(pg));
Chris@16 840 BOOST_ASSERT(result == MPI_SUCCESS);
Chris@16 841
Chris@16 842 // Adjust sizes based on the number of bytes
Chris@16 843 std::transform(sizes.begin(), sizes.end(), sizes.begin(),
Chris@16 844 std::bind2nd(std::multiplies<int>(), sizeof(T)));
Chris@16 845
Chris@16 846 // Compute displacements
Chris@16 847 std::vector<int> displacements;
Chris@16 848 displacements.reserve(sizes.size() + 1);
Chris@16 849 displacements.push_back(0);
Chris@16 850 std::partial_sum(sizes.begin(), sizes.end(),
Chris@16 851 std::back_inserter(displacements));
Chris@16 852
Chris@16 853 // Gather all of the values
Chris@16 854 out.resize(displacements.back() / sizeof(T));
Chris@16 855 if (!out.empty()) {
Chris@16 856 result = MPI_Allgatherv(local_values.empty()? (void*)&local_values
Chris@16 857 /* local results */: (void*)&local_values[0],
Chris@16 858 local_values.size() * sizeof(T),
Chris@16 859 MPI_BYTE,
Chris@16 860 &out[0], &sizes[0], &displacements[0], MPI_BYTE,
Chris@16 861 communicator(pg));
Chris@16 862 }
Chris@16 863 BOOST_ASSERT(result == MPI_SUCCESS);
Chris@16 864 }
Chris@16 865
Chris@16 866 template<typename InputIterator>
Chris@16 867 mpi_process_group
Chris@16 868 process_subgroup(const mpi_process_group& pg,
Chris@16 869 InputIterator first, InputIterator last)
Chris@16 870 {
Chris@16 871 /*
Chris@16 872 boost::mpi::group current_group = communicator(pg).group();
Chris@16 873 boost::mpi::group new_group = current_group.include(first,last);
Chris@16 874 boost::mpi::communicator new_comm(communicator(pg),new_group);
Chris@16 875 return mpi_process_group(new_comm);
Chris@16 876 */
Chris@16 877 std::vector<int> ranks(first, last);
Chris@16 878
Chris@16 879 MPI_Group current_group;
Chris@16 880 int result = MPI_Comm_group(communicator(pg), &current_group);
Chris@16 881 BOOST_ASSERT(result == MPI_SUCCESS);
Chris@16 882
Chris@16 883 MPI_Group new_group;
Chris@16 884 result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
Chris@16 885 BOOST_ASSERT(result == MPI_SUCCESS);
Chris@16 886
Chris@16 887 MPI_Comm new_comm;
Chris@16 888 result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
Chris@16 889 BOOST_ASSERT(result == MPI_SUCCESS);
Chris@16 890
Chris@16 891 result = MPI_Group_free(&new_group);
Chris@16 892 BOOST_ASSERT(result == MPI_SUCCESS);
Chris@16 893 result = MPI_Group_free(&current_group);
Chris@16 894 BOOST_ASSERT(result == MPI_SUCCESS);
Chris@16 895
Chris@16 896 if (new_comm != MPI_COMM_NULL) {
Chris@16 897 mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));
Chris@16 898 result = MPI_Comm_free(&new_comm);
Chris@16 899 BOOST_ASSERT(result == 0);
Chris@16 900 return result_pg;
Chris@16 901 } else {
Chris@16 902 return mpi_process_group(mpi_process_group::create_empty());
Chris@16 903 }
Chris@16 904
Chris@16 905 }
Chris@16 906
Chris@16 907
Chris@16 908 template<typename Receiver>
Chris@16 909 Receiver* mpi_process_group::get_receiver()
Chris@16 910 {
Chris@16 911 return impl_->blocks[my_block_number()]->on_receive
Chris@16 912 .template target<Receiver>();
Chris@16 913 }
Chris@16 914
Chris@16 915 template<typename T>
Chris@16 916 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
Chris@16 917 receive_oob(const mpi_process_group& pg,
Chris@16 918 mpi_process_group::process_id_type source, int tag, T& value, int block)
Chris@16 919 {
Chris@16 920 using boost::mpi::get_mpi_datatype;
Chris@16 921
Chris@16 922 // Determine the actual message we expect to receive, and which
Chris@16 923 // communicator it will come by.
Chris@16 924 std::pair<boost::mpi::communicator, int> actual
Chris@16 925 = pg.actual_communicator_and_tag(tag, block);
Chris@16 926
Chris@16 927 // Post a non-blocking receive that waits until we complete this request.
Chris@16 928 MPI_Request request;
Chris@16 929 MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
Chris@16 930 source, actual.second, actual.first, &request);
Chris@16 931
Chris@16 932 int done = 0;
Chris@16 933 do {
Chris@16 934 MPI_Test(&request, &done, MPI_STATUS_IGNORE);
Chris@16 935 if (!done)
Chris@16 936 pg.poll(/*wait=*/false, block);
Chris@16 937 } while (!done);
Chris@16 938 }
Chris@16 939
Chris@16 940 template<typename T>
Chris@16 941 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
Chris@16 942 receive_oob(const mpi_process_group& pg,
Chris@16 943 mpi_process_group::process_id_type source, int tag, T& value, int block)
Chris@16 944 {
Chris@16 945 // Determine the actual message we expect to receive, and which
Chris@16 946 // communicator it will come by.
Chris@16 947 std::pair<boost::mpi::communicator, int> actual
Chris@16 948 = pg.actual_communicator_and_tag(tag, block);
Chris@16 949
Chris@16 950 boost::optional<boost::mpi::status> status;
Chris@16 951 do {
Chris@16 952 status = actual.first.iprobe(source, actual.second);
Chris@16 953 if (!status)
Chris@16 954 pg.poll();
Chris@16 955 } while (!status);
Chris@16 956
Chris@16 957 //actual.first.recv(status->source(), status->tag(),value);
Chris@16 958
Chris@16 959 // Allocate the receive buffer
Chris@16 960 boost::mpi::packed_iarchive in(actual.first);
Chris@16 961
Chris@16 962 #if BOOST_VERSION >= 103600
Chris@16 963 in.resize(status->count<boost::mpi::packed>().get());
Chris@16 964 #else
Chris@16 965 int size;
Chris@16 966 MPI_Status mpi_status = *status;
Chris@16 967 MPI_Get_count(&mpi_status, MPI_PACKED, &size);
Chris@16 968 in.resize(size);
Chris@16 969 #endif
Chris@16 970
Chris@16 971 // Receive the message data
Chris@16 972 MPI_Recv(in.address(), in.size(), MPI_PACKED,
Chris@16 973 status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);
Chris@16 974
Chris@16 975 // Unpack the message data
Chris@16 976 in >> value;
Chris@16 977 }
Chris@16 978
Chris@16 979
Chris@16 980 template<typename SendT, typename ReplyT>
Chris@16 981 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
Chris@16 982 send_oob_with_reply(const mpi_process_group& pg,
Chris@16 983 mpi_process_group::process_id_type dest,
Chris@16 984 int tag, const SendT& send_value, ReplyT& reply_value,
Chris@16 985 int block)
Chris@16 986 {
Chris@16 987 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
Chris@16 988 send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
Chris@16 989 (int)reply_tag, send_value), block);
Chris@16 990 receive_oob(pg, dest, reply_tag, reply_value);
Chris@16 991 }
Chris@16 992
Chris@16 993 template<typename SendT, typename ReplyT>
Chris@16 994 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
Chris@16 995 send_oob_with_reply(const mpi_process_group& pg,
Chris@16 996 mpi_process_group::process_id_type dest,
Chris@16 997 int tag, const SendT& send_value, ReplyT& reply_value,
Chris@16 998 int block)
Chris@16 999 {
Chris@16 1000 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
Chris@16 1001 send_oob(pg, dest, tag,
Chris@16 1002 boost::parallel::detail::make_untracked_pair((int)reply_tag,
Chris@16 1003 send_value), block);
Chris@16 1004 receive_oob(pg, dest, reply_tag, reply_value);
Chris@16 1005 }
Chris@16 1006
Chris@16 1007 } } } // end namespace boost::graph::distributed