Chris@16
|
1 // Copyright (C) 2005-2006 The Trustees of Indiana University.
|
Chris@16
|
2
|
Chris@16
|
3 // Use, modification and distribution is subject to the Boost Software
|
Chris@16
|
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
|
Chris@16
|
5 // http://www.boost.org/LICENSE_1_0.txt)
|
Chris@16
|
6
|
Chris@16
|
7 // Authors: Douglas Gregor
|
Chris@16
|
8 // Andrew Lumsdaine
|
Chris@16
|
9 #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
|
Chris@16
|
10 #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
|
Chris@16
|
11
|
Chris@16
|
12 #ifndef BOOST_GRAPH_USE_MPI
|
Chris@16
|
13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
|
Chris@16
|
14 #endif
|
Chris@16
|
15
|
Chris@16
|
16 #include <boost/graph/parallel/process_group.hpp>
|
Chris@16
|
17 #include <boost/type_traits/is_convertible.hpp>
|
Chris@16
|
18 #include <vector>
|
Chris@16
|
19 #include <boost/assert.hpp>
|
Chris@16
|
20 #include <boost/optional.hpp>
|
Chris@16
|
21 #include <queue>
|
Chris@16
|
22
|
Chris@16
|
23 namespace boost { namespace graph { namespace detail {
|
Chris@16
|
24
|
Chris@16
|
25 template<typename ProcessGroup>
|
Chris@16
|
26 void do_synchronize(ProcessGroup& pg)
|
Chris@16
|
27 {
|
Chris@16
|
28 using boost::parallel::synchronize;
|
Chris@16
|
29 synchronize(pg);
|
Chris@16
|
30 }
|
Chris@16
|
31
|
Chris@16
|
32 struct remote_set_queued {};
|
Chris@16
|
33 struct remote_set_immediate {};
|
Chris@16
|
34
|
Chris@16
|
35 template<typename ProcessGroup>
|
Chris@16
|
36 class remote_set_semantics
|
Chris@16
|
37 {
|
Chris@16
|
38 BOOST_STATIC_CONSTANT
|
Chris@16
|
39 (bool,
|
Chris@16
|
40 queued = (is_convertible<
|
Chris@16
|
41 typename ProcessGroup::communication_category,
|
Chris@101
|
42 boost::parallel::bsp_process_group_tag>::value));
|
Chris@16
|
43
|
Chris@16
|
44 public:
|
Chris@16
|
45 typedef typename mpl::if_c<queued,
|
Chris@16
|
46 remote_set_queued,
|
Chris@16
|
47 remote_set_immediate>::type type;
|
Chris@16
|
48 };
|
Chris@16
|
49
|
Chris@16
|
50
|
Chris@16
|
51 template<typename Derived, typename ProcessGroup, typename Value,
|
Chris@16
|
52 typename OwnerMap,
|
Chris@16
|
53 typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
|
Chris@16
|
54 class remote_update_set;
|
Chris@16
|
55
|
Chris@16
|
56 /**********************************************************************
|
Chris@16
|
57 * Remote updating set that queues messages until synchronization *
|
Chris@16
|
58 **********************************************************************/
|
Chris@16
|
59 template<typename Derived, typename ProcessGroup, typename Value,
|
Chris@16
|
60 typename OwnerMap>
|
Chris@16
|
61 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
|
Chris@16
|
62 remote_set_queued>
|
Chris@16
|
63 {
|
Chris@16
|
64 typedef typename property_traits<OwnerMap>::key_type Key;
|
Chris@16
|
65 typedef std::vector<std::pair<Key, Value> > Updates;
|
Chris@16
|
66 typedef typename Updates::size_type updates_size_type;
|
Chris@16
|
67 typedef typename Updates::value_type updates_pair_type;
|
Chris@16
|
68
|
Chris@16
|
69 public:
|
Chris@16
|
70
|
Chris@16
|
71 private:
|
Chris@16
|
72 typedef typename ProcessGroup::process_id_type process_id_type;
|
Chris@16
|
73
|
Chris@16
|
74 enum message_kind {
|
Chris@16
|
75 /** Message containing the number of updates that will be sent in
|
Chris@16
|
76 * a msg_updates message that will immediately follow. This
|
Chris@16
|
77 * message will contain a single value of type
|
Chris@16
|
78 * updates_size_type.
|
Chris@16
|
79 */
|
Chris@16
|
80 msg_num_updates,
|
Chris@16
|
81
|
Chris@16
|
82 /** Contains (key, value) pairs with all of the updates from a
|
Chris@16
|
83 * particular source. The number of updates is variable, but will
|
Chris@16
|
84 * be provided in a msg_num_updates message that immediately
|
Chris@16
|
85 * preceeds this message.
|
Chris@16
|
86 *
|
Chris@16
|
87 */
|
Chris@16
|
88 msg_updates
|
Chris@16
|
89 };
|
Chris@16
|
90
|
Chris@16
|
91 struct handle_messages
|
Chris@16
|
92 {
|
Chris@16
|
93 explicit
|
Chris@16
|
94 handle_messages(remote_update_set* self, const ProcessGroup& pg)
|
Chris@16
|
95 : self(self), update_sizes(num_processes(pg), 0) { }
|
Chris@16
|
96
|
Chris@16
|
97 void operator()(process_id_type source, int tag)
|
Chris@16
|
98 {
|
Chris@16
|
99 switch(tag) {
|
Chris@16
|
100 case msg_num_updates:
|
Chris@16
|
101 {
|
Chris@16
|
102 // Receive the # of updates
|
Chris@16
|
103 updates_size_type num_updates;
|
Chris@16
|
104 receive(self->process_group, source, tag, num_updates);
|
Chris@16
|
105
|
Chris@16
|
106 update_sizes[source] = num_updates;
|
Chris@16
|
107 }
|
Chris@16
|
108 break;
|
Chris@16
|
109
|
Chris@16
|
110 case msg_updates:
|
Chris@16
|
111 {
|
Chris@16
|
112 updates_size_type num_updates = update_sizes[source];
|
Chris@16
|
113 BOOST_ASSERT(num_updates);
|
Chris@16
|
114
|
Chris@16
|
115 // Receive the actual updates
|
Chris@16
|
116 std::vector<updates_pair_type> updates(num_updates);
|
Chris@16
|
117 receive(self->process_group, source, msg_updates, &updates[0],
|
Chris@16
|
118 num_updates);
|
Chris@16
|
119
|
Chris@16
|
120 // Send updates to derived "receive_update" member
|
Chris@16
|
121 Derived* derived = static_cast<Derived*>(self);
|
Chris@16
|
122 for (updates_size_type u = 0; u < num_updates; ++u)
|
Chris@16
|
123 derived->receive_update(source, updates[u].first, updates[u].second);
|
Chris@16
|
124
|
Chris@16
|
125 update_sizes[source] = 0;
|
Chris@16
|
126 }
|
Chris@16
|
127 break;
|
Chris@16
|
128 };
|
Chris@16
|
129 }
|
Chris@16
|
130
|
Chris@16
|
131 private:
|
Chris@16
|
132 remote_update_set* self;
|
Chris@16
|
133 std::vector<updates_size_type> update_sizes;
|
Chris@16
|
134 };
|
Chris@16
|
135 friend struct handle_messages;
|
Chris@16
|
136
|
Chris@16
|
137 protected:
|
Chris@16
|
138 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
|
Chris@16
|
139 : process_group(pg, handle_messages(this, pg)),
|
Chris@16
|
140 updates(num_processes(pg)), owner(owner) {
|
Chris@16
|
141 }
|
Chris@16
|
142
|
Chris@16
|
143
|
Chris@16
|
144 void update(const Key& key, const Value& value)
|
Chris@16
|
145 {
|
Chris@16
|
146 if (get(owner, key) == process_id(process_group)) {
|
Chris@16
|
147 Derived* derived = static_cast<Derived*>(this);
|
Chris@16
|
148 derived->receive_update(get(owner, key), key, value);
|
Chris@16
|
149 }
|
Chris@16
|
150 else {
|
Chris@16
|
151 updates[get(owner, key)].push_back(std::make_pair(key, value));
|
Chris@16
|
152 }
|
Chris@16
|
153 }
|
Chris@16
|
154
|
Chris@16
|
155 void collect() { }
|
Chris@16
|
156
|
Chris@16
|
157 void synchronize()
|
Chris@16
|
158 {
|
Chris@16
|
159 // Emit all updates and then remove them
|
Chris@16
|
160 process_id_type num_processes = updates.size();
|
Chris@16
|
161 for (process_id_type p = 0; p < num_processes; ++p) {
|
Chris@16
|
162 if (!updates[p].empty()) {
|
Chris@16
|
163 send(process_group, p, msg_num_updates, updates[p].size());
|
Chris@16
|
164 send(process_group, p, msg_updates,
|
Chris@16
|
165 &updates[p].front(), updates[p].size());
|
Chris@16
|
166 updates[p].clear();
|
Chris@16
|
167 }
|
Chris@16
|
168 }
|
Chris@16
|
169
|
Chris@16
|
170 do_synchronize(process_group);
|
Chris@16
|
171 }
|
Chris@16
|
172
|
Chris@16
|
173 ProcessGroup process_group;
|
Chris@16
|
174
|
Chris@16
|
175 private:
|
Chris@16
|
176 std::vector<Updates> updates;
|
Chris@16
|
177 OwnerMap owner;
|
Chris@16
|
178 };
|
Chris@16
|
179
|
Chris@16
|
180 /**********************************************************************
|
Chris@16
|
181 * Remote updating set that sends messages immediately *
|
Chris@16
|
182 **********************************************************************/
|
Chris@16
|
183 template<typename Derived, typename ProcessGroup, typename Value,
|
Chris@16
|
184 typename OwnerMap>
|
Chris@16
|
185 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
|
Chris@16
|
186 remote_set_immediate>
|
Chris@16
|
187 {
|
Chris@16
|
188 typedef typename property_traits<OwnerMap>::key_type Key;
|
Chris@16
|
189 typedef std::pair<Key, Value> update_pair_type;
|
Chris@16
|
190 typedef typename std::vector<update_pair_type>::size_type updates_size_type;
|
Chris@16
|
191
|
Chris@16
|
192 public:
|
Chris@16
|
193 typedef typename ProcessGroup::process_id_type process_id_type;
|
Chris@16
|
194
|
Chris@16
|
195 private:
|
Chris@16
|
196 enum message_kind {
|
Chris@16
|
197 /** Contains a (key, value) pair that will be updated. */
|
Chris@16
|
198 msg_update
|
Chris@16
|
199 };
|
Chris@16
|
200
|
Chris@16
|
201 struct handle_messages
|
Chris@16
|
202 {
|
Chris@16
|
203 explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
|
Chris@16
|
204 : self(self)
|
Chris@16
|
205 { update_sizes.resize(num_processes(pg), 0); }
|
Chris@16
|
206
|
Chris@16
|
207 void operator()(process_id_type source, int tag)
|
Chris@16
|
208 {
|
Chris@16
|
209 // Receive the # of updates
|
Chris@16
|
210 BOOST_ASSERT(tag == msg_update);
|
Chris@16
|
211 update_pair_type update;
|
Chris@16
|
212 receive(self->process_group, source, tag, update);
|
Chris@16
|
213
|
Chris@16
|
214 // Send update to derived "receive_update" member
|
Chris@16
|
215 Derived* derived = static_cast<Derived*>(self);
|
Chris@16
|
216 derived->receive_update(source, update.first, update.second);
|
Chris@16
|
217 }
|
Chris@16
|
218
|
Chris@16
|
219 private:
|
Chris@16
|
220 std::vector<updates_size_type> update_sizes;
|
Chris@16
|
221 remote_update_set* self;
|
Chris@16
|
222 };
|
Chris@16
|
223 friend struct handle_messages;
|
Chris@16
|
224
|
Chris@16
|
225 protected:
|
Chris@16
|
226 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
|
Chris@16
|
227 : process_group(pg, handle_messages(this, pg)), owner(owner) { }
|
Chris@16
|
228
|
Chris@16
|
229 void update(const Key& key, const Value& value)
|
Chris@16
|
230 {
|
Chris@16
|
231 if (get(owner, key) == process_id(process_group)) {
|
Chris@16
|
232 Derived* derived = static_cast<Derived*>(this);
|
Chris@16
|
233 derived->receive_update(get(owner, key), key, value);
|
Chris@16
|
234 }
|
Chris@16
|
235 else
|
Chris@16
|
236 send(process_group, get(owner, key), msg_update,
|
Chris@16
|
237 update_pair_type(key, value));
|
Chris@16
|
238 }
|
Chris@16
|
239
|
Chris@16
|
240 void collect()
|
Chris@16
|
241 {
|
Chris@16
|
242 typedef std::pair<process_id_type, int> probe_type;
|
Chris@16
|
243 handle_messages handler(this, process_group);
|
Chris@16
|
244 while (optional<probe_type> stp = probe(process_group))
|
Chris@16
|
245 if (stp->second == msg_update) handler(stp->first, stp->second);
|
Chris@16
|
246 }
|
Chris@16
|
247
|
Chris@16
|
248 void synchronize()
|
Chris@16
|
249 {
|
Chris@16
|
250 do_synchronize(process_group);
|
Chris@16
|
251 }
|
Chris@16
|
252
|
Chris@16
|
253 ProcessGroup process_group;
|
Chris@16
|
254 OwnerMap owner;
|
Chris@16
|
255 };
|
Chris@16
|
256
|
Chris@16
|
257 } } } // end namespace boost::graph::detail
|
Chris@16
|
258
|
Chris@16
|
259 #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
|