Mercurial > hg > vamp-build-and-test
diff DEPENDENCIES/generic/include/boost/graph/distributed/detail/queue.ipp @ 16:2665513ce2d3
Add boost headers
author | Chris Cannam |
---|---|
date | Tue, 05 Aug 2014 11:11:38 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/DEPENDENCIES/generic/include/boost/graph/distributed/detail/queue.ipp Tue Aug 05 11:11:38 2014 +0100 @@ -0,0 +1,177 @@ +// Copyright (C) 2004-2006 The Trustees of Indiana University. + +// Use, modification and distribution is subject to the Boost Software +// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// Authors: Douglas Gregor +// Andrew Lumsdaine +#include <boost/optional.hpp> +#include <cassert> +#include <boost/graph/parallel/algorithm.hpp> +#include <boost/graph/parallel/process_group.hpp> +#include <functional> +#include <algorithm> +#include <boost/graph/parallel/simple_trigger.hpp> + +#ifndef BOOST_GRAPH_USE_MPI +#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" +#endif + +namespace boost { namespace graph { namespace distributed { + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +BOOST_DISTRIBUTED_QUEUE_TYPE:: +distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, + const Buffer& buffer, bool polling) + : process_group(process_group, attach_distributed_object()), + owner(owner), + buffer(buffer), + polling(polling) +{ + if (!polling) + outgoing_buffers.reset( + new outgoing_buffers_t(num_processes(process_group))); + + setup_triggers(); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +BOOST_DISTRIBUTED_QUEUE_TYPE:: +distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, + const Buffer& buffer, const UnaryPredicate& pred, + bool polling) + : process_group(process_group, attach_distributed_object()), + owner(owner), + buffer(buffer), + pred(pred), + polling(polling) +{ + if (!polling) + outgoing_buffers.reset( + new outgoing_buffers_t(num_processes(process_group))); + + setup_triggers(); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +BOOST_DISTRIBUTED_QUEUE_TYPE:: +distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, + const UnaryPredicate& pred, bool polling) + : process_group(process_group, attach_distributed_object()), + owner(owner), + pred(pred), + polling(polling) +{ + if (!polling) + outgoing_buffers.reset( + new outgoing_buffers_t(num_processes(process_group))); + + setup_triggers(); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +void +BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x) +{ + typename ProcessGroup::process_id_type dest = get(owner, x); + if (outgoing_buffers) + outgoing_buffers->at(dest).push_back(x); + else if (dest == process_id(process_group)) + buffer.push(x); + else + send(process_group, get(owner, x), msg_push, x); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +bool +BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const +{ + /* Processes will stay here until the buffer is nonempty or + synchronization with the other processes indicates that all local + buffers are empty (and no messages are in transit). + */ + while (buffer.empty() && !do_synchronize()) ; + + return buffer.empty(); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type +BOOST_DISTRIBUTED_QUEUE_TYPE::size() const +{ + empty(); + return buffer.size(); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers() +{ + using boost::graph::parallel::simple_trigger; + + simple_trigger(process_group, msg_push, this, + &distributed_queue::handle_push); + simple_trigger(process_group, msg_multipush, this, + &distributed_queue::handle_multipush); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +void +BOOST_DISTRIBUTED_QUEUE_TYPE:: +handle_push(int /*source*/, int /*tag*/, const value_type& value, + trigger_receive_context) +{ + if (pred(value)) buffer.push(value); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +void +BOOST_DISTRIBUTED_QUEUE_TYPE:: +handle_multipush(int /*source*/, int /*tag*/, + const std::vector<value_type>& values, + trigger_receive_context) +{ + for (std::size_t i = 0; i < values.size(); ++i) + if (pred(values[i])) buffer.push(values[i]); +} + +template<BOOST_DISTRIBUTED_QUEUE_PARMS> +bool +BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const +{ +#ifdef PBGL_ACCOUNTING + ++num_synchronizations; +#endif + + using boost::parallel::all_reduce; + using std::swap; + + typedef typename ProcessGroup::process_id_type process_id_type; + + if (outgoing_buffers) { + // Transfer all of the push requests + process_id_type id = process_id(process_group); + process_id_type np = num_processes(process_group); + for (process_id_type dest = 0; dest < np; ++dest) { + outgoing_buffer_t& outgoing = outgoing_buffers->at(dest); + std::size_t size = outgoing.size(); + if (size != 0) { + if (dest != id) { + send(process_group, dest, msg_multipush, outgoing); + } else { + for (std::size_t i = 0; i < size; ++i) + buffer.push(outgoing[i]); + } + outgoing.clear(); + } + } + } + synchronize(process_group); + + unsigned local_size = buffer.size(); + unsigned global_size = + all_reduce(process_group, local_size, std::plus<unsigned>()); + return global_size == 0; +} + +} } } // end namespace boost::graph::distributed