Chris@16: // Copyright (C) 2004-2006 The Trustees of Indiana University. Chris@16: Chris@16: // Use, modification and distribution is subject to the Boost Software Chris@16: // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: // Authors: Douglas Gregor Chris@16: // Andrew Lumsdaine Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: #ifndef BOOST_GRAPH_USE_MPI Chris@16: #error "Parallel BGL files should not be included unless has been included" Chris@16: #endif Chris@16: Chris@16: namespace boost { namespace graph { namespace distributed { Chris@16: Chris@16: template Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE:: Chris@16: distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, Chris@16: const Buffer& buffer, bool polling) Chris@16: : process_group(process_group, attach_distributed_object()), Chris@16: owner(owner), Chris@16: buffer(buffer), Chris@16: polling(polling) Chris@16: { Chris@16: if (!polling) Chris@16: outgoing_buffers.reset( Chris@16: new outgoing_buffers_t(num_processes(process_group))); Chris@16: Chris@16: setup_triggers(); Chris@16: } Chris@16: Chris@16: template Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE:: Chris@16: distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, Chris@16: const Buffer& buffer, const UnaryPredicate& pred, Chris@16: bool polling) Chris@16: : process_group(process_group, attach_distributed_object()), Chris@16: owner(owner), Chris@16: buffer(buffer), Chris@16: pred(pred), Chris@16: polling(polling) Chris@16: { Chris@16: if (!polling) Chris@16: outgoing_buffers.reset( Chris@16: new outgoing_buffers_t(num_processes(process_group))); Chris@16: Chris@16: setup_triggers(); Chris@16: } Chris@16: Chris@16: template Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE:: Chris@16: distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, Chris@16: const UnaryPredicate& pred, bool polling) Chris@16: : process_group(process_group, attach_distributed_object()), Chris@16: owner(owner), Chris@16: pred(pred), Chris@16: polling(polling) Chris@16: { Chris@16: if (!polling) Chris@16: outgoing_buffers.reset( Chris@16: new outgoing_buffers_t(num_processes(process_group))); Chris@16: Chris@16: setup_triggers(); Chris@16: } Chris@16: Chris@16: template Chris@16: void Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x) Chris@16: { Chris@16: typename ProcessGroup::process_id_type dest = get(owner, x); Chris@16: if (outgoing_buffers) Chris@16: outgoing_buffers->at(dest).push_back(x); Chris@16: else if (dest == process_id(process_group)) Chris@16: buffer.push(x); Chris@16: else Chris@16: send(process_group, get(owner, x), msg_push, x); Chris@16: } Chris@16: Chris@16: template Chris@16: bool Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const Chris@16: { Chris@16: /* Processes will stay here until the buffer is nonempty or Chris@16: synchronization with the other processes indicates that all local Chris@16: buffers are empty (and no messages are in transit). Chris@16: */ Chris@16: while (buffer.empty() && !do_synchronize()) ; Chris@16: Chris@16: return buffer.empty(); Chris@16: } Chris@16: Chris@16: template Chris@16: typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE::size() const Chris@16: { Chris@16: empty(); Chris@16: return buffer.size(); Chris@16: } Chris@16: Chris@16: template Chris@16: void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers() Chris@16: { Chris@16: using boost::graph::parallel::simple_trigger; Chris@16: Chris@16: simple_trigger(process_group, msg_push, this, Chris@16: &distributed_queue::handle_push); Chris@16: simple_trigger(process_group, msg_multipush, this, Chris@16: &distributed_queue::handle_multipush); Chris@16: } Chris@16: Chris@16: template Chris@16: void Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE:: Chris@16: handle_push(int /*source*/, int /*tag*/, const value_type& value, Chris@16: trigger_receive_context) Chris@16: { Chris@16: if (pred(value)) buffer.push(value); Chris@16: } Chris@16: Chris@16: template Chris@16: void Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE:: Chris@16: handle_multipush(int /*source*/, int /*tag*/, Chris@16: const std::vector& values, Chris@16: trigger_receive_context) Chris@16: { Chris@16: for (std::size_t i = 0; i < values.size(); ++i) Chris@16: if (pred(values[i])) buffer.push(values[i]); Chris@16: } Chris@16: Chris@16: template Chris@16: bool Chris@16: BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const Chris@16: { Chris@16: #ifdef PBGL_ACCOUNTING Chris@16: ++num_synchronizations; Chris@16: #endif Chris@16: Chris@16: using boost::parallel::all_reduce; Chris@16: using std::swap; Chris@16: Chris@16: typedef typename ProcessGroup::process_id_type process_id_type; Chris@16: Chris@16: if (outgoing_buffers) { Chris@16: // Transfer all of the push requests Chris@16: process_id_type id = process_id(process_group); Chris@16: process_id_type np = num_processes(process_group); Chris@16: for (process_id_type dest = 0; dest < np; ++dest) { Chris@16: outgoing_buffer_t& outgoing = outgoing_buffers->at(dest); Chris@16: std::size_t size = outgoing.size(); Chris@16: if (size != 0) { Chris@16: if (dest != id) { Chris@16: send(process_group, dest, msg_multipush, outgoing); Chris@16: } else { Chris@16: for (std::size_t i = 0; i < size; ++i) Chris@16: buffer.push(outgoing[i]); Chris@16: } Chris@16: outgoing.clear(); Chris@16: } Chris@16: } Chris@16: } Chris@16: synchronize(process_group); Chris@16: Chris@16: unsigned local_size = buffer.size(); Chris@16: unsigned global_size = Chris@16: all_reduce(process_group, local_size, std::plus()); Chris@16: return global_size == 0; Chris@16: } Chris@16: Chris@16: } } } // end namespace boost::graph::distributed