Chris@16: // Copyright (C) 2004-2008 The Trustees of Indiana University. Chris@16: // Copyright (C) 2007 Douglas Gregor Chris@16: // Copyright (C) 2007 Matthias Troyer Chris@16: Chris@16: // Use, modification and distribution is subject to the Boost Software Chris@16: // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: // Authors: Douglas Gregor Chris@16: // Matthias Troyer Chris@16: // Andrew Lumsdaine Chris@16: #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP Chris@16: #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP Chris@16: Chris@16: #ifndef BOOST_GRAPH_USE_MPI Chris@16: #error "Parallel BGL files should not be included unless has been included" Chris@16: #endif Chris@16: Chris@16: //#define NO_SPLIT_BATCHES Chris@16: #define SEND_OOB_BSEND Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@101: #include Chris@16: #include Chris@16: Chris@16: namespace boost { namespace graph { namespace distributed { Chris@16: Chris@16: // Process group tags Chris@101: struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { }; Chris@16: Chris@16: class mpi_process_group Chris@16: { Chris@16: struct impl; Chris@16: Chris@16: public: Chris@16: /// Number of tags available to each data structure. Chris@16: static const int max_tags = 256; Chris@16: Chris@16: /** Chris@16: * The type of a "receive" handler, that will be provided with Chris@16: * (source, tag) pairs when a message is received. Users can provide a Chris@16: * receive handler for a distributed data structure, for example, to Chris@16: * automatically pick up and respond to messages as needed. Chris@16: */ Chris@16: typedef function receiver_type; Chris@16: Chris@16: /** Chris@16: * The type of a handler for the on-synchronize event, which will be Chris@16: * executed at the beginning of synchronize(). Chris@16: */ Chris@16: typedef function0 on_synchronize_event_type; Chris@16: Chris@16: /// Used as a tag to help create an "empty" process group. Chris@16: struct create_empty {}; Chris@16: Chris@16: /// The type used to buffer message data Chris@16: typedef boost::mpi::packed_oprimitive::buffer_type buffer_type; Chris@16: Chris@16: /// The type used to identify a process Chris@16: typedef int process_id_type; Chris@16: Chris@16: /// The type used to count the number of processes Chris@16: typedef int process_size_type; Chris@16: Chris@16: /// The type of communicator used to transmit data via MPI Chris@16: typedef boost::mpi::communicator communicator_type; Chris@16: Chris@16: /// Classification of the capabilities of this process group Chris@16: struct communication_category Chris@101: : virtual boost::parallel::bsp_process_group_tag, Chris@16: virtual mpi_process_group_tag { }; Chris@16: Chris@16: // TBD: We can eliminate the "source" field and possibly the Chris@16: // "offset" field. Chris@16: struct message_header { Chris@16: /// The process that sent the message Chris@16: process_id_type source; Chris@16: Chris@16: /// The message tag Chris@16: int tag; Chris@16: Chris@16: /// The offset of the message into the buffer Chris@16: std::size_t offset; Chris@16: Chris@16: /// The length of the message in the buffer, in bytes Chris@16: std::size_t bytes; Chris@16: Chris@16: template Chris@16: void serialize(Archive& ar, int) Chris@16: { Chris@16: ar & source & tag & offset & bytes; Chris@16: } Chris@16: }; Chris@16: Chris@16: /** Chris@16: * Stores the outgoing messages for a particular processor. Chris@16: * Chris@16: * @todo Evaluate whether we should use a deque instance, which Chris@16: * would reduce could reduce the cost of "sending" messages but Chris@16: * increases the time spent in the synchronization step. Chris@16: */ Chris@16: struct outgoing_messages { Chris@16: outgoing_messages() {} Chris@16: ~outgoing_messages() {} Chris@16: Chris@16: std::vector headers; Chris@16: buffer_type buffer; Chris@16: Chris@16: template Chris@16: void serialize(Archive& ar, int) Chris@16: { Chris@16: ar & headers & buffer; Chris@16: } Chris@16: Chris@16: void swap(outgoing_messages& x) Chris@16: { Chris@16: headers.swap(x.headers); Chris@16: buffer.swap(x.buffer); Chris@16: } Chris@16: }; Chris@16: Chris@16: private: Chris@16: /** Chris@16: * Virtual base from which every trigger will be launched. See @c Chris@16: * trigger_launcher for more information. Chris@16: */ Chris@16: class trigger_base : boost::noncopyable Chris@16: { Chris@16: public: Chris@16: explicit trigger_base(int tag) : tag_(tag) { } Chris@16: Chris@16: /// Retrieve the tag associated with this trigger Chris@16: int tag() const { return tag_; } Chris@16: Chris@16: virtual ~trigger_base() { } Chris@16: Chris@16: /** Chris@16: * Invoked to receive a message that matches a particular trigger. Chris@16: * Chris@16: * @param source the source of the message Chris@16: * @param tag the (local) tag of the message Chris@16: * @param context the context under which the trigger is being Chris@16: * invoked Chris@16: */ Chris@16: virtual void Chris@16: receive(mpi_process_group const& pg, int source, int tag, Chris@16: trigger_receive_context context, int block=-1) const = 0; Chris@16: Chris@16: protected: Chris@16: // The message tag associated with this trigger Chris@16: int tag_; Chris@16: }; Chris@16: Chris@16: /** Chris@16: * Launches a specific handler in response to a trigger. This Chris@16: * function object wraps up the handler function object and a buffer Chris@16: * for incoming data. Chris@16: */ Chris@16: template Chris@16: class trigger_launcher : public trigger_base Chris@16: { Chris@16: public: Chris@16: explicit trigger_launcher(mpi_process_group& self, int tag, Chris@16: const Handler& handler) Chris@16: : trigger_base(tag), self(self), handler(handler) Chris@16: {} Chris@16: Chris@16: void Chris@16: receive(mpi_process_group const& pg, int source, int tag, Chris@16: trigger_receive_context context, int block=-1) const; Chris@16: Chris@16: private: Chris@16: mpi_process_group& self; Chris@16: mutable Handler handler; Chris@16: }; Chris@16: Chris@16: /** Chris@16: * Launches a specific handler with a message reply in response to a Chris@16: * trigger. This function object wraps up the handler function Chris@16: * object and a buffer for incoming data. Chris@16: */ Chris@16: template Chris@16: class reply_trigger_launcher : public trigger_base Chris@16: { Chris@16: public: Chris@16: explicit reply_trigger_launcher(mpi_process_group& self, int tag, Chris@16: const Handler& handler) Chris@16: : trigger_base(tag), self(self), handler(handler) Chris@16: {} Chris@16: Chris@16: void Chris@16: receive(mpi_process_group const& pg, int source, int tag, Chris@16: trigger_receive_context context, int block=-1) const; Chris@16: Chris@16: private: Chris@16: mpi_process_group& self; Chris@16: mutable Handler handler; Chris@16: }; Chris@16: Chris@16: template Chris@16: class global_trigger_launcher : public trigger_base Chris@16: { Chris@16: public: Chris@16: explicit global_trigger_launcher(mpi_process_group& self, int tag, Chris@16: const Handler& handler) Chris@16: : trigger_base(tag), handler(handler) Chris@16: { Chris@16: } Chris@16: Chris@16: void Chris@16: receive(mpi_process_group const& pg, int source, int tag, Chris@16: trigger_receive_context context, int block=-1) const; Chris@16: Chris@16: private: Chris@16: mutable Handler handler; Chris@16: // TBD: do not forget to cancel any outstanding Irecv when deleted, Chris@16: // if we decide to use Irecv Chris@16: }; Chris@16: Chris@16: template Chris@16: class global_irecv_trigger_launcher : public trigger_base Chris@16: { Chris@16: public: Chris@16: explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag, Chris@16: const Handler& handler, int sz) Chris@16: : trigger_base(tag), handler(handler), buffer_size(sz) Chris@16: { Chris@16: prepare_receive(self,tag); Chris@16: } Chris@16: Chris@16: void Chris@16: receive(mpi_process_group const& pg, int source, int tag, Chris@16: trigger_receive_context context, int block=-1) const; Chris@16: Chris@16: private: Chris@16: void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const; Chris@16: Handler handler; Chris@16: int buffer_size; Chris@16: // TBD: do not forget to cancel any outstanding Irecv when deleted, Chris@16: // if we decide to use Irecv Chris@16: }; Chris@16: Chris@16: public: Chris@16: /** Chris@16: * Construct a new BSP process group from an MPI communicator. The Chris@16: * MPI communicator will be duplicated to create a new communicator Chris@16: * for this process group to use. Chris@16: */ Chris@16: mpi_process_group(communicator_type parent_comm = communicator_type()); Chris@16: Chris@16: /** Chris@16: * Construct a new BSP process group from an MPI communicator. The Chris@16: * MPI communicator will be duplicated to create a new communicator Chris@16: * for this process group to use. This constructor allows to tune the Chris@16: * size of message batches. Chris@16: * Chris@16: * @param num_headers The maximum number of headers in a message batch Chris@16: * Chris@16: * @param buffer_size The maximum size of the message buffer in a batch. Chris@16: * Chris@16: */ Chris@16: mpi_process_group( std::size_t num_headers, std::size_t buffer_size, Chris@16: communicator_type parent_comm = communicator_type()); Chris@16: Chris@16: /** Chris@16: * Construct a copy of the BSP process group for a new distributed Chris@16: * data structure. This data structure will synchronize with all Chris@16: * other members of the process group's equivalence class (including Chris@16: * @p other), but will have its own set of tags. Chris@16: * Chris@16: * @param other The process group that this new process group will Chris@16: * be based on, using a different set of tags within the same Chris@16: * communication and synchronization space. Chris@16: * Chris@16: * @param handler A message handler that will be passed (source, Chris@16: * tag) pairs for each message received by this data Chris@16: * structure. The handler is expected to receive the messages Chris@16: * immediately. The handler can be changed after-the-fact by Chris@16: * calling @c replace_handler. Chris@16: * Chris@16: * @param out_of_band_receive An anachronism. TODO: remove this. Chris@16: */ Chris@16: mpi_process_group(const mpi_process_group& other, Chris@16: const receiver_type& handler, Chris@16: bool out_of_band_receive = false); Chris@16: Chris@16: /** Chris@16: * Construct a copy of the BSP process group for a new distributed Chris@16: * data structure. This data structure will synchronize with all Chris@16: * other members of the process group's equivalence class (including Chris@16: * @p other), but will have its own set of tags. Chris@16: */ Chris@16: mpi_process_group(const mpi_process_group& other, Chris@16: attach_distributed_object, Chris@16: bool out_of_band_receive = false); Chris@16: Chris@16: /** Chris@16: * Create an "empty" process group, with no information. This is an Chris@16: * internal routine that users should never need. Chris@16: */ Chris@16: explicit mpi_process_group(create_empty) {} Chris@16: Chris@16: /** Chris@16: * Destroys this copy of the process group. Chris@16: */ Chris@16: ~mpi_process_group(); Chris@16: Chris@16: /** Chris@16: * Replace the current message handler with a new message handler. Chris@16: * Chris@16: * @param handle The new message handler. Chris@16: * @param out_of_band_receive An anachronism: remove this Chris@16: */ Chris@16: void replace_handler(const receiver_type& handler, Chris@16: bool out_of_band_receive = false); Chris@16: Chris@16: /** Chris@16: * Turns this process group into the process group for a new Chris@16: * distributed data structure or object, allocating its own tag Chris@16: * block. Chris@16: */ Chris@16: void make_distributed_object(); Chris@16: Chris@16: /** Chris@16: * Replace the handler to be invoked at the beginning of synchronize. Chris@16: */ Chris@16: void Chris@16: replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0); Chris@16: Chris@16: /** Chris@16: * Return the block number of the current data structure. A value of Chris@16: * 0 indicates that this particular instance of the process group is Chris@16: * not associated with any distributed data structure. Chris@16: */ Chris@16: int my_block_number() const { return block_num? *block_num : 0; } Chris@16: Chris@16: /** Chris@16: * Encode a block number/tag pair into a single encoded tag for Chris@16: * transmission. Chris@16: */ Chris@16: int encode_tag(int block_num, int tag) const Chris@16: { return block_num * max_tags + tag; } Chris@16: Chris@16: /** Chris@16: * Decode an encoded tag into a block number/tag pair. Chris@16: */ Chris@16: std::pair decode_tag(int encoded_tag) const Chris@16: { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); } Chris@16: Chris@16: // @todo Actually write up the friend declarations so these could be Chris@16: // private. Chris@16: Chris@16: // private: Chris@16: Chris@16: /** Allocate a block of tags for this instance. The block should not Chris@16: * have been allocated already, e.g., my_block_number() == Chris@16: * 0. Returns the newly-allocated block number. Chris@16: */ Chris@16: int allocate_block(bool out_of_band_receive = false); Chris@16: Chris@16: /** Potentially emit a receive event out of band. Returns true if an event Chris@16: * was actually sent, false otherwise. Chris@16: */ Chris@16: bool maybe_emit_receive(int process, int encoded_tag) const; Chris@16: Chris@16: /** Emit a receive event. Returns true if an event was actually Chris@16: * sent, false otherwise. Chris@16: */ Chris@16: bool emit_receive(int process, int encoded_tag) const; Chris@16: Chris@16: /** Emit an on-synchronize event to all block handlers. */ Chris@16: void emit_on_synchronize() const; Chris@16: Chris@16: /** Retrieve a reference to the stored receiver in this block. */ Chris@16: template Chris@16: Receiver* get_receiver(); Chris@16: Chris@16: template Chris@16: void Chris@16: send_impl(int dest, int tag, const T& value, Chris@16: mpl::true_ /*is_mpi_datatype*/) const; Chris@16: Chris@16: template Chris@16: void Chris@16: send_impl(int dest, int tag, const T& value, Chris@16: mpl::false_ /*is_mpi_datatype*/) const; Chris@16: Chris@16: template Chris@16: typename disable_if, void>::type Chris@16: array_send_impl(int dest, int tag, const T values[], std::size_t n) const; Chris@16: Chris@16: template Chris@16: bool Chris@16: receive_impl(int source, int tag, T& value, Chris@16: mpl::true_ /*is_mpi_datatype*/) const; Chris@16: Chris@16: template Chris@16: bool Chris@16: receive_impl(int source, int tag, T& value, Chris@16: mpl::false_ /*is_mpi_datatype*/) const; Chris@16: Chris@16: // Receive an array of values Chris@16: template Chris@16: typename disable_if, bool>::type Chris@16: array_receive_impl(int source, int tag, T* values, std::size_t& n) const; Chris@16: Chris@16: optional > probe() const; Chris@16: Chris@16: void synchronize() const; Chris@16: Chris@16: operator bool() { return bool(impl_); } Chris@16: Chris@16: mpi_process_group base() const; Chris@16: Chris@16: /** Chris@16: * Create a new trigger for a specific message tag. Triggers handle Chris@16: * out-of-band messaging, and the handler itself will be called Chris@16: * whenever a message is available. The handler itself accepts four Chris@16: * arguments: the source of the message, the message tag (which will Chris@16: * be the same as @p tag), the message data (of type @c Type), and a Chris@16: * boolean flag that states whether the message was received Chris@16: * out-of-band. The last will be @c true for out-of-band receives, Chris@16: * or @c false for receives at the end of a synchronization step. Chris@16: */ Chris@16: template Chris@16: void trigger(int tag, const Handler& handler); Chris@16: Chris@16: /** Chris@16: * Create a new trigger for a specific message tag, along with a way Chris@16: * to send a reply with data back to the sender. Triggers handle Chris@16: * out-of-band messaging, and the handler itself will be called Chris@16: * whenever a message is available. The handler itself accepts four Chris@16: * arguments: the source of the message, the message tag (which will Chris@16: * be the same as @p tag), the message data (of type @c Type), and a Chris@16: * boolean flag that states whether the message was received Chris@16: * out-of-band. The last will be @c true for out-of-band receives, Chris@16: * or @c false for receives at the end of a synchronization Chris@16: * step. The handler also returns a value, which will be routed back Chris@16: * to the sender. Chris@16: */ Chris@16: template Chris@16: void trigger_with_reply(int tag, const Handler& handler); Chris@16: Chris@16: template Chris@16: void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0); Chris@16: Chris@16: Chris@16: Chris@16: /** Chris@16: * Poll for any out-of-band messages. This routine will check if any Chris@16: * out-of-band messages are available. Those that are available will Chris@16: * be handled immediately, if possible. Chris@16: * Chris@16: * @returns if an out-of-band message has been received, but we are Chris@16: * unable to actually receive the message, a (source, tag) pair will Chris@16: * be returned. Otherwise, returns an empty optional. Chris@16: * Chris@16: * @param wait When true, we should block until a message comes in. Chris@16: * Chris@16: * @param synchronizing whether we are currently synchronizing the Chris@16: * process group Chris@16: */ Chris@16: optional > Chris@16: poll(bool wait = false, int block = -1, bool synchronizing = false) const; Chris@16: Chris@16: /** Chris@16: * Determines the context of the trigger currently executing. If Chris@16: * multiple triggers are executing (recursively), then the context Chris@16: * for the most deeply nested trigger will be returned. If no Chris@16: * triggers are executing, returns @c trc_none. This might be used, Chris@16: * for example, to determine whether a reply to a message should Chris@16: * itself be sent out-of-band or whether it can go via the normal, Chris@16: * slower communication route. Chris@16: */ Chris@16: trigger_receive_context trigger_context() const; Chris@16: Chris@16: /// INTERNAL ONLY Chris@16: void receive_batch(process_id_type source, outgoing_messages& batch) const; Chris@16: Chris@16: /// INTERNAL ONLY Chris@16: /// Chris@16: /// Determine the actual communicator and tag will be used for a Chris@16: /// transmission with the given tag. Chris@16: std::pair Chris@16: actual_communicator_and_tag(int tag, int block) const; Chris@16: Chris@16: /// set the size of the message buffer used for buffered oob sends Chris@16: Chris@16: static void set_message_buffer_size(std::size_t s); Chris@16: Chris@16: /// get the size of the message buffer used for buffered oob sends Chris@16: Chris@16: static std::size_t message_buffer_size(); Chris@16: static int old_buffer_size; Chris@16: static void* old_buffer; Chris@16: private: Chris@16: Chris@16: void install_trigger(int tag, int block, Chris@16: shared_ptr const& launcher); Chris@16: Chris@16: void poll_requests(int block=-1) const; Chris@16: Chris@16: Chris@16: // send a batch if the buffer is full now or would get full Chris@16: void maybe_send_batch(process_id_type dest) const; Chris@16: Chris@16: // actually send a batch Chris@16: void send_batch(process_id_type dest, outgoing_messages& batch) const; Chris@16: void send_batch(process_id_type dest) const; Chris@16: Chris@16: void pack_headers() const; Chris@16: Chris@16: /** Chris@16: * Process a batch of incoming messages immediately. Chris@16: * Chris@16: * @param source the source of these messages Chris@16: */ Chris@16: void process_batch(process_id_type source) const; Chris@16: void receive_batch(boost::mpi::status& status) const; Chris@16: Chris@16: //void free_finished_sends() const; Chris@16: Chris@16: /// Status messages used internally by the process group Chris@16: enum status_messages { Chris@16: /// the first of the reserved message tags Chris@16: msg_reserved_first = 126, Chris@16: /// Sent from a processor when sending batched messages Chris@16: msg_batch = 126, Chris@16: /// Sent from a processor when sending large batched messages, larger than Chris@16: /// the maximum buffer size for messages to be received by MPI_Irecv Chris@16: msg_large_batch = 127, Chris@16: /// Sent from a source processor to everyone else when that Chris@16: /// processor has entered the synchronize() function. Chris@16: msg_synchronizing = 128, Chris@16: /// the last of the reserved message tags Chris@16: msg_reserved_last = 128 Chris@16: }; Chris@16: Chris@16: /** Chris@16: * Description of a block of tags associated to a particular Chris@16: * distributed data structure. This structure will live as long as Chris@16: * the distributed data structure is around, and will be used to Chris@16: * help send messages to the data structure. Chris@16: */ Chris@16: struct block_type Chris@16: { Chris@16: block_type() { } Chris@16: Chris@16: /// Handler for receive events Chris@16: receiver_type on_receive; Chris@16: Chris@16: /// Handler executed at the start of synchronization Chris@16: on_synchronize_event_type on_synchronize; Chris@16: Chris@16: /// Individual message triggers. Note: at present, this vector is Chris@16: /// indexed by the (local) tag of the trigger. Any tags that Chris@16: /// don't have triggers will have NULL pointers in that spot. Chris@16: std::vector > triggers; Chris@16: }; Chris@16: Chris@16: /** Chris@16: * Data structure containing all of the blocks for the distributed Chris@16: * data structures attached to a process group. Chris@16: */ Chris@16: typedef std::vector blocks_type; Chris@16: Chris@16: /// Iterator into @c blocks_type. Chris@16: typedef blocks_type::iterator block_iterator; Chris@16: Chris@16: /** Chris@16: * Deleter used to deallocate a block when its distributed data Chris@16: * structure is destroyed. This type will be used as the deleter for Chris@16: * @c block_num. Chris@16: */ Chris@16: struct deallocate_block; Chris@16: Chris@16: static std::vector message_buffer; Chris@16: Chris@16: public: Chris@16: /** Chris@16: * Data associated with the process group and all of its attached Chris@16: * distributed data structures. Chris@16: */ Chris@16: shared_ptr impl_; Chris@16: Chris@16: /** Chris@16: * When non-null, indicates that this copy of the process group is Chris@16: * associated with a particular distributed data structure. The Chris@16: * integer value contains the block number (a value > 0) associated Chris@16: * with that data structure. The deleter for this @c shared_ptr is a Chris@16: * @c deallocate_block object that will deallocate the associated Chris@16: * block in @c impl_->blocks. Chris@16: */ Chris@16: shared_ptr block_num; Chris@16: Chris@16: /** Chris@16: * Rank of this process, to avoid having to call rank() repeatedly. Chris@16: */ Chris@16: int rank; Chris@16: Chris@16: /** Chris@16: * Number of processes in this process group, to avoid having to Chris@16: * call communicator::size() repeatedly. Chris@16: */ Chris@16: int size; Chris@16: }; Chris@16: Chris@16: Chris@16: Chris@16: inline mpi_process_group::process_id_type Chris@16: process_id(const mpi_process_group& pg) Chris@16: { return pg.rank; } Chris@16: Chris@16: inline mpi_process_group::process_size_type Chris@16: num_processes(const mpi_process_group& pg) Chris@16: { return pg.size; } Chris@16: Chris@16: mpi_process_group::communicator_type communicator(const mpi_process_group& pg); Chris@16: Chris@16: template Chris@16: void Chris@16: send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, const T& value); Chris@16: Chris@16: template Chris@16: void Chris@16: send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, InputIterator first, InputIterator last); Chris@16: Chris@16: template Chris@16: inline void Chris@16: send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, T* first, T* last) Chris@16: { send(pg, dest, tag, first, last - first); } Chris@16: Chris@16: template Chris@16: inline void Chris@16: send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, const T* first, const T* last) Chris@16: { send(pg, dest, tag, first, last - first); } Chris@16: Chris@16: template Chris@16: mpi_process_group::process_id_type Chris@16: receive(const mpi_process_group& pg, int tag, T& value); Chris@16: Chris@16: template Chris@16: mpi_process_group::process_id_type Chris@16: receive(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type source, int tag, T& value); Chris@16: Chris@16: optional > Chris@16: probe(const mpi_process_group& pg); Chris@16: Chris@16: void synchronize(const mpi_process_group& pg); Chris@16: Chris@16: template Chris@16: T* Chris@16: all_reduce(const mpi_process_group& pg, T* first, T* last, T* out, Chris@16: BinaryOperation bin_op); Chris@16: Chris@16: template Chris@16: T* Chris@16: scan(const mpi_process_group& pg, T* first, T* last, T* out, Chris@16: BinaryOperation bin_op); Chris@16: Chris@16: template Chris@16: void Chris@16: all_gather(const mpi_process_group& pg, Chris@16: InputIterator first, InputIterator last, std::vector& out); Chris@16: Chris@16: template Chris@16: mpi_process_group Chris@16: process_subgroup(const mpi_process_group& pg, Chris@16: InputIterator first, InputIterator last); Chris@16: Chris@16: template Chris@16: void Chris@16: broadcast(const mpi_process_group& pg, T& val, Chris@16: mpi_process_group::process_id_type root); Chris@16: Chris@16: Chris@16: /******************************************************************* Chris@16: * Out-of-band communication * Chris@16: *******************************************************************/ Chris@16: Chris@16: template Chris@16: typename enable_if >::type Chris@16: send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, const T& value, int block=-1) Chris@16: { Chris@16: using boost::mpi::get_mpi_datatype; Chris@16: Chris@16: // Determine the actual message tag we will use for the send, and which Chris@16: // communicator we will use. Chris@16: std::pair actual Chris@16: = pg.actual_communicator_and_tag(tag, block); Chris@16: Chris@16: #ifdef SEND_OOB_BSEND Chris@16: if (mpi_process_group::message_buffer_size()) { Chris@16: MPI_Bsend(const_cast(&value), 1, get_mpi_datatype(value), dest, Chris@16: actual.second, actual.first); Chris@16: return; Chris@16: } Chris@16: #endif Chris@16: MPI_Request request; Chris@16: MPI_Isend(const_cast(&value), 1, get_mpi_datatype(value), dest, Chris@16: actual.second, actual.first, &request); Chris@16: Chris@16: int done=0; Chris@16: do { Chris@16: pg.poll(); Chris@16: MPI_Test(&request,&done,MPI_STATUS_IGNORE); Chris@16: } while (!done); Chris@16: } Chris@16: Chris@16: template Chris@16: typename disable_if >::type Chris@16: send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, const T& value, int block=-1) Chris@16: { Chris@16: using boost::mpi::packed_oarchive; Chris@16: Chris@16: // Determine the actual message tag we will use for the send, and which Chris@16: // communicator we will use. Chris@16: std::pair actual Chris@16: = pg.actual_communicator_and_tag(tag, block); Chris@16: Chris@16: // Serialize the data into a buffer Chris@16: packed_oarchive out(actual.first); Chris@16: out << value; Chris@16: std::size_t size = out.size(); Chris@16: Chris@16: // Send the actual message data Chris@16: #ifdef SEND_OOB_BSEND Chris@16: if (mpi_process_group::message_buffer_size()) { Chris@16: MPI_Bsend(const_cast(out.address()), size, MPI_PACKED, Chris@16: dest, actual.second, actual.first); Chris@16: return; Chris@16: } Chris@16: #endif Chris@16: MPI_Request request; Chris@16: MPI_Isend(const_cast(out.address()), size, MPI_PACKED, Chris@16: dest, actual.second, actual.first, &request); Chris@16: Chris@16: int done=0; Chris@16: do { Chris@16: pg.poll(); Chris@16: MPI_Test(&request,&done,MPI_STATUS_IGNORE); Chris@16: } while (!done); Chris@16: } Chris@16: Chris@16: template Chris@16: typename enable_if >::type Chris@16: receive_oob(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type source, int tag, T& value, int block=-1); Chris@16: Chris@16: template Chris@16: typename disable_if >::type Chris@16: receive_oob(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type source, int tag, T& value, int block=-1); Chris@16: Chris@16: template Chris@16: typename enable_if >::type Chris@16: send_oob_with_reply(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type dest, Chris@16: int tag, const SendT& send_value, ReplyT& reply_value, Chris@16: int block = -1); Chris@16: Chris@16: template Chris@16: typename disable_if >::type Chris@16: send_oob_with_reply(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type dest, Chris@16: int tag, const SendT& send_value, ReplyT& reply_value, Chris@16: int block = -1); Chris@16: Chris@16: } } } // end namespace boost::graph::distributed Chris@16: Chris@16: BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header) Chris@16: namespace boost { namespace mpi { Chris@16: template<> Chris@16: struct is_mpi_datatype : mpl::true_ { }; Chris@16: } } // end namespace boost::mpi Chris@16: Chris@16: namespace std { Chris@16: /// optimized swap for outgoing messages Chris@16: inline void Chris@16: swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x, Chris@16: boost::graph::distributed::mpi_process_group::outgoing_messages& y) Chris@16: { Chris@16: x.swap(y); Chris@16: } Chris@16: Chris@16: Chris@16: } Chris@16: Chris@16: BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable) Chris@16: BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never) Chris@16: Chris@16: #include Chris@16: Chris@16: #endif // BOOST_PARALLEL_MPI_MPI_PROCESS_GROUP_HPP