Chris@16: // -*- C++ -*- Chris@16: Chris@16: // Copyright (C) 2004-2008 The Trustees of Indiana University. Chris@16: // Copyright (C) 2007 Douglas Gregor Chris@16: // Copyright (C) 2007 Matthias Troyer Chris@16: Chris@16: // Use, modification and distribution is subject to the Boost Software Chris@16: // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: // Authors: Douglas Gregor Chris@16: // Andrew Lumsdaine Chris@16: // Matthias Troyer Chris@16: Chris@16: //#define PBGL_PROCESS_GROUP_DEBUG Chris@16: Chris@16: #ifndef BOOST_GRAPH_USE_MPI Chris@16: #error "Parallel BGL files should not be included unless has been included" Chris@16: #endif Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: // #define PBGL_PROCESS_GROUP_DEBUG Chris@16: Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: # include Chris@16: #endif Chris@16: Chris@16: namespace boost { namespace graph { namespace distributed { Chris@16: Chris@16: struct mpi_process_group::impl Chris@16: { Chris@16: Chris@16: typedef mpi_process_group::message_header message_header; Chris@16: typedef mpi_process_group::outgoing_messages outgoing_messages; Chris@16: Chris@16: /** Chris@16: * Stores the incoming messages from a particular processor. Chris@16: * Chris@16: * @todo Evaluate whether we should use a deque instance, which Chris@16: * would reduce could reduce the cost of "receiving" messages and Chris@16: allow us to deallocate memory earlier, but increases the time Chris@16: spent in the synchronization step. 
Chris@16: */ Chris@16: struct incoming_messages { Chris@16: incoming_messages(); Chris@16: ~incoming_messages() {} Chris@16: Chris@16: std::vector headers; Chris@16: buffer_type buffer; Chris@16: std::vector::iterator> next_header; Chris@16: }; Chris@16: Chris@16: struct batch_request { Chris@16: MPI_Request request; Chris@16: buffer_type buffer; Chris@16: }; Chris@16: Chris@16: // send once we have a certain number of messages or bytes in the buffer Chris@16: // these numbers need to be tuned, we keep them small at first for testing Chris@16: std::size_t batch_header_number; Chris@16: std::size_t batch_buffer_size; Chris@16: std::size_t batch_message_size; Chris@16: Chris@16: /** Chris@16: * The actual MPI communicator used to transmit data. Chris@16: */ Chris@16: boost::mpi::communicator comm; Chris@16: Chris@16: /** Chris@16: * The MPI communicator used to transmit out-of-band replies. Chris@16: */ Chris@16: boost::mpi::communicator oob_reply_comm; Chris@16: Chris@16: /// Outgoing message information, indexed by destination processor. Chris@16: std::vector outgoing; Chris@16: Chris@16: /// Incoming message information, indexed by source processor. Chris@16: std::vector incoming; Chris@16: Chris@16: /// The numbers of processors that have entered a synchronization stage Chris@16: std::vector processors_synchronizing_stage; Chris@16: Chris@16: /// The synchronization stage of a processor Chris@16: std::vector synchronizing_stage; Chris@16: Chris@16: /// Number of processors still sending messages Chris@16: std::vector synchronizing_unfinished; Chris@16: Chris@16: /// Number of batches sent since last synchronization stage Chris@16: std::vector number_sent_batches; Chris@16: Chris@16: /// Number of batches received minus number of expected batches Chris@16: std::vector number_received_batches; Chris@16: Chris@16: Chris@16: /// The context of the currently-executing trigger, or @c trc_none Chris@16: /// if no trigger is executing. 
Chris@16: trigger_receive_context trigger_context; Chris@16: Chris@16: /// Non-zero indicates that we're processing batches Chris@16: /// Increment this when processing patches, Chris@16: /// decrement it when you're done. Chris@16: int processing_batches; Chris@16: Chris@16: /** Chris@16: * Contains all of the active blocks corresponding to attached Chris@16: * distributed data structures. Chris@16: */ Chris@16: blocks_type blocks; Chris@16: Chris@16: /// Whether we are currently synchronizing Chris@16: bool synchronizing; Chris@16: Chris@16: /// The MPI requests for posted sends of oob messages Chris@16: std::vector requests; Chris@16: Chris@16: /// The MPI buffers for posted irecvs of oob messages Chris@16: std::map buffers; Chris@16: Chris@16: /// Queue for message batches received while already processing messages Chris@16: std::queue > new_batches; Chris@16: /// Maximum encountered size of the new_batches queue Chris@16: std::size_t max_received; Chris@16: Chris@16: /// The MPI requests and buffers for batchess being sent Chris@16: std::list sent_batches; Chris@16: /// Maximum encountered size of the sent_batches list Chris@16: std::size_t max_sent; Chris@16: Chris@16: /// Pre-allocated requests in a pool Chris@16: std::vector batch_pool; Chris@16: /// A stack controlling which batches are available Chris@16: std::stack free_batches; Chris@16: Chris@16: void free_sent_batches(); Chris@16: Chris@16: // Tag allocator Chris@16: detail::tag_allocator allocated_tags; Chris@16: Chris@16: impl(std::size_t num_headers, std::size_t buffers_size, Chris@16: communicator_type parent_comm); Chris@16: ~impl(); Chris@16: Chris@16: private: Chris@16: void set_batch_size(std::size_t header_num, std::size_t buffer_sz); Chris@16: }; Chris@16: Chris@16: inline trigger_receive_context mpi_process_group::trigger_context() const Chris@16: { Chris@16: return impl_->trigger_context; Chris@16: } Chris@16: Chris@16: template Chris@16: void Chris@16: mpi_process_group::send_impl(int 
dest, int tag, const T& value, Chris@16: mpl::true_ /*is_mpi_datatype*/) const Chris@16: { Chris@16: BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); Chris@16: Chris@16: impl::outgoing_messages& outgoing = impl_->outgoing[dest]; Chris@16: Chris@16: // Start constructing the message header Chris@16: impl::message_header header; Chris@16: header.source = process_id(*this); Chris@16: header.tag = tag; Chris@16: header.offset = outgoing.buffer.size(); Chris@16: Chris@16: boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer); Chris@16: oa << value; Chris@16: Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " Chris@16: << tag << ", bytes = " << packed_size << std::endl; Chris@16: #endif Chris@16: Chris@16: // Store the header Chris@16: header.bytes = outgoing.buffer.size() - header.offset; Chris@16: outgoing.headers.push_back(header); Chris@16: Chris@16: maybe_send_batch(dest); Chris@16: } Chris@16: Chris@16: Chris@16: template Chris@16: void Chris@16: mpi_process_group::send_impl(int dest, int tag, const T& value, Chris@16: mpl::false_ /*is_mpi_datatype*/) const Chris@16: { Chris@16: BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); Chris@16: Chris@16: impl::outgoing_messages& outgoing = impl_->outgoing[dest]; Chris@16: Chris@16: // Start constructing the message header Chris@16: impl::message_header header; Chris@16: header.source = process_id(*this); Chris@16: header.tag = tag; Chris@16: header.offset = outgoing.buffer.size(); Chris@16: Chris@16: // Serialize into the buffer Chris@16: boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); Chris@16: out << value; Chris@16: Chris@16: // Store the header Chris@16: header.bytes = outgoing.buffer.size() - header.offset; Chris@16: outgoing.headers.push_back(header); Chris@16: maybe_send_batch(dest); Chris@16: Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << "SEND: " << 
process_id(*this) << " -> " << dest << ", tag = " Chris@16: << tag << ", bytes = " << header.bytes << std::endl; Chris@16: #endif Chris@16: } Chris@16: Chris@16: template Chris@16: inline void Chris@16: send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, const T& value) Chris@16: { Chris@16: pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value, Chris@16: boost::mpi::is_mpi_datatype()); Chris@16: } Chris@16: Chris@16: template Chris@16: typename enable_if, void>::type Chris@16: send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, const T values[], std::size_t n) Chris@16: { Chris@16: pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), Chris@16: boost::serialization::make_array(values,n), Chris@16: boost::mpl::true_()); Chris@16: } Chris@16: Chris@16: template Chris@16: typename disable_if, void>::type Chris@16: mpi_process_group:: Chris@16: array_send_impl(int dest, int tag, const T values[], std::size_t n) const Chris@16: { Chris@16: BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); Chris@16: Chris@16: impl::outgoing_messages& outgoing = impl_->outgoing[dest]; Chris@16: Chris@16: // Start constructing the message header Chris@16: impl::message_header header; Chris@16: header.source = process_id(*this); Chris@16: header.tag = tag; Chris@16: header.offset = outgoing.buffer.size(); Chris@16: Chris@16: // Serialize into the buffer Chris@16: boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); Chris@16: out << n; Chris@16: Chris@16: for (std::size_t i = 0; i < n; ++i) Chris@16: out << values[i]; Chris@16: Chris@16: // Store the header Chris@16: header.bytes = outgoing.buffer.size() - header.offset; Chris@16: outgoing.headers.push_back(header); Chris@16: maybe_send_batch(dest); Chris@16: Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " Chris@16: << tag << ", bytes 
= " << header.bytes << std::endl; Chris@16: #endif Chris@16: } Chris@16: Chris@16: template Chris@16: typename disable_if, void>::type Chris@16: send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, const T values[], std::size_t n) Chris@16: { Chris@16: pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), Chris@16: values, n); Chris@16: } Chris@16: Chris@16: template Chris@16: void Chris@16: send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, Chris@16: int tag, InputIterator first, InputIterator last) Chris@16: { Chris@16: typedef typename std::iterator_traits::value_type value_type; Chris@16: std::vector values(first, last); Chris@16: if (values.empty()) send(pg, dest, tag, static_cast(0), 0); Chris@16: else send(pg, dest, tag, &values[0], values.size()); Chris@16: } Chris@16: Chris@16: template Chris@16: bool Chris@16: mpi_process_group::receive_impl(int source, int tag, T& value, Chris@16: mpl::true_ /*is_mpi_datatype*/) const Chris@16: { Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = " Chris@16: << tag << std::endl; Chris@16: #endif Chris@16: Chris@16: impl::incoming_messages& incoming = impl_->incoming[source]; Chris@16: Chris@16: // Find the next header with the right tag Chris@16: std::vector::iterator header = Chris@16: incoming.next_header[my_block_number()]; Chris@16: while (header != incoming.headers.end() && header->tag != tag) ++header; Chris@16: Chris@16: // If no header is found, notify the caller Chris@16: if (header == incoming.headers.end()) return false; Chris@16: Chris@16: // Unpack the data Chris@16: if (header->bytes > 0) { Chris@16: boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer, Chris@16: archive::no_header, header->offset); Chris@16: ia >> value; Chris@16: } Chris@16: Chris@16: // Mark this message as received Chris@16: header->tag = -1; Chris@16: Chris@16: // Move the 
"next header" indicator to the next unreceived message Chris@16: while (incoming.next_header[my_block_number()] != incoming.headers.end() Chris@16: && incoming.next_header[my_block_number()]->tag == -1) Chris@16: ++incoming.next_header[my_block_number()]; Chris@16: Chris@16: if (incoming.next_header[my_block_number()] == incoming.headers.end()) { Chris@16: bool finished = true; Chris@16: for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { Chris@16: if (incoming.next_header[i] != incoming.headers.end()) finished = false; Chris@16: } Chris@16: Chris@16: if (finished) { Chris@16: std::vector no_headers; Chris@16: incoming.headers.swap(no_headers); Chris@16: buffer_type empty_buffer; Chris@16: incoming.buffer.swap(empty_buffer); Chris@16: for (std::size_t i = 0; i < incoming.next_header.size(); ++i) Chris@16: incoming.next_header[i] = incoming.headers.end(); Chris@16: } Chris@16: } Chris@16: Chris@16: return true; Chris@16: } Chris@16: Chris@16: template Chris@16: bool Chris@16: mpi_process_group::receive_impl(int source, int tag, T& value, Chris@16: mpl::false_ /*is_mpi_datatype*/) const Chris@16: { Chris@16: impl::incoming_messages& incoming = impl_->incoming[source]; Chris@16: Chris@16: // Find the next header with the right tag Chris@16: std::vector::iterator header = Chris@16: incoming.next_header[my_block_number()]; Chris@16: while (header != incoming.headers.end() && header->tag != tag) ++header; Chris@16: Chris@16: // If no header is found, notify the caller Chris@16: if (header == incoming.headers.end()) return false; Chris@16: Chris@16: // Deserialize the data Chris@16: boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, Chris@16: archive::no_header, header->offset); Chris@16: in >> value; Chris@16: Chris@16: // Mark this message as received Chris@16: header->tag = -1; Chris@16: Chris@16: // Move the "next header" indicator to the next unreceived message Chris@16: while (incoming.next_header[my_block_number()] != 
incoming.headers.end() Chris@16: && incoming.next_header[my_block_number()]->tag == -1) Chris@16: ++incoming.next_header[my_block_number()]; Chris@16: Chris@16: if (incoming.next_header[my_block_number()] == incoming.headers.end()) { Chris@16: bool finished = true; Chris@16: for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { Chris@16: if (incoming.next_header[i] != incoming.headers.end()) finished = false; Chris@16: } Chris@16: Chris@16: if (finished) { Chris@16: std::vector no_headers; Chris@16: incoming.headers.swap(no_headers); Chris@16: buffer_type empty_buffer; Chris@16: incoming.buffer.swap(empty_buffer); Chris@16: for (std::size_t i = 0; i < incoming.next_header.size(); ++i) Chris@16: incoming.next_header[i] = incoming.headers.end(); Chris@16: } Chris@16: } Chris@16: Chris@16: return true; Chris@16: } Chris@16: Chris@16: template Chris@16: typename disable_if, bool>::type Chris@16: mpi_process_group:: Chris@16: array_receive_impl(int source, int tag, T* values, std::size_t& n) const Chris@16: { Chris@16: impl::incoming_messages& incoming = impl_->incoming[source]; Chris@16: Chris@16: // Find the next header with the right tag Chris@16: std::vector::iterator header = Chris@16: incoming.next_header[my_block_number()]; Chris@16: while (header != incoming.headers.end() && header->tag != tag) ++header; Chris@16: Chris@16: // If no header is found, notify the caller Chris@16: if (header == incoming.headers.end()) return false; Chris@16: Chris@16: // Deserialize the data Chris@16: boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, Chris@16: archive::no_header, header->offset); Chris@16: std::size_t num_sent; Chris@16: in >> num_sent; Chris@16: if (num_sent > n) Chris@16: std::cerr << "ERROR: Have " << num_sent << " items but only space for " Chris@16: << n << " items\n"; Chris@16: Chris@16: for (std::size_t i = 0; i < num_sent; ++i) Chris@16: in >> values[i]; Chris@16: n = num_sent; Chris@16: Chris@16: // Mark this message as 
received Chris@16: header->tag = -1; Chris@16: Chris@16: // Move the "next header" indicator to the next unreceived message Chris@16: while (incoming.next_header[my_block_number()] != incoming.headers.end() Chris@16: && incoming.next_header[my_block_number()]->tag == -1) Chris@16: ++incoming.next_header[my_block_number()]; Chris@16: Chris@16: if (incoming.next_header[my_block_number()] == incoming.headers.end()) { Chris@16: bool finished = true; Chris@16: for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { Chris@16: if (incoming.next_header[i] != incoming.headers.end()) finished = false; Chris@16: } Chris@16: Chris@16: if (finished) { Chris@16: std::vector no_headers; Chris@16: incoming.headers.swap(no_headers); Chris@16: buffer_type empty_buffer; Chris@16: incoming.buffer.swap(empty_buffer); Chris@16: for (std::size_t i = 0; i < incoming.next_header.size(); ++i) Chris@16: incoming.next_header[i] = incoming.headers.end(); Chris@16: } Chris@16: } Chris@16: Chris@16: return true; Chris@16: } Chris@16: Chris@16: // Construct triggers Chris@16: template Chris@16: void mpi_process_group::trigger(int tag, const Handler& handler) Chris@16: { Chris@16: BOOST_ASSERT(block_num); Chris@16: install_trigger(tag,my_block_number(),shared_ptr( Chris@16: new trigger_launcher(*this, tag, handler))); Chris@16: } Chris@16: Chris@16: template Chris@16: void mpi_process_group::trigger_with_reply(int tag, const Handler& handler) Chris@16: { Chris@16: BOOST_ASSERT(block_num); Chris@16: install_trigger(tag,my_block_number(),shared_ptr( Chris@16: new reply_trigger_launcher(*this, tag, handler))); Chris@16: } Chris@16: Chris@16: template Chris@16: void mpi_process_group::global_trigger(int tag, const Handler& handler, Chris@16: std::size_t sz) Chris@16: { Chris@16: if (sz==0) // normal trigger Chris@16: install_trigger(tag,0,shared_ptr( Chris@16: new global_trigger_launcher(*this, tag, handler))); Chris@16: else // trigger with irecv Chris@16: 
install_trigger(tag,0,shared_ptr( Chris@16: new global_irecv_trigger_launcher(*this, tag, handler,sz))); Chris@16: Chris@16: } Chris@16: Chris@16: namespace detail { Chris@16: Chris@16: template Chris@16: void do_oob_receive(mpi_process_group const& self, Chris@16: int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/) Chris@16: { Chris@16: using boost::mpi::get_mpi_datatype; Chris@16: Chris@16: //self.impl_->comm.recv(source,tag,data); Chris@16: MPI_Recv(&data, 1, get_mpi_datatype(data), source, tag, self.impl_->comm, Chris@16: MPI_STATUS_IGNORE); Chris@16: } Chris@16: Chris@16: template Chris@16: void do_oob_receive(mpi_process_group const& self, Chris@16: int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/) Chris@16: { Chris@16: // self.impl_->comm.recv(source,tag,data); Chris@16: // Receive the size of the data packet Chris@16: boost::mpi::status status; Chris@16: status = self.impl_->comm.probe(source, tag); Chris@16: Chris@16: #if BOOST_VERSION >= 103600 Chris@16: int size = status.count().get(); Chris@16: #else Chris@16: int size; Chris@16: MPI_Status& mpi_status = status; Chris@16: MPI_Get_count(&mpi_status, MPI_PACKED, &size); Chris@16: #endif Chris@16: Chris@16: // Receive the data packed itself Chris@16: boost::mpi::packed_iarchive in(self.impl_->comm); Chris@16: in.resize(size); Chris@16: MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm, Chris@16: MPI_STATUS_IGNORE); Chris@16: Chris@16: // Deserialize the data Chris@16: in >> data; Chris@16: } Chris@16: Chris@16: template Chris@16: void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data) Chris@16: { Chris@16: do_oob_receive(self, source, tag, data, Chris@16: boost::mpi::is_mpi_datatype()); Chris@16: } Chris@16: Chris@16: Chris@16: } // namespace detail Chris@16: Chris@16: Chris@16: template Chris@16: void Chris@16: mpi_process_group::trigger_launcher:: Chris@16: receive(mpi_process_group const&, int source, int tag, Chris@16: 
trigger_receive_context context, int block) const Chris@16: { Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << (out_of_band? "OOB trigger" : "Trigger") Chris@16: << " receive from source " << source << " and tag " << tag Chris@16: << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; Chris@16: #endif Chris@16: Chris@16: Type data; Chris@16: Chris@16: if (context == trc_out_of_band) { Chris@16: // Receive the message directly off the wire Chris@16: int realtag = self.encode_tag( Chris@16: block == -1 ? self.my_block_number() : block, tag); Chris@16: detail::do_oob_receive(self,source,realtag,data); Chris@16: } Chris@16: else Chris@16: // Receive the message out of the local buffer Chris@16: boost::graph::distributed::receive(self, source, tag, data); Chris@16: Chris@16: // Pass the message off to the handler Chris@16: handler(source, tag, data, context); Chris@16: } Chris@16: Chris@16: template Chris@16: void Chris@16: mpi_process_group::reply_trigger_launcher:: Chris@16: receive(mpi_process_group const&, int source, int tag, Chris@16: trigger_receive_context context, int block) const Chris@16: { Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger") Chris@16: << " receive from source " << source << " and tag " << tag Chris@16: << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; Chris@16: #endif Chris@16: BOOST_ASSERT(context == trc_out_of_band); Chris@16: Chris@16: boost::parallel::detail::untracked_pair data; Chris@16: Chris@16: // Receive the message directly off the wire Chris@16: int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block, Chris@16: tag); Chris@16: detail::do_oob_receive(self, source, realtag, data); Chris@16: Chris@16: // Pass the message off to the handler and send the result back to Chris@16: // the source. 
Chris@16: send_oob(self, source, data.first, Chris@16: handler(source, tag, data.second, context), -2); Chris@16: } Chris@16: Chris@16: template Chris@16: void Chris@16: mpi_process_group::global_trigger_launcher:: Chris@16: receive(mpi_process_group const& self, int source, int tag, Chris@16: trigger_receive_context context, int block) const Chris@16: { Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << (out_of_band? "OOB trigger" : "Trigger") Chris@16: << " receive from source " << source << " and tag " << tag Chris@16: << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; Chris@16: #endif Chris@16: Chris@16: Type data; Chris@16: Chris@16: if (context == trc_out_of_band) { Chris@16: // Receive the message directly off the wire Chris@16: int realtag = self.encode_tag( Chris@16: block == -1 ? self.my_block_number() : block, tag); Chris@16: detail::do_oob_receive(self,source,realtag,data); Chris@16: } Chris@16: else Chris@16: // Receive the message out of the local buffer Chris@16: boost::graph::distributed::receive(self, source, tag, data); Chris@16: Chris@16: // Pass the message off to the handler Chris@16: handler(self, source, tag, data, context); Chris@16: } Chris@16: Chris@16: Chris@16: template Chris@16: void Chris@16: mpi_process_group::global_irecv_trigger_launcher:: Chris@16: receive(mpi_process_group const& self, int source, int tag, Chris@16: trigger_receive_context context, int block) const Chris@16: { Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << (out_of_band? "OOB trigger" : "Trigger") Chris@16: << " receive from source " << source << " and tag " << tag Chris@16: << " in block " << (block == -1 ? 
self.my_block_number() : block) << std::endl; Chris@16: #endif Chris@16: Chris@16: Type data; Chris@16: Chris@16: if (context == trc_out_of_band) { Chris@16: return; Chris@16: } Chris@16: BOOST_ASSERT (context == trc_irecv_out_of_band); Chris@16: Chris@16: // force posting of new MPI_Irecv, even though buffer is already allocated Chris@16: boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]); Chris@16: ia >> data; Chris@16: // Start a new receive Chris@16: prepare_receive(self,tag,true); Chris@16: // Pass the message off to the handler Chris@16: handler(self, source, tag, data, context); Chris@16: } Chris@16: Chris@16: Chris@16: template Chris@16: void Chris@16: mpi_process_group::global_irecv_trigger_launcher:: Chris@16: prepare_receive(mpi_process_group const& self, int tag, bool force) const Chris@16: { Chris@16: #ifdef PBGL_PROCESS_GROUP_DEBUG Chris@16: std::cerr << ("Posting Irecv for trigger") Chris@16: << " receive with tag " << tag << std::endl; Chris@16: #endif Chris@16: if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) { Chris@16: self.impl_->buffers[tag].resize(buffer_size); Chris@16: force = true; Chris@16: } Chris@16: BOOST_ASSERT(static_cast(self.impl_->buffers[tag].size()) >= buffer_size); Chris@16: Chris@16: //BOOST_MPL_ASSERT(mpl::not_ >); Chris@16: if (force) { Chris@16: self.impl_->requests.push_back(MPI_Request()); Chris@16: MPI_Request* request = &self.impl_->requests.back(); Chris@16: MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size, Chris@16: MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request); Chris@16: } Chris@16: } Chris@16: Chris@16: Chris@16: template Chris@16: inline mpi_process_group::process_id_type Chris@16: receive(const mpi_process_group& pg, int tag, T& value) Chris@16: { Chris@16: for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { Chris@16: if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), Chris@16: value, boost::mpi::is_mpi_datatype())) 
Chris@16: return source; Chris@16: } Chris@16: BOOST_ASSERT (false); Chris@16: } Chris@16: Chris@16: template Chris@16: typename Chris@16: enable_if, Chris@16: std::pair >::type Chris@16: receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) Chris@16: { Chris@16: for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { Chris@16: bool result = Chris@16: pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), Chris@16: boost::serialization::make_array(values,n), Chris@16: boost::mpl::true_()); Chris@16: if (result) Chris@16: return std::make_pair(source, n); Chris@16: } Chris@16: BOOST_ASSERT(false); Chris@16: } Chris@16: Chris@16: template Chris@16: typename Chris@16: disable_if, Chris@16: std::pair >::type Chris@16: receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) Chris@16: { Chris@16: for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { Chris@16: if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), Chris@16: values, n)) Chris@16: return std::make_pair(source, n); Chris@16: } Chris@16: BOOST_ASSERT(false); Chris@16: } Chris@16: Chris@16: template Chris@16: mpi_process_group::process_id_type Chris@16: receive(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type source, int tag, T& value) Chris@16: { Chris@16: if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), Chris@16: value, boost::mpi::is_mpi_datatype())) Chris@16: return source; Chris@16: else { Chris@16: fprintf(stderr, Chris@16: "Process %d failed to receive a message from process %d with tag %d in block %d.\n", Chris@16: process_id(pg), source, tag, pg.my_block_number()); Chris@16: Chris@16: BOOST_ASSERT(false); Chris@16: abort(); Chris@16: } Chris@16: } Chris@16: Chris@16: template Chris@16: typename Chris@16: enable_if, Chris@16: std::pair >::type Chris@16: receive(const mpi_process_group& pg, int source, int tag, T values[], Chris@16: std::size_t n) 
Chris@16: { Chris@16: if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), Chris@16: boost::serialization::make_array(values,n), Chris@16: boost::mpl::true_())) Chris@16: return std::make_pair(source,n); Chris@16: else { Chris@16: fprintf(stderr, Chris@16: "Process %d failed to receive a message from process %d with tag %d in block %d.\n", Chris@16: process_id(pg), source, tag, pg.my_block_number()); Chris@16: Chris@16: BOOST_ASSERT(false); Chris@16: abort(); Chris@16: } Chris@16: } Chris@16: Chris@16: template Chris@16: typename Chris@16: disable_if, Chris@16: std::pair >::type Chris@16: receive(const mpi_process_group& pg, int source, int tag, T values[], Chris@16: std::size_t n) Chris@16: { Chris@16: pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), Chris@16: values, n); Chris@16: Chris@16: return std::make_pair(source, n); Chris@16: } Chris@16: Chris@16: template Chris@16: T* Chris@16: all_reduce(const mpi_process_group& pg, T* first, T* last, T* out, Chris@16: BinaryOperation bin_op) Chris@16: { Chris@16: synchronize(pg); Chris@16: Chris@16: bool inplace = first == out; Chris@16: Chris@16: if (inplace) out = new T [last-first]; Chris@16: Chris@16: boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg), Chris@16: boost::mpi::comm_attach), Chris@16: first, last-first, out, bin_op); Chris@16: Chris@16: if (inplace) { Chris@16: std::copy(out, out + (last-first), first); Chris@16: delete [] out; Chris@16: return last; Chris@16: } Chris@16: Chris@16: return out; Chris@16: } Chris@16: Chris@16: template Chris@16: void Chris@16: broadcast(const mpi_process_group& pg, T& val, Chris@16: mpi_process_group::process_id_type root) Chris@16: { Chris@16: // broadcast the seed Chris@16: boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach); Chris@16: boost::mpi::broadcast(comm,val,root); Chris@16: } Chris@16: Chris@16: Chris@16: template Chris@16: T* Chris@16: scan(const mpi_process_group& pg, T* first, T* 
last, T* out, Chris@16: BinaryOperation bin_op) Chris@16: { Chris@16: synchronize(pg); Chris@16: Chris@16: bool inplace = first == out; Chris@16: Chris@16: if (inplace) out = new T [last-first]; Chris@16: Chris@16: boost::mpi::scan(communicator(pg), first, last-first, out, bin_op); Chris@16: Chris@16: if (inplace) { Chris@16: std::copy(out, out + (last-first), first); Chris@16: delete [] out; Chris@16: return last; Chris@16: } Chris@16: Chris@16: return out; Chris@16: } Chris@16: Chris@16: Chris@16: template Chris@16: void Chris@16: all_gather(const mpi_process_group& pg, InputIterator first, Chris@16: InputIterator last, std::vector& out) Chris@16: { Chris@16: synchronize(pg); Chris@16: Chris@16: // Stick a copy of the local values into a vector, so we can broadcast it Chris@16: std::vector local_values(first, last); Chris@16: Chris@16: // Collect the number of vertices stored in each process Chris@16: int size = local_values.size(); Chris@16: std::vector sizes(num_processes(pg)); Chris@16: int result = MPI_Allgather(&size, 1, MPI_INT, Chris@16: &sizes[0], 1, MPI_INT, Chris@16: communicator(pg)); Chris@16: BOOST_ASSERT(result == MPI_SUCCESS); Chris@16: Chris@16: // Adjust sizes based on the number of bytes Chris@16: std::transform(sizes.begin(), sizes.end(), sizes.begin(), Chris@16: std::bind2nd(std::multiplies(), sizeof(T))); Chris@16: Chris@16: // Compute displacements Chris@16: std::vector displacements; Chris@16: displacements.reserve(sizes.size() + 1); Chris@16: displacements.push_back(0); Chris@16: std::partial_sum(sizes.begin(), sizes.end(), Chris@16: std::back_inserter(displacements)); Chris@16: Chris@16: // Gather all of the values Chris@16: out.resize(displacements.back() / sizeof(T)); Chris@16: if (!out.empty()) { Chris@16: result = MPI_Allgatherv(local_values.empty()? 
// NOTE(review): The next four physical lines are a collapsed paste of an
// annotated listing, so they are not valid C++ as laid out.
//   - Every original source line is prefixed with "Chris@16:".
//   - Many source lines share one physical line.
//   - Template argument lists appear to have been stripped (e.g.
//     `template mpi_process_group`, `std::vector ranks`, `std::pair actual`,
//     `enable_if >::type`).
// Two layout effects matter:
//   - Each `//` comment runs to the end of its physical line, so it swallows
//     the code that follows it on that line.
//   - The `#if`/`#else`/`#endif` fragments are not at the start of a line,
//     so they are not preprocessor directives.
// Restore this region from the original source rather than editing it in
// place.
//
// The physical line below contains, in order:
//  * the tail of an MPI_Allgatherv call (its start is above this chunk) and
//    the BOOST_ASSERT on its result;
//  * process_subgroup(pg, first, last);
//  * the opening of mpi_process_group::get_receiver().
//
// process_subgroup works in these steps:
//  1. Copy the ranks in [first, last) into a vector.
//  2. Take the group of communicator(pg) (MPI_Comm_group).
//  3. Select those ranks (MPI_Group_incl).
//  4. Create a communicator for the selection (MPI_Comm_create).
//  5. Free both groups.
//  6. If the new communicator is not MPI_COMM_NULL: build an
//     mpi_process_group from it (wrapped with boost::mpi::comm_attach), call
//     MPI_Comm_free on the local handle, and return that process group.
//     Otherwise return mpi_process_group(mpi_process_group::create_empty()).
// Every MPI return code in it is checked only with BOOST_ASSERT.
// NOTE(review): `¤t_group` (used twice) looks like `&current_group` with the
//   HTML entity "&curren" mis-decoded to U+00A4 -- confirm and restore.
// NOTE(review): `&ranks[0]` is undefined behaviour when [first, last) is
//   empty, and `ranks.size()` is narrowed to MPI_Group_incl's `int` count.
// NOTE(review): freeing new_comm right after constructing result_pg is only
//   safe if mpi_process_group keeps its own duplicate of the communicator.
//   It presumably does -- TODO confirm against its constructor.
// NOTE(review): the MPI_Comm_free check compares with 0 where every other
//   check compares with MPI_SUCCESS.
(void*)&local_values Chris@16: /* local results */: (void*)&local_values[0], Chris@16: local_values.size() * sizeof(T), Chris@16: MPI_BYTE, Chris@16: &out[0], &sizes[0], &displacements[0], MPI_BYTE, Chris@16: communicator(pg)); Chris@16: } Chris@16: BOOST_ASSERT(result == MPI_SUCCESS); Chris@16: } Chris@16: Chris@16: template Chris@16: mpi_process_group Chris@16: process_subgroup(const mpi_process_group& pg, Chris@16: InputIterator first, InputIterator last) Chris@16: { Chris@16: /* Chris@16: boost::mpi::group current_group = communicator(pg).group(); Chris@16: boost::mpi::group new_group = current_group.include(first,last); Chris@16: boost::mpi::communicator new_comm(communicator(pg),new_group); Chris@16: return mpi_process_group(new_comm); Chris@16: */ Chris@16: std::vector ranks(first, last); Chris@16: Chris@16: MPI_Group current_group; Chris@16: int result = MPI_Comm_group(communicator(pg), ¤t_group); Chris@16: BOOST_ASSERT(result == MPI_SUCCESS); Chris@16: Chris@16: MPI_Group new_group; Chris@16: result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group); Chris@16: BOOST_ASSERT(result == MPI_SUCCESS); Chris@16: Chris@16: MPI_Comm new_comm; Chris@16: result = MPI_Comm_create(communicator(pg), new_group, &new_comm); Chris@16: BOOST_ASSERT(result == MPI_SUCCESS); Chris@16: Chris@16: result = MPI_Group_free(&new_group); Chris@16: BOOST_ASSERT(result == MPI_SUCCESS); Chris@16: result = MPI_Group_free(¤t_group); Chris@16: BOOST_ASSERT(result == MPI_SUCCESS); Chris@16: Chris@16: if (new_comm != MPI_COMM_NULL) { Chris@16: mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach)); Chris@16: result = MPI_Comm_free(&new_comm); Chris@16: BOOST_ASSERT(result == 0); Chris@16: return result_pg; Chris@16: } else { Chris@16: return mpi_process_group(mpi_process_group::create_empty()); Chris@16: } Chris@16: Chris@16: } Chris@16: Chris@16: Chris@16: template Chris@16: Receiver* mpi_process_group::get_receiver() Chris@16: { Chris@16:
// The physical line below contains:
//  * The end of get_receiver(). It returns
//    impl_->blocks[my_block_number()]->on_receive.template target(...),
//    i.e. the current block's on_receive handler viewed as a Receiver*.
//    NOTE(review): if on_receive is a boost::function, target() yields a
//    null pointer when the stored callable is not a Receiver -- confirm the
//    type of on_receive.
//  * receive_oob, enable_if overload. This is the one that calls
//    get_mpi_datatype(value), so it is presumably selected for MPI
//    datatypes. Its steps:
//     1. Map (tag, block) to a (communicator, tag) pair with
//        pg.actual_communicator_and_tag.
//     2. Post an MPI_Irecv of one element into `value` from `source`.
//     3. Loop on MPI_Test, calling pg.poll(false, block) while the receive
//        is still pending.
//    NOTE(review): the MPI_Irecv and MPI_Test return codes are ignored.
//  * The opening of the disable_if overload of receive_oob.
return impl_->blocks[my_block_number()]->on_receive Chris@16: .template target(); Chris@16: } Chris@16: Chris@16: template Chris@16: typename enable_if >::type Chris@16: receive_oob(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type source, int tag, T& value, int block) Chris@16: { Chris@16: using boost::mpi::get_mpi_datatype; Chris@16: Chris@16: // Determine the actual message we expect to receive, and which Chris@16: // communicator it will come by. Chris@16: std::pair actual Chris@16: = pg.actual_communicator_and_tag(tag, block); Chris@16: Chris@16: // Post a non-blocking receive that waits until we complete this request. Chris@16: MPI_Request request; Chris@16: MPI_Irecv(&value, 1, get_mpi_datatype(value), Chris@16: source, actual.second, actual.first, &request); Chris@16: Chris@16: int done = 0; Chris@16: do { Chris@16: MPI_Test(&request, &done, MPI_STATUS_IGNORE); Chris@16: if (!done) Chris@16: pg.poll(/*wait=*/false, block); Chris@16: } while (!done); Chris@16: } Chris@16: Chris@16: template Chris@16: typename disable_if >::type Chris@16: receive_oob(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type source, int tag, T& value, int block) Chris@16: { Chris@16: // Determine the actual message we expect to receive, and which Chris@16: // communicator it will come by.
// The physical line below contains:
//  * The rest of receive_oob, disable_if overload. This overload handles
//    values that are read back through a boost::mpi::packed_iarchive.
//    Its steps:
//     1. Spin on actual.first.iprobe(source, tag), calling pg.poll() until
//        a matching message is pending.
//     2. Size the archive from the probed status: status->count() when
//        BOOST_VERSION >= 103600, otherwise MPI_Get_count with MPI_PACKED.
//     3. MPI_Recv the packed bytes from the probed source and tag.
//     4. Deserialize them into `value`.
//    NOTE(review): unlike the enable_if overload, pg.poll() is called here
//    without `block` -- confirm this is intended.
//  * send_oob_with_reply, enable_if overload, and the start of its
//    disable_if twin. Both bodies are identical:
//     1. Take a fresh reply tag from pg.impl_->allocated_tags.get_tag().
//     2. send_oob the untracked pair ((int)reply_tag, send_value) to `dest`
//        on `tag`/`block`.
//     3. Wait in receive_oob for the reply from `dest` on reply_tag.
//    NOTE(review): receive_oob is called with four arguments while the
//    overloads visible here take five, so the call relies on a default
//    `block` declared elsewhere -- confirm which block the reply should
//    arrive on.
Chris@16: std::pair actual Chris@16: = pg.actual_communicator_and_tag(tag, block); Chris@16: Chris@16: boost::optional status; Chris@16: do { Chris@16: status = actual.first.iprobe(source, actual.second); Chris@16: if (!status) Chris@16: pg.poll(); Chris@16: } while (!status); Chris@16: Chris@16: //actual.first.recv(status->source(), status->tag(),value); Chris@16: Chris@16: // Allocate the receive buffer Chris@16: boost::mpi::packed_iarchive in(actual.first); Chris@16: Chris@16: #if BOOST_VERSION >= 103600 Chris@16: in.resize(status->count().get()); Chris@16: #else Chris@16: int size; Chris@16: MPI_Status mpi_status = *status; Chris@16: MPI_Get_count(&mpi_status, MPI_PACKED, &size); Chris@16: in.resize(size); Chris@16: #endif Chris@16: Chris@16: // Receive the message data Chris@16: MPI_Recv(in.address(), in.size(), MPI_PACKED, Chris@16: status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE); Chris@16: Chris@16: // Unpack the message data Chris@16: in >> value; Chris@16: } Chris@16: Chris@16: Chris@16: template Chris@16: typename enable_if >::type Chris@16: send_oob_with_reply(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type dest, Chris@16: int tag, const SendT& send_value, ReplyT& reply_value, Chris@16: int block) Chris@16: { Chris@16: detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); Chris@16: send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair( Chris@16: (int)reply_tag, send_value), block); Chris@16: receive_oob(pg, dest, reply_tag, reply_value); Chris@16: } Chris@16: Chris@16: template Chris@16: typename disable_if >::type Chris@16: send_oob_with_reply(const mpi_process_group& pg, Chris@16: mpi_process_group::process_id_type dest, Chris@16: int tag, const SendT& send_value, ReplyT& reply_value, Chris@16: int block) Chris@16: { Chris@16: detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); Chris@16: send_oob(pg, dest, tag, Chris@16:
// The physical line below finishes the disable_if send_oob_with_reply
// overload and closes namespaces boost::graph::distributed.
boost::parallel::detail::make_untracked_pair((int)reply_tag, Chris@16: send_value), block); Chris@16: receive_oob(pg, dest, reply_tag, reply_value); Chris@16: } Chris@16: Chris@16: } } } // end namespace boost::graph::distributed