annotate DEPENDENCIES/generic/include/boost/graph/distributed/detail/queue.ipp @ 125:34e428693f5d vext

Vext -> Repoint
author Chris Cannam
date Thu, 14 Jun 2018 11:15:39 +0100
parents 2665513ce2d3
children
rev   line source
Chris@16 1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
Chris@16 2
Chris@16 3 // Use, modification and distribution is subject to the Boost Software
Chris@16 4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 5 // http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6
Chris@16 7 // Authors: Douglas Gregor
Chris@16 8 // Andrew Lumsdaine
Chris@16 9 #include <boost/optional.hpp>
Chris@16 10 #include <cassert>
Chris@16 11 #include <boost/graph/parallel/algorithm.hpp>
Chris@16 12 #include <boost/graph/parallel/process_group.hpp>
Chris@16 13 #include <functional>
Chris@16 14 #include <algorithm>
Chris@16 15 #include <boost/graph/parallel/simple_trigger.hpp>
Chris@16 16
Chris@16 17 #ifndef BOOST_GRAPH_USE_MPI
Chris@16 18 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
Chris@16 19 #endif
Chris@16 20
Chris@16 21 namespace boost { namespace graph { namespace distributed {
Chris@16 22
Chris@16 23 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 24 BOOST_DISTRIBUTED_QUEUE_TYPE::
Chris@16 25 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
Chris@16 26 const Buffer& buffer, bool polling)
Chris@16 27 : process_group(process_group, attach_distributed_object()),
Chris@16 28 owner(owner),
Chris@16 29 buffer(buffer),
Chris@16 30 polling(polling)
Chris@16 31 {
Chris@16 32 if (!polling)
Chris@16 33 outgoing_buffers.reset(
Chris@16 34 new outgoing_buffers_t(num_processes(process_group)));
Chris@16 35
Chris@16 36 setup_triggers();
Chris@16 37 }
Chris@16 38
Chris@16 39 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 40 BOOST_DISTRIBUTED_QUEUE_TYPE::
Chris@16 41 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
Chris@16 42 const Buffer& buffer, const UnaryPredicate& pred,
Chris@16 43 bool polling)
Chris@16 44 : process_group(process_group, attach_distributed_object()),
Chris@16 45 owner(owner),
Chris@16 46 buffer(buffer),
Chris@16 47 pred(pred),
Chris@16 48 polling(polling)
Chris@16 49 {
Chris@16 50 if (!polling)
Chris@16 51 outgoing_buffers.reset(
Chris@16 52 new outgoing_buffers_t(num_processes(process_group)));
Chris@16 53
Chris@16 54 setup_triggers();
Chris@16 55 }
Chris@16 56
Chris@16 57 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 58 BOOST_DISTRIBUTED_QUEUE_TYPE::
Chris@16 59 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
Chris@16 60 const UnaryPredicate& pred, bool polling)
Chris@16 61 : process_group(process_group, attach_distributed_object()),
Chris@16 62 owner(owner),
Chris@16 63 pred(pred),
Chris@16 64 polling(polling)
Chris@16 65 {
Chris@16 66 if (!polling)
Chris@16 67 outgoing_buffers.reset(
Chris@16 68 new outgoing_buffers_t(num_processes(process_group)));
Chris@16 69
Chris@16 70 setup_triggers();
Chris@16 71 }
Chris@16 72
Chris@16 73 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 74 void
Chris@16 75 BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
Chris@16 76 {
Chris@16 77 typename ProcessGroup::process_id_type dest = get(owner, x);
Chris@16 78 if (outgoing_buffers)
Chris@16 79 outgoing_buffers->at(dest).push_back(x);
Chris@16 80 else if (dest == process_id(process_group))
Chris@16 81 buffer.push(x);
Chris@16 82 else
Chris@16 83 send(process_group, get(owner, x), msg_push, x);
Chris@16 84 }
Chris@16 85
Chris@16 86 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 87 bool
Chris@16 88 BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
Chris@16 89 {
Chris@16 90 /* Processes will stay here until the buffer is nonempty or
Chris@16 91 synchronization with the other processes indicates that all local
Chris@16 92 buffers are empty (and no messages are in transit).
Chris@16 93 */
Chris@16 94 while (buffer.empty() && !do_synchronize()) ;
Chris@16 95
Chris@16 96 return buffer.empty();
Chris@16 97 }
Chris@16 98
Chris@16 99 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 100 typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
Chris@16 101 BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
Chris@16 102 {
Chris@16 103 empty();
Chris@16 104 return buffer.size();
Chris@16 105 }
Chris@16 106
Chris@16 107 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 108 void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
Chris@16 109 {
Chris@16 110 using boost::graph::parallel::simple_trigger;
Chris@16 111
Chris@16 112 simple_trigger(process_group, msg_push, this,
Chris@16 113 &distributed_queue::handle_push);
Chris@16 114 simple_trigger(process_group, msg_multipush, this,
Chris@16 115 &distributed_queue::handle_multipush);
Chris@16 116 }
Chris@16 117
Chris@16 118 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 119 void
Chris@16 120 BOOST_DISTRIBUTED_QUEUE_TYPE::
Chris@16 121 handle_push(int /*source*/, int /*tag*/, const value_type& value,
Chris@16 122 trigger_receive_context)
Chris@16 123 {
Chris@16 124 if (pred(value)) buffer.push(value);
Chris@16 125 }
Chris@16 126
Chris@16 127 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 128 void
Chris@16 129 BOOST_DISTRIBUTED_QUEUE_TYPE::
Chris@16 130 handle_multipush(int /*source*/, int /*tag*/,
Chris@16 131 const std::vector<value_type>& values,
Chris@16 132 trigger_receive_context)
Chris@16 133 {
Chris@16 134 for (std::size_t i = 0; i < values.size(); ++i)
Chris@16 135 if (pred(values[i])) buffer.push(values[i]);
Chris@16 136 }
Chris@16 137
Chris@16 138 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
Chris@16 139 bool
Chris@16 140 BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
Chris@16 141 {
Chris@16 142 #ifdef PBGL_ACCOUNTING
Chris@16 143 ++num_synchronizations;
Chris@16 144 #endif
Chris@16 145
Chris@16 146 using boost::parallel::all_reduce;
Chris@16 147 using std::swap;
Chris@16 148
Chris@16 149 typedef typename ProcessGroup::process_id_type process_id_type;
Chris@16 150
Chris@16 151 if (outgoing_buffers) {
Chris@16 152 // Transfer all of the push requests
Chris@16 153 process_id_type id = process_id(process_group);
Chris@16 154 process_id_type np = num_processes(process_group);
Chris@16 155 for (process_id_type dest = 0; dest < np; ++dest) {
Chris@16 156 outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
Chris@16 157 std::size_t size = outgoing.size();
Chris@16 158 if (size != 0) {
Chris@16 159 if (dest != id) {
Chris@16 160 send(process_group, dest, msg_multipush, outgoing);
Chris@16 161 } else {
Chris@16 162 for (std::size_t i = 0; i < size; ++i)
Chris@16 163 buffer.push(outgoing[i]);
Chris@16 164 }
Chris@16 165 outgoing.clear();
Chris@16 166 }
Chris@16 167 }
Chris@16 168 }
Chris@16 169 synchronize(process_group);
Chris@16 170
Chris@16 171 unsigned local_size = buffer.size();
Chris@16 172 unsigned global_size =
Chris@16 173 all_reduce(process_group, local_size, std::plus<unsigned>());
Chris@16 174 return global_size == 0;
Chris@16 175 }
Chris@16 176
Chris@16 177 } } } // end namespace boost::graph::distributed