annotate DEPENDENCIES/generic/include/boost/graph/distributed/mpi_process_group.hpp @ 125:34e428693f5d vext

Vext -> Repoint
author Chris Cannam
date Thu, 14 Jun 2018 11:15:39 +0100
parents c530137014c0
children
rev   line source
Chris@16 1 // Copyright (C) 2004-2008 The Trustees of Indiana University.
Chris@16 2 // Copyright (C) 2007 Douglas Gregor
Chris@16 3 // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com>
Chris@16 4
Chris@16 5 // Use, modification and distribution is subject to the Boost Software
Chris@16 6 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 7 // http://www.boost.org/LICENSE_1_0.txt)
Chris@16 8
Chris@16 9 // Authors: Douglas Gregor
Chris@16 10 // Matthias Troyer
Chris@16 11 // Andrew Lumsdaine
Chris@16 12 #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
Chris@16 13 #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
Chris@16 14
Chris@16 15 #ifndef BOOST_GRAPH_USE_MPI
Chris@16 16 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
Chris@16 17 #endif
Chris@16 18
Chris@16 19 //#define NO_SPLIT_BATCHES
Chris@16 20 #define SEND_OOB_BSEND
Chris@16 21
Chris@16 22 #include <boost/optional.hpp>
Chris@16 23 #include <boost/shared_ptr.hpp>
Chris@16 24 #include <boost/weak_ptr.hpp>
Chris@16 25 #include <utility>
Chris@16 26 #include <memory>
Chris@16 27 #include <boost/function/function1.hpp>
Chris@16 28 #include <boost/function/function2.hpp>
Chris@16 29 #include <boost/function/function0.hpp>
Chris@16 30 #include <boost/mpi.hpp>
Chris@101 31 #include <boost/property_map/parallel/process_group.hpp>
Chris@16 32 #include <boost/utility/enable_if.hpp>
Chris@16 33
Chris@16 34 namespace boost { namespace graph { namespace distributed {
Chris@16 35
Chris@16 36 // Process group tags
Chris@101 37 struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
Chris@16 38
Chris@16 39 class mpi_process_group
Chris@16 40 {
Chris@16 41 struct impl;
Chris@16 42
Chris@16 43 public:
Chris@16 44 /// Number of tags available to each data structure.
Chris@16 45 static const int max_tags = 256;
Chris@16 46
Chris@16 47 /**
Chris@16 48 * The type of a "receive" handler, that will be provided with
Chris@16 49 * (source, tag) pairs when a message is received. Users can provide a
Chris@16 50 * receive handler for a distributed data structure, for example, to
Chris@16 51 * automatically pick up and respond to messages as needed.
Chris@16 52 */
Chris@16 53 typedef function<void(int source, int tag)> receiver_type;
Chris@16 54
Chris@16 55 /**
Chris@16 56 * The type of a handler for the on-synchronize event, which will be
Chris@16 57 * executed at the beginning of synchronize().
Chris@16 58 */
Chris@16 59 typedef function0<void> on_synchronize_event_type;
Chris@16 60
Chris@16 61 /// Used as a tag to help create an "empty" process group.
Chris@16 62 struct create_empty {};
Chris@16 63
Chris@16 64 /// The type used to buffer message data
Chris@16 65 typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
Chris@16 66
Chris@16 67 /// The type used to identify a process
Chris@16 68 typedef int process_id_type;
Chris@16 69
Chris@16 70 /// The type used to count the number of processes
Chris@16 71 typedef int process_size_type;
Chris@16 72
Chris@16 73 /// The type of communicator used to transmit data via MPI
Chris@16 74 typedef boost::mpi::communicator communicator_type;
Chris@16 75
Chris@16 76 /// Classification of the capabilities of this process group
Chris@16 77 struct communication_category
Chris@101 78 : virtual boost::parallel::bsp_process_group_tag,
Chris@16 79 virtual mpi_process_group_tag { };
Chris@16 80
Chris@16 81 // TBD: We can eliminate the "source" field and possibly the
Chris@16 82 // "offset" field.
Chris@16 83 struct message_header {
Chris@16 84 /// The process that sent the message
Chris@16 85 process_id_type source;
Chris@16 86
Chris@16 87 /// The message tag
Chris@16 88 int tag;
Chris@16 89
Chris@16 90 /// The offset of the message into the buffer
Chris@16 91 std::size_t offset;
Chris@16 92
Chris@16 93 /// The length of the message in the buffer, in bytes
Chris@16 94 std::size_t bytes;
Chris@16 95
Chris@16 96 template <class Archive>
Chris@16 97 void serialize(Archive& ar, int)
Chris@16 98 {
Chris@16 99 ar & source & tag & offset & bytes;
Chris@16 100 }
Chris@16 101 };
Chris@16 102
Chris@16 103 /**
Chris@16 104 * Stores the outgoing messages for a particular processor.
Chris@16 105 *
Chris@16 106 * @todo Evaluate whether we should use a deque instance, which
Chris@16 107 * would reduce could reduce the cost of "sending" messages but
Chris@16 108 * increases the time spent in the synchronization step.
Chris@16 109 */
Chris@16 110 struct outgoing_messages {
Chris@16 111 outgoing_messages() {}
Chris@16 112 ~outgoing_messages() {}
Chris@16 113
Chris@16 114 std::vector<message_header> headers;
Chris@16 115 buffer_type buffer;
Chris@16 116
Chris@16 117 template <class Archive>
Chris@16 118 void serialize(Archive& ar, int)
Chris@16 119 {
Chris@16 120 ar & headers & buffer;
Chris@16 121 }
Chris@16 122
Chris@16 123 void swap(outgoing_messages& x)
Chris@16 124 {
Chris@16 125 headers.swap(x.headers);
Chris@16 126 buffer.swap(x.buffer);
Chris@16 127 }
Chris@16 128 };
Chris@16 129
Chris@16 130 private:
Chris@16 131 /**
Chris@16 132 * Virtual base from which every trigger will be launched. See @c
Chris@16 133 * trigger_launcher for more information.
Chris@16 134 */
Chris@16 135 class trigger_base : boost::noncopyable
Chris@16 136 {
Chris@16 137 public:
Chris@16 138 explicit trigger_base(int tag) : tag_(tag) { }
Chris@16 139
Chris@16 140 /// Retrieve the tag associated with this trigger
Chris@16 141 int tag() const { return tag_; }
Chris@16 142
Chris@16 143 virtual ~trigger_base() { }
Chris@16 144
Chris@16 145 /**
Chris@16 146 * Invoked to receive a message that matches a particular trigger.
Chris@16 147 *
Chris@16 148 * @param source the source of the message
Chris@16 149 * @param tag the (local) tag of the message
Chris@16 150 * @param context the context under which the trigger is being
Chris@16 151 * invoked
Chris@16 152 */
Chris@16 153 virtual void
Chris@16 154 receive(mpi_process_group const& pg, int source, int tag,
Chris@16 155 trigger_receive_context context, int block=-1) const = 0;
Chris@16 156
Chris@16 157 protected:
Chris@16 158 // The message tag associated with this trigger
Chris@16 159 int tag_;
Chris@16 160 };
Chris@16 161
Chris@16 162 /**
Chris@16 163 * Launches a specific handler in response to a trigger. This
Chris@16 164 * function object wraps up the handler function object and a buffer
Chris@16 165 * for incoming data.
Chris@16 166 */
Chris@16 167 template<typename Type, typename Handler>
Chris@16 168 class trigger_launcher : public trigger_base
Chris@16 169 {
Chris@16 170 public:
Chris@16 171 explicit trigger_launcher(mpi_process_group& self, int tag,
Chris@16 172 const Handler& handler)
Chris@16 173 : trigger_base(tag), self(self), handler(handler)
Chris@16 174 {}
Chris@16 175
Chris@16 176 void
Chris@16 177 receive(mpi_process_group const& pg, int source, int tag,
Chris@16 178 trigger_receive_context context, int block=-1) const;
Chris@16 179
Chris@16 180 private:
Chris@16 181 mpi_process_group& self;
Chris@16 182 mutable Handler handler;
Chris@16 183 };
Chris@16 184
Chris@16 185 /**
Chris@16 186 * Launches a specific handler with a message reply in response to a
Chris@16 187 * trigger. This function object wraps up the handler function
Chris@16 188 * object and a buffer for incoming data.
Chris@16 189 */
Chris@16 190 template<typename Type, typename Handler>
Chris@16 191 class reply_trigger_launcher : public trigger_base
Chris@16 192 {
Chris@16 193 public:
Chris@16 194 explicit reply_trigger_launcher(mpi_process_group& self, int tag,
Chris@16 195 const Handler& handler)
Chris@16 196 : trigger_base(tag), self(self), handler(handler)
Chris@16 197 {}
Chris@16 198
Chris@16 199 void
Chris@16 200 receive(mpi_process_group const& pg, int source, int tag,
Chris@16 201 trigger_receive_context context, int block=-1) const;
Chris@16 202
Chris@16 203 private:
Chris@16 204 mpi_process_group& self;
Chris@16 205 mutable Handler handler;
Chris@16 206 };
Chris@16 207
Chris@16 208 template<typename Type, typename Handler>
Chris@16 209 class global_trigger_launcher : public trigger_base
Chris@16 210 {
Chris@16 211 public:
Chris@16 212 explicit global_trigger_launcher(mpi_process_group& self, int tag,
Chris@16 213 const Handler& handler)
Chris@16 214 : trigger_base(tag), handler(handler)
Chris@16 215 {
Chris@16 216 }
Chris@16 217
Chris@16 218 void
Chris@16 219 receive(mpi_process_group const& pg, int source, int tag,
Chris@16 220 trigger_receive_context context, int block=-1) const;
Chris@16 221
Chris@16 222 private:
Chris@16 223 mutable Handler handler;
Chris@16 224 // TBD: do not forget to cancel any outstanding Irecv when deleted,
Chris@16 225 // if we decide to use Irecv
Chris@16 226 };
Chris@16 227
Chris@16 228 template<typename Type, typename Handler>
Chris@16 229 class global_irecv_trigger_launcher : public trigger_base
Chris@16 230 {
Chris@16 231 public:
Chris@16 232 explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
Chris@16 233 const Handler& handler, int sz)
Chris@16 234 : trigger_base(tag), handler(handler), buffer_size(sz)
Chris@16 235 {
Chris@16 236 prepare_receive(self,tag);
Chris@16 237 }
Chris@16 238
Chris@16 239 void
Chris@16 240 receive(mpi_process_group const& pg, int source, int tag,
Chris@16 241 trigger_receive_context context, int block=-1) const;
Chris@16 242
Chris@16 243 private:
Chris@16 244 void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
Chris@16 245 Handler handler;
Chris@16 246 int buffer_size;
Chris@16 247 // TBD: do not forget to cancel any outstanding Irecv when deleted,
Chris@16 248 // if we decide to use Irecv
Chris@16 249 };
Chris@16 250
Chris@16 251 public:
Chris@16 252 /**
Chris@16 253 * Construct a new BSP process group from an MPI communicator. The
Chris@16 254 * MPI communicator will be duplicated to create a new communicator
Chris@16 255 * for this process group to use.
Chris@16 256 */
Chris@16 257 mpi_process_group(communicator_type parent_comm = communicator_type());
Chris@16 258
Chris@16 259 /**
Chris@16 260 * Construct a new BSP process group from an MPI communicator. The
Chris@16 261 * MPI communicator will be duplicated to create a new communicator
Chris@16 262 * for this process group to use. This constructor allows tuning the
Chris@16 263 * size of message batches.
Chris@16 264 *
Chris@16 265 * @param num_headers The maximum number of headers in a message batch
Chris@16 266 *
Chris@16 267 * @param buffer_size The maximum size of the message buffer in a batch.
Chris@16 268 *
Chris@16 269 */
Chris@16 270 mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
Chris@16 271 communicator_type parent_comm = communicator_type());
Chris@16 272
Chris@16 273 /**
Chris@16 274 * Construct a copy of the BSP process group for a new distributed
Chris@16 275 * data structure. This data structure will synchronize with all
Chris@16 276 * other members of the process group's equivalence class (including
Chris@16 277 * @p other), but will have its own set of tags.
Chris@16 278 *
Chris@16 279 * @param other The process group that this new process group will
Chris@16 280 * be based on, using a different set of tags within the same
Chris@16 281 * communication and synchronization space.
Chris@16 282 *
Chris@16 283 * @param handler A message handler that will be passed (source,
Chris@16 284 * tag) pairs for each message received by this data
Chris@16 285 * structure. The handler is expected to receive the messages
Chris@16 286 * immediately. The handler can be changed after-the-fact by
Chris@16 287 * calling @c replace_handler.
Chris@16 288 *
Chris@16 289 * @param out_of_band_receive An anachronism. TODO: remove this.
Chris@16 290 */
Chris@16 291 mpi_process_group(const mpi_process_group& other,
Chris@16 292 const receiver_type& handler,
Chris@16 293 bool out_of_band_receive = false);
Chris@16 294
Chris@16 295 /**
Chris@16 296 * Construct a copy of the BSP process group for a new distributed
Chris@16 297 * data structure. This data structure will synchronize with all
Chris@16 298 * other members of the process group's equivalence class (including
Chris@16 299 * @p other), but will have its own set of tags.
Chris@16 300 */
Chris@16 301 mpi_process_group(const mpi_process_group& other,
Chris@16 302 attach_distributed_object,
Chris@16 303 bool out_of_band_receive = false);
Chris@16 304
Chris@16 305 /**
Chris@16 306 * Create an "empty" process group, with no information. This is an
Chris@16 307 * internal routine that users should never need.
Chris@16 308 */
Chris@16 309 explicit mpi_process_group(create_empty) {}
Chris@16 310
Chris@16 311 /**
Chris@16 312 * Destroys this copy of the process group.
Chris@16 313 */
Chris@16 314 ~mpi_process_group();
Chris@16 315
Chris@16 316 /**
Chris@16 317 * Replace the current message handler with a new message handler.
Chris@16 318 *
Chris@16 319 * @param handler The new message handler.
Chris@16 320 * @param out_of_band_receive An anachronism: remove this
Chris@16 321 */
Chris@16 322 void replace_handler(const receiver_type& handler,
Chris@16 323 bool out_of_band_receive = false);
Chris@16 324
Chris@16 325 /**
Chris@16 326 * Turns this process group into the process group for a new
Chris@16 327 * distributed data structure or object, allocating its own tag
Chris@16 328 * block.
Chris@16 329 */
Chris@16 330 void make_distributed_object();
Chris@16 331
Chris@16 332 /**
Chris@16 333 * Replace the handler to be invoked at the beginning of synchronize.
Chris@16 334 */
Chris@16 335 void
Chris@16 336 replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
Chris@16 337
Chris@16 338 /**
Chris@16 339 * Return the block number of the current data structure. A value of
Chris@16 340 * 0 indicates that this particular instance of the process group is
Chris@16 341 * not associated with any distributed data structure.
Chris@16 342 */
Chris@16 343 int my_block_number() const { return block_num? *block_num : 0; }
Chris@16 344
Chris@16 345 /**
Chris@16 346 * Encode a block number/tag pair into a single encoded tag for
Chris@16 347 * transmission.
Chris@16 348 */
Chris@16 349 int encode_tag(int block_num, int tag) const
Chris@16 350 { return block_num * max_tags + tag; }
Chris@16 351
Chris@16 352 /**
Chris@16 353 * Decode an encoded tag into a block number/tag pair.
Chris@16 354 */
Chris@16 355 std::pair<int, int> decode_tag(int encoded_tag) const
Chris@16 356 { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
Chris@16 357
Chris@16 358 // @todo Actually write up the friend declarations so these could be
Chris@16 359 // private.
Chris@16 360
Chris@16 361 // private:
Chris@16 362
Chris@16 363 /** Allocate a block of tags for this instance. The block should not
Chris@16 364 * have been allocated already, e.g., my_block_number() ==
Chris@16 365 * 0. Returns the newly-allocated block number.
Chris@16 366 */
Chris@16 367 int allocate_block(bool out_of_band_receive = false);
Chris@16 368
Chris@16 369 /** Potentially emit a receive event out of band. Returns true if an event
Chris@16 370 * was actually sent, false otherwise.
Chris@16 371 */
Chris@16 372 bool maybe_emit_receive(int process, int encoded_tag) const;
Chris@16 373
Chris@16 374 /** Emit a receive event. Returns true if an event was actually
Chris@16 375 * sent, false otherwise.
Chris@16 376 */
Chris@16 377 bool emit_receive(int process, int encoded_tag) const;
Chris@16 378
Chris@16 379 /** Emit an on-synchronize event to all block handlers. */
Chris@16 380 void emit_on_synchronize() const;
Chris@16 381
Chris@16 382 /** Retrieve a reference to the stored receiver in this block. */
Chris@16 383 template<typename Receiver>
Chris@16 384 Receiver* get_receiver();
Chris@16 385
Chris@16 386 template<typename T>
Chris@16 387 void
Chris@16 388 send_impl(int dest, int tag, const T& value,
Chris@16 389 mpl::true_ /*is_mpi_datatype*/) const;
Chris@16 390
Chris@16 391 template<typename T>
Chris@16 392 void
Chris@16 393 send_impl(int dest, int tag, const T& value,
Chris@16 394 mpl::false_ /*is_mpi_datatype*/) const;
Chris@16 395
Chris@16 396 template<typename T>
Chris@16 397 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
Chris@16 398 array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
Chris@16 399
Chris@16 400 template<typename T>
Chris@16 401 bool
Chris@16 402 receive_impl(int source, int tag, T& value,
Chris@16 403 mpl::true_ /*is_mpi_datatype*/) const;
Chris@16 404
Chris@16 405 template<typename T>
Chris@16 406 bool
Chris@16 407 receive_impl(int source, int tag, T& value,
Chris@16 408 mpl::false_ /*is_mpi_datatype*/) const;
Chris@16 409
Chris@16 410 // Receive an array of values
Chris@16 411 template<typename T>
Chris@16 412 typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
Chris@16 413 array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
Chris@16 414
Chris@16 415 optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;
Chris@16 416
Chris@16 417 void synchronize() const;
Chris@16 418
Chris@16 419 operator bool() { return bool(impl_); }
Chris@16 420
Chris@16 421 mpi_process_group base() const;
Chris@16 422
Chris@16 423 /**
Chris@16 424 * Create a new trigger for a specific message tag. Triggers handle
Chris@16 425 * out-of-band messaging, and the handler itself will be called
Chris@16 426 * whenever a message is available. The handler itself accepts four
Chris@16 427 * arguments: the source of the message, the message tag (which will
Chris@16 428 * be the same as @p tag), the message data (of type @c Type), and a
Chris@16 429 * boolean flag that states whether the message was received
Chris@16 430 * out-of-band. The last will be @c true for out-of-band receives,
Chris@16 431 * or @c false for receives at the end of a synchronization step.
Chris@16 432 */
Chris@16 433 template<typename Type, typename Handler>
Chris@16 434 void trigger(int tag, const Handler& handler);
Chris@16 435
Chris@16 436 /**
Chris@16 437 * Create a new trigger for a specific message tag, along with a way
Chris@16 438 * to send a reply with data back to the sender. Triggers handle
Chris@16 439 * out-of-band messaging, and the handler itself will be called
Chris@16 440 * whenever a message is available. The handler itself accepts four
Chris@16 441 * arguments: the source of the message, the message tag (which will
Chris@16 442 * be the same as @p tag), the message data (of type @c Type), and a
Chris@16 443 * boolean flag that states whether the message was received
Chris@16 444 * out-of-band. The last will be @c true for out-of-band receives,
Chris@16 445 * or @c false for receives at the end of a synchronization
Chris@16 446 * step. The handler also returns a value, which will be routed back
Chris@16 447 * to the sender.
Chris@16 448 */
Chris@16 449 template<typename Type, typename Handler>
Chris@16 450 void trigger_with_reply(int tag, const Handler& handler);
Chris@16 451
Chris@16 452 template<typename Type, typename Handler>
Chris@16 453 void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);
Chris@16 454
Chris@16 455
Chris@16 456
Chris@16 457 /**
Chris@16 458 * Poll for any out-of-band messages. This routine will check if any
Chris@16 459 * out-of-band messages are available. Those that are available will
Chris@16 460 * be handled immediately, if possible.
Chris@16 461 *
Chris@16 462 * @returns if an out-of-band message has been received, but we are
Chris@16 463 * unable to actually receive the message, a (source, tag) pair will
Chris@16 464 * be returned. Otherwise, returns an empty optional.
Chris@16 465 *
Chris@16 466 * @param wait When true, we should block until a message comes in.
Chris@16 467 *
Chris@16 468 * @param synchronizing whether we are currently synchronizing the
Chris@16 469 * process group
Chris@16 470 */
Chris@16 471 optional<std::pair<int, int> >
Chris@16 472 poll(bool wait = false, int block = -1, bool synchronizing = false) const;
Chris@16 473
Chris@16 474 /**
Chris@16 475 * Determines the context of the trigger currently executing. If
Chris@16 476 * multiple triggers are executing (recursively), then the context
Chris@16 477 * for the most deeply nested trigger will be returned. If no
Chris@16 478 * triggers are executing, returns @c trc_none. This might be used,
Chris@16 479 * for example, to determine whether a reply to a message should
Chris@16 480 * itself be sent out-of-band or whether it can go via the normal,
Chris@16 481 * slower communication route.
Chris@16 482 */
Chris@16 483 trigger_receive_context trigger_context() const;
Chris@16 484
Chris@16 485 /// INTERNAL ONLY
Chris@16 486 void receive_batch(process_id_type source, outgoing_messages& batch) const;
Chris@16 487
Chris@16 488 /// INTERNAL ONLY
Chris@16 489 ///
Chris@16 490 /// Determine the actual communicator and tag that will be used for a
Chris@16 491 /// transmission with the given tag.
Chris@16 492 std::pair<boost::mpi::communicator, int>
Chris@16 493 actual_communicator_and_tag(int tag, int block) const;
Chris@16 494
Chris@16 495 /// set the size of the message buffer used for buffered oob sends
Chris@16 496
Chris@16 497 static void set_message_buffer_size(std::size_t s);
Chris@16 498
Chris@16 499 /// get the size of the message buffer used for buffered oob sends
Chris@16 500
Chris@16 501 static std::size_t message_buffer_size();
Chris@16 502 static int old_buffer_size;
Chris@16 503 static void* old_buffer;
Chris@16 504 private:
Chris@16 505
Chris@16 506 void install_trigger(int tag, int block,
Chris@16 507 shared_ptr<trigger_base> const& launcher);
Chris@16 508
Chris@16 509 void poll_requests(int block=-1) const;
Chris@16 510
Chris@16 511
Chris@16 512 // send a batch if the buffer is full now or would get full
Chris@16 513 void maybe_send_batch(process_id_type dest) const;
Chris@16 514
Chris@16 515 // actually send a batch
Chris@16 516 void send_batch(process_id_type dest, outgoing_messages& batch) const;
Chris@16 517 void send_batch(process_id_type dest) const;
Chris@16 518
Chris@16 519 void pack_headers() const;
Chris@16 520
Chris@16 521 /**
Chris@16 522 * Process a batch of incoming messages immediately.
Chris@16 523 *
Chris@16 524 * @param source the source of these messages
Chris@16 525 */
Chris@16 526 void process_batch(process_id_type source) const;
Chris@16 527 void receive_batch(boost::mpi::status& status) const;
Chris@16 528
Chris@16 529 //void free_finished_sends() const;
Chris@16 530
/// Status messages used internally by the process group. The tags
/// occupy the contiguous reserved range
/// [msg_reserved_first, msg_reserved_last].
enum status_messages {
  /// Sent from a processor when sending batched messages
  msg_batch = 126,
  /// Sent from a processor when sending large batched messages, larger than
  /// the maximum buffer size for messages to be received by MPI_Irecv
  msg_large_batch = msg_batch + 1,
  /// Sent from a source processor to everyone else when that
  /// processor has entered the synchronize() function.
  msg_synchronizing = msg_large_batch + 1,
  /// the first of the reserved message tags
  msg_reserved_first = msg_batch,
  /// the last of the reserved message tags
  msg_reserved_last = msg_synchronizing
};
Chris@16 546
Chris@16 547 /**
Chris@16 548 * Description of a block of tags associated to a particular
Chris@16 549 * distributed data structure. This structure will live as long as
Chris@16 550 * the distributed data structure is around, and will be used to
Chris@16 551 * help send messages to the data structure.
Chris@16 552 */
Chris@16 553 struct block_type
Chris@16 554 {
Chris@16 555 block_type() { }
Chris@16 556
Chris@16 557 /// Handler for receive events
Chris@16 558 receiver_type on_receive;
Chris@16 559
Chris@16 560 /// Handler executed at the start of synchronization
Chris@16 561 on_synchronize_event_type on_synchronize;
Chris@16 562
Chris@16 563 /// Individual message triggers. Note: at present, this vector is
Chris@16 564 /// indexed by the (local) tag of the trigger. Any tags that
Chris@16 565 /// don't have triggers will have NULL pointers in that spot.
Chris@16 566 std::vector<shared_ptr<trigger_base> > triggers;
Chris@16 567 };
Chris@16 568
Chris@16 569 /**
Chris@16 570 * Data structure containing all of the blocks for the distributed
Chris@16 571 * data structures attached to a process group.
Chris@16 572 */
Chris@16 573 typedef std::vector<block_type*> blocks_type;
Chris@16 574
Chris@16 575 /// Iterator into @c blocks_type.
Chris@16 576 typedef blocks_type::iterator block_iterator;
Chris@16 577
Chris@16 578 /**
Chris@16 579 * Deleter used to deallocate a block when its distributed data
Chris@16 580 * structure is destroyed. This type will be used as the deleter for
Chris@16 581 * @c block_num.
Chris@16 582 */
Chris@16 583 struct deallocate_block;
Chris@16 584
Chris@16 585 static std::vector<char> message_buffer;
Chris@16 586
Chris@16 587 public:
Chris@16 588 /**
Chris@16 589 * Data associated with the process group and all of its attached
Chris@16 590 * distributed data structures.
Chris@16 591 */
Chris@16 592 shared_ptr<impl> impl_;
Chris@16 593
Chris@16 594 /**
Chris@16 595 * When non-null, indicates that this copy of the process group is
Chris@16 596 * associated with a particular distributed data structure. The
Chris@16 597 * integer value contains the block number (a value > 0) associated
Chris@16 598 * with that data structure. The deleter for this @c shared_ptr is a
Chris@16 599 * @c deallocate_block object that will deallocate the associated
Chris@16 600 * block in @c impl_->blocks.
Chris@16 601 */
Chris@16 602 shared_ptr<int> block_num;
Chris@16 603
Chris@16 604 /**
Chris@16 605 * Rank of this process, to avoid having to call rank() repeatedly.
Chris@16 606 */
Chris@16 607 int rank;
Chris@16 608
Chris@16 609 /**
Chris@16 610 * Number of processes in this process group, to avoid having to
Chris@16 611 * call communicator::size() repeatedly.
Chris@16 612 */
Chris@16 613 int size;
Chris@16 614 };
Chris@16 615
Chris@16 616
Chris@16 617
/// Returns the rank stored in @p pg (its @c rank member).
Chris@16 618 inline mpi_process_group::process_id_type
Chris@16 619 process_id(const mpi_process_group& pg)
Chris@16 620 { return pg.rank; }
Chris@16 621
/// Returns the number of processes stored in @p pg (its @c size member).
Chris@16 622 inline mpi_process_group::process_size_type
Chris@16 623 num_processes(const mpi_process_group& pg)
Chris@16 624 { return pg.size; }
Chris@16 625
Chris@16 626 mpi_process_group::communicator_type communicator(const mpi_process_group& pg);
Chris@16 627
Chris@16 628 template<typename T>
Chris@16 629 void
Chris@16 630 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 631 int tag, const T& value);
Chris@16 632
Chris@16 633 template<typename InputIterator>
Chris@16 634 void
Chris@16 635 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 636 int tag, InputIterator first, InputIterator last);
Chris@16 637
// Sends the elements in [first, last) given as a pair of pointers by
// forwarding to a send overload that takes a pointer and an element
// count (last - first).
// NOTE(review): the pointer-and-count overload is not declared in the
// visible part of this header; presumably it is provided by the detail
// .ipp file included at the end -- confirm.
Chris@16 638 template<typename T>
Chris@16 639 inline void
Chris@16 640 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 641 int tag, T* first, T* last)
Chris@16 642 { send(pg, dest, tag, first, last - first); }
Chris@16 643
// Pointer-to-const variant: sends the elements in [first, last) by
// forwarding to a send overload that takes a pointer and an element
// count (last - first).
// NOTE(review): the pointer-and-count overload is not declared in the
// visible part of this header; presumably it is provided by the detail
// .ipp file included at the end -- confirm.
Chris@16 644 template<typename T>
Chris@16 645 inline void
Chris@16 646 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 647 int tag, const T* first, const T* last)
Chris@16 648 { send(pg, dest, tag, first, last - first); }
Chris@16 649
Chris@16 650 template<typename T>
Chris@16 651 mpi_process_group::process_id_type
Chris@16 652 receive(const mpi_process_group& pg, int tag, T& value);
Chris@16 653
Chris@16 654 template<typename T>
Chris@16 655 mpi_process_group::process_id_type
Chris@16 656 receive(const mpi_process_group& pg,
Chris@16 657 mpi_process_group::process_id_type source, int tag, T& value);
Chris@16 658
Chris@16 659 optional<std::pair<mpi_process_group::process_id_type, int> >
Chris@16 660 probe(const mpi_process_group& pg);
Chris@16 661
Chris@16 662 void synchronize(const mpi_process_group& pg);
Chris@16 663
Chris@16 664 template<typename T, typename BinaryOperation>
Chris@16 665 T*
Chris@16 666 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
Chris@16 667 BinaryOperation bin_op);
Chris@16 668
Chris@16 669 template<typename T, typename BinaryOperation>
Chris@16 670 T*
Chris@16 671 scan(const mpi_process_group& pg, T* first, T* last, T* out,
Chris@16 672 BinaryOperation bin_op);
Chris@16 673
Chris@16 674 template<typename InputIterator, typename T>
Chris@16 675 void
Chris@16 676 all_gather(const mpi_process_group& pg,
Chris@16 677 InputIterator first, InputIterator last, std::vector<T>& out);
Chris@16 678
Chris@16 679 template<typename InputIterator>
Chris@16 680 mpi_process_group
Chris@16 681 process_subgroup(const mpi_process_group& pg,
Chris@16 682 InputIterator first, InputIterator last);
Chris@16 683
Chris@16 684 template<typename T>
Chris@16 685 void
Chris@16 686 broadcast(const mpi_process_group& pg, T& val,
Chris@16 687 mpi_process_group::process_id_type root);
Chris@16 688
Chris@16 689
Chris@16 690 /*******************************************************************
Chris@16 691 * Out-of-band communication *
Chris@16 692 *******************************************************************/
Chris@16 693
Chris@16 694 template<typename T>
Chris@16 695 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
Chris@16 696 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 697 int tag, const T& value, int block=-1)
Chris@16 698 {
Chris@16 699 using boost::mpi::get_mpi_datatype;
Chris@16 700
Chris@16 701 // Determine the actual message tag we will use for the send, and which
Chris@16 702 // communicator we will use.
Chris@16 703 std::pair<boost::mpi::communicator, int> actual
Chris@16 704 = pg.actual_communicator_and_tag(tag, block);
Chris@16 705
Chris@16 706 #ifdef SEND_OOB_BSEND
Chris@16 707 if (mpi_process_group::message_buffer_size()) {
Chris@16 708 MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
Chris@16 709 actual.second, actual.first);
Chris@16 710 return;
Chris@16 711 }
Chris@16 712 #endif
Chris@16 713 MPI_Request request;
Chris@16 714 MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
Chris@16 715 actual.second, actual.first, &request);
Chris@16 716
Chris@16 717 int done=0;
Chris@16 718 do {
Chris@16 719 pg.poll();
Chris@16 720 MPI_Test(&request,&done,MPI_STATUS_IGNORE);
Chris@16 721 } while (!done);
Chris@16 722 }
Chris@16 723
Chris@16 724 template<typename T>
Chris@16 725 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
Chris@16 726 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
Chris@16 727 int tag, const T& value, int block=-1)
Chris@16 728 {
Chris@16 729 using boost::mpi::packed_oarchive;
Chris@16 730
Chris@16 731 // Determine the actual message tag we will use for the send, and which
Chris@16 732 // communicator we will use.
Chris@16 733 std::pair<boost::mpi::communicator, int> actual
Chris@16 734 = pg.actual_communicator_and_tag(tag, block);
Chris@16 735
Chris@16 736 // Serialize the data into a buffer
Chris@16 737 packed_oarchive out(actual.first);
Chris@16 738 out << value;
Chris@16 739 std::size_t size = out.size();
Chris@16 740
Chris@16 741 // Send the actual message data
Chris@16 742 #ifdef SEND_OOB_BSEND
Chris@16 743 if (mpi_process_group::message_buffer_size()) {
Chris@16 744 MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
Chris@16 745 dest, actual.second, actual.first);
Chris@16 746 return;
Chris@16 747 }
Chris@16 748 #endif
Chris@16 749 MPI_Request request;
Chris@16 750 MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
Chris@16 751 dest, actual.second, actual.first, &request);
Chris@16 752
Chris@16 753 int done=0;
Chris@16 754 do {
Chris@16 755 pg.poll();
Chris@16 756 MPI_Test(&request,&done,MPI_STATUS_IGNORE);
Chris@16 757 } while (!done);
Chris@16 758 }
Chris@16 759
Chris@16 760 template<typename T>
Chris@16 761 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
Chris@16 762 receive_oob(const mpi_process_group& pg,
Chris@16 763 mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
Chris@16 764
Chris@16 765 template<typename T>
Chris@16 766 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
Chris@16 767 receive_oob(const mpi_process_group& pg,
Chris@16 768 mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
Chris@16 769
Chris@16 770 template<typename SendT, typename ReplyT>
Chris@16 771 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
Chris@16 772 send_oob_with_reply(const mpi_process_group& pg,
Chris@16 773 mpi_process_group::process_id_type dest,
Chris@16 774 int tag, const SendT& send_value, ReplyT& reply_value,
Chris@16 775 int block = -1);
Chris@16 776
Chris@16 777 template<typename SendT, typename ReplyT>
Chris@16 778 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
Chris@16 779 send_oob_with_reply(const mpi_process_group& pg,
Chris@16 780 mpi_process_group::process_id_type dest,
Chris@16 781 int tag, const SendT& send_value, ReplyT& reply_value,
Chris@16 782 int block = -1);
Chris@16 783
Chris@16 784 } } } // end namespace boost::graph::distributed
Chris@16 785
Chris@16 786 BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
Chris@16 787 namespace boost { namespace mpi {
Chris@16 788 template<>
Chris@16 789 struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
Chris@16 790 } } // end namespace boost::mpi
Chris@16 791
Chris@16 792 namespace std {
Chris@16 793 /// Optimized swap for outgoing messages: exchanges x and y through outgoing_messages::swap, which swaps the header vector and the buffer member by member.
Chris@16 794 inline void
Chris@16 795 swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x,
Chris@16 796 boost::graph::distributed::mpi_process_group::outgoing_messages& y)
Chris@16 797 {
Chris@16 798 x.swap(y);
Chris@16 799 }
Chris@16 800
Chris@16 801
Chris@16 802 }
Chris@16 803
Chris@16 804 BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
Chris@16 805 BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
Chris@16 806
Chris@16 807 #include <boost/graph/distributed/detail/mpi_process_group.ipp>
Chris@16 808
Chris@16 809 #endif // BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP