Chris@16
|
1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
|
Chris@16
|
2
|
Chris@16
|
3 // Use, modification and distribution is subject to the Boost Software
|
Chris@16
|
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
|
Chris@16
|
5 // http://www.boost.org/LICENSE_1_0.txt)
|
Chris@16
|
6
|
Chris@16
|
7 // Authors: Douglas Gregor
|
Chris@16
|
8 // Andrew Lumsdaine
|
Chris@16
|
9 #include <boost/optional.hpp>
|
Chris@16
|
10 #include <cassert>
|
Chris@16
|
11 #include <boost/graph/parallel/algorithm.hpp>
|
Chris@16
|
12 #include <boost/graph/parallel/process_group.hpp>
|
Chris@16
|
13 #include <functional>
|
Chris@16
|
14 #include <algorithm>
|
Chris@16
|
15 #include <boost/graph/parallel/simple_trigger.hpp>
|
Chris@16
|
16
|
Chris@16
|
17 #ifndef BOOST_GRAPH_USE_MPI
|
Chris@16
|
18 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
|
Chris@16
|
19 #endif
|
Chris@16
|
20
|
Chris@16
|
21 namespace boost { namespace graph { namespace distributed {
|
Chris@16
|
22
|
Chris@16
|
23 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
24 BOOST_DISTRIBUTED_QUEUE_TYPE::
|
Chris@16
|
25 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
|
Chris@16
|
26 const Buffer& buffer, bool polling)
|
Chris@16
|
27 : process_group(process_group, attach_distributed_object()),
|
Chris@16
|
28 owner(owner),
|
Chris@16
|
29 buffer(buffer),
|
Chris@16
|
30 polling(polling)
|
Chris@16
|
31 {
|
Chris@16
|
32 if (!polling)
|
Chris@16
|
33 outgoing_buffers.reset(
|
Chris@16
|
34 new outgoing_buffers_t(num_processes(process_group)));
|
Chris@16
|
35
|
Chris@16
|
36 setup_triggers();
|
Chris@16
|
37 }
|
Chris@16
|
38
|
Chris@16
|
39 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
40 BOOST_DISTRIBUTED_QUEUE_TYPE::
|
Chris@16
|
41 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
|
Chris@16
|
42 const Buffer& buffer, const UnaryPredicate& pred,
|
Chris@16
|
43 bool polling)
|
Chris@16
|
44 : process_group(process_group, attach_distributed_object()),
|
Chris@16
|
45 owner(owner),
|
Chris@16
|
46 buffer(buffer),
|
Chris@16
|
47 pred(pred),
|
Chris@16
|
48 polling(polling)
|
Chris@16
|
49 {
|
Chris@16
|
50 if (!polling)
|
Chris@16
|
51 outgoing_buffers.reset(
|
Chris@16
|
52 new outgoing_buffers_t(num_processes(process_group)));
|
Chris@16
|
53
|
Chris@16
|
54 setup_triggers();
|
Chris@16
|
55 }
|
Chris@16
|
56
|
Chris@16
|
57 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
58 BOOST_DISTRIBUTED_QUEUE_TYPE::
|
Chris@16
|
59 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
|
Chris@16
|
60 const UnaryPredicate& pred, bool polling)
|
Chris@16
|
61 : process_group(process_group, attach_distributed_object()),
|
Chris@16
|
62 owner(owner),
|
Chris@16
|
63 pred(pred),
|
Chris@16
|
64 polling(polling)
|
Chris@16
|
65 {
|
Chris@16
|
66 if (!polling)
|
Chris@16
|
67 outgoing_buffers.reset(
|
Chris@16
|
68 new outgoing_buffers_t(num_processes(process_group)));
|
Chris@16
|
69
|
Chris@16
|
70 setup_triggers();
|
Chris@16
|
71 }
|
Chris@16
|
72
|
Chris@16
|
73 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
74 void
|
Chris@16
|
75 BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
|
Chris@16
|
76 {
|
Chris@16
|
77 typename ProcessGroup::process_id_type dest = get(owner, x);
|
Chris@16
|
78 if (outgoing_buffers)
|
Chris@16
|
79 outgoing_buffers->at(dest).push_back(x);
|
Chris@16
|
80 else if (dest == process_id(process_group))
|
Chris@16
|
81 buffer.push(x);
|
Chris@16
|
82 else
|
Chris@16
|
83 send(process_group, get(owner, x), msg_push, x);
|
Chris@16
|
84 }
|
Chris@16
|
85
|
Chris@16
|
86 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
87 bool
|
Chris@16
|
88 BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
|
Chris@16
|
89 {
|
Chris@16
|
90 /* Processes will stay here until the buffer is nonempty or
|
Chris@16
|
91 synchronization with the other processes indicates that all local
|
Chris@16
|
92 buffers are empty (and no messages are in transit).
|
Chris@16
|
93 */
|
Chris@16
|
94 while (buffer.empty() && !do_synchronize()) ;
|
Chris@16
|
95
|
Chris@16
|
96 return buffer.empty();
|
Chris@16
|
97 }
|
Chris@16
|
98
|
Chris@16
|
99 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
100 typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
|
Chris@16
|
101 BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
|
Chris@16
|
102 {
|
Chris@16
|
103 empty();
|
Chris@16
|
104 return buffer.size();
|
Chris@16
|
105 }
|
Chris@16
|
106
|
Chris@16
|
107 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
108 void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
|
Chris@16
|
109 {
|
Chris@16
|
110 using boost::graph::parallel::simple_trigger;
|
Chris@16
|
111
|
Chris@16
|
112 simple_trigger(process_group, msg_push, this,
|
Chris@16
|
113 &distributed_queue::handle_push);
|
Chris@16
|
114 simple_trigger(process_group, msg_multipush, this,
|
Chris@16
|
115 &distributed_queue::handle_multipush);
|
Chris@16
|
116 }
|
Chris@16
|
117
|
Chris@16
|
118 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
119 void
|
Chris@16
|
120 BOOST_DISTRIBUTED_QUEUE_TYPE::
|
Chris@16
|
121 handle_push(int /*source*/, int /*tag*/, const value_type& value,
|
Chris@16
|
122 trigger_receive_context)
|
Chris@16
|
123 {
|
Chris@16
|
124 if (pred(value)) buffer.push(value);
|
Chris@16
|
125 }
|
Chris@16
|
126
|
Chris@16
|
127 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
128 void
|
Chris@16
|
129 BOOST_DISTRIBUTED_QUEUE_TYPE::
|
Chris@16
|
130 handle_multipush(int /*source*/, int /*tag*/,
|
Chris@16
|
131 const std::vector<value_type>& values,
|
Chris@16
|
132 trigger_receive_context)
|
Chris@16
|
133 {
|
Chris@16
|
134 for (std::size_t i = 0; i < values.size(); ++i)
|
Chris@16
|
135 if (pred(values[i])) buffer.push(values[i]);
|
Chris@16
|
136 }
|
Chris@16
|
137
|
Chris@16
|
138 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
Chris@16
|
139 bool
|
Chris@16
|
140 BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
|
Chris@16
|
141 {
|
Chris@16
|
142 #ifdef PBGL_ACCOUNTING
|
Chris@16
|
143 ++num_synchronizations;
|
Chris@16
|
144 #endif
|
Chris@16
|
145
|
Chris@16
|
146 using boost::parallel::all_reduce;
|
Chris@16
|
147 using std::swap;
|
Chris@16
|
148
|
Chris@16
|
149 typedef typename ProcessGroup::process_id_type process_id_type;
|
Chris@16
|
150
|
Chris@16
|
151 if (outgoing_buffers) {
|
Chris@16
|
152 // Transfer all of the push requests
|
Chris@16
|
153 process_id_type id = process_id(process_group);
|
Chris@16
|
154 process_id_type np = num_processes(process_group);
|
Chris@16
|
155 for (process_id_type dest = 0; dest < np; ++dest) {
|
Chris@16
|
156 outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
|
Chris@16
|
157 std::size_t size = outgoing.size();
|
Chris@16
|
158 if (size != 0) {
|
Chris@16
|
159 if (dest != id) {
|
Chris@16
|
160 send(process_group, dest, msg_multipush, outgoing);
|
Chris@16
|
161 } else {
|
Chris@16
|
162 for (std::size_t i = 0; i < size; ++i)
|
Chris@16
|
163 buffer.push(outgoing[i]);
|
Chris@16
|
164 }
|
Chris@16
|
165 outgoing.clear();
|
Chris@16
|
166 }
|
Chris@16
|
167 }
|
Chris@16
|
168 }
|
Chris@16
|
169 synchronize(process_group);
|
Chris@16
|
170
|
Chris@16
|
171 unsigned local_size = buffer.size();
|
Chris@16
|
172 unsigned global_size =
|
Chris@16
|
173 all_reduce(process_group, local_size, std::plus<unsigned>());
|
Chris@16
|
174 return global_size == 0;
|
Chris@16
|
175 }
|
Chris@16
|
176
|
Chris@16
|
177 } } } // end namespace boost::graph::distributed
|