Mercurial > hg > vamp-build-and-test
comparison DEPENDENCIES/generic/include/boost/graph/distributed/detail/queue.ipp @ 16:2665513ce2d3
Add boost headers
author | Chris Cannam |
---|---|
date | Tue, 05 Aug 2014 11:11:38 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
15:663ca0da4350 | 16:2665513ce2d3 |
---|---|
1 // Copyright (C) 2004-2006 The Trustees of Indiana University. | |
2 | |
3 // Use, modification and distribution is subject to the Boost Software | |
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
5 // http://www.boost.org/LICENSE_1_0.txt) | |
6 | |
7 // Authors: Douglas Gregor | |
8 // Andrew Lumsdaine | |
9 #include <boost/optional.hpp> | |
10 #include <cassert> | |
11 #include <boost/graph/parallel/algorithm.hpp> | |
12 #include <boost/graph/parallel/process_group.hpp> | |
13 #include <functional> | |
14 #include <algorithm> | |
15 #include <boost/graph/parallel/simple_trigger.hpp> | |
16 | |
17 #ifndef BOOST_GRAPH_USE_MPI | |
18 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" | |
19 #endif | |
20 | |
21 namespace boost { namespace graph { namespace distributed { | |
22 | |
23 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
24 BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
25 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
26 const Buffer& buffer, bool polling) | |
27 : process_group(process_group, attach_distributed_object()), | |
28 owner(owner), | |
29 buffer(buffer), | |
30 polling(polling) | |
31 { | |
32 if (!polling) | |
33 outgoing_buffers.reset( | |
34 new outgoing_buffers_t(num_processes(process_group))); | |
35 | |
36 setup_triggers(); | |
37 } | |
38 | |
39 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
40 BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
41 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
42 const Buffer& buffer, const UnaryPredicate& pred, | |
43 bool polling) | |
44 : process_group(process_group, attach_distributed_object()), | |
45 owner(owner), | |
46 buffer(buffer), | |
47 pred(pred), | |
48 polling(polling) | |
49 { | |
50 if (!polling) | |
51 outgoing_buffers.reset( | |
52 new outgoing_buffers_t(num_processes(process_group))); | |
53 | |
54 setup_triggers(); | |
55 } | |
56 | |
57 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
58 BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
59 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
60 const UnaryPredicate& pred, bool polling) | |
61 : process_group(process_group, attach_distributed_object()), | |
62 owner(owner), | |
63 pred(pred), | |
64 polling(polling) | |
65 { | |
66 if (!polling) | |
67 outgoing_buffers.reset( | |
68 new outgoing_buffers_t(num_processes(process_group))); | |
69 | |
70 setup_triggers(); | |
71 } | |
72 | |
73 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
74 void | |
75 BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x) | |
76 { | |
77 typename ProcessGroup::process_id_type dest = get(owner, x); | |
78 if (outgoing_buffers) | |
79 outgoing_buffers->at(dest).push_back(x); | |
80 else if (dest == process_id(process_group)) | |
81 buffer.push(x); | |
82 else | |
83 send(process_group, get(owner, x), msg_push, x); | |
84 } | |
85 | |
86 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
87 bool | |
88 BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const | |
89 { | |
90 /* Processes will stay here until the buffer is nonempty or | |
91 synchronization with the other processes indicates that all local | |
92 buffers are empty (and no messages are in transit). | |
93 */ | |
94 while (buffer.empty() && !do_synchronize()) ; | |
95 | |
96 return buffer.empty(); | |
97 } | |
98 | |
99 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
100 typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type | |
101 BOOST_DISTRIBUTED_QUEUE_TYPE::size() const | |
102 { | |
103 empty(); | |
104 return buffer.size(); | |
105 } | |
106 | |
107 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
108 void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers() | |
109 { | |
110 using boost::graph::parallel::simple_trigger; | |
111 | |
112 simple_trigger(process_group, msg_push, this, | |
113 &distributed_queue::handle_push); | |
114 simple_trigger(process_group, msg_multipush, this, | |
115 &distributed_queue::handle_multipush); | |
116 } | |
117 | |
118 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
119 void | |
120 BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
121 handle_push(int /*source*/, int /*tag*/, const value_type& value, | |
122 trigger_receive_context) | |
123 { | |
124 if (pred(value)) buffer.push(value); | |
125 } | |
126 | |
127 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
128 void | |
129 BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
130 handle_multipush(int /*source*/, int /*tag*/, | |
131 const std::vector<value_type>& values, | |
132 trigger_receive_context) | |
133 { | |
134 for (std::size_t i = 0; i < values.size(); ++i) | |
135 if (pred(values[i])) buffer.push(values[i]); | |
136 } | |
137 | |
138 template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
139 bool | |
140 BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const | |
141 { | |
142 #ifdef PBGL_ACCOUNTING | |
143 ++num_synchronizations; | |
144 #endif | |
145 | |
146 using boost::parallel::all_reduce; | |
147 using std::swap; | |
148 | |
149 typedef typename ProcessGroup::process_id_type process_id_type; | |
150 | |
151 if (outgoing_buffers) { | |
152 // Transfer all of the push requests | |
153 process_id_type id = process_id(process_group); | |
154 process_id_type np = num_processes(process_group); | |
155 for (process_id_type dest = 0; dest < np; ++dest) { | |
156 outgoing_buffer_t& outgoing = outgoing_buffers->at(dest); | |
157 std::size_t size = outgoing.size(); | |
158 if (size != 0) { | |
159 if (dest != id) { | |
160 send(process_group, dest, msg_multipush, outgoing); | |
161 } else { | |
162 for (std::size_t i = 0; i < size; ++i) | |
163 buffer.push(outgoing[i]); | |
164 } | |
165 outgoing.clear(); | |
166 } | |
167 } | |
168 } | |
169 synchronize(process_group); | |
170 | |
171 unsigned local_size = buffer.size(); | |
172 unsigned global_size = | |
173 all_reduce(process_group, local_size, std::plus<unsigned>()); | |
174 return global_size == 0; | |
175 } | |
176 | |
177 } } } // end namespace boost::graph::distributed |