annotate DEPENDENCIES/generic/include/boost/graph/distributed/detail/remote_update_set.hpp @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents c530137014c0
children
rev   line source
Chris@16 1 // Copyright (C) 2005-2006 The Trustees of Indiana University.
Chris@16 2
Chris@16 3 // Use, modification and distribution is subject to the Boost Software
Chris@16 4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 5 // http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6
Chris@16 7 // Authors: Douglas Gregor
Chris@16 8 // Andrew Lumsdaine
Chris@16 9 #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
Chris@16 10 #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
Chris@16 11
Chris@16 12 #ifndef BOOST_GRAPH_USE_MPI
Chris@16 13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
Chris@16 14 #endif
Chris@16 15
Chris@16 16 #include <boost/graph/parallel/process_group.hpp>
Chris@16 17 #include <boost/type_traits/is_convertible.hpp>
Chris@16 18 #include <vector>
Chris@16 19 #include <boost/assert.hpp>
Chris@16 20 #include <boost/optional.hpp>
Chris@16 21 #include <queue>
Chris@16 22
Chris@16 23 namespace boost { namespace graph { namespace detail {
Chris@16 24
Chris@16 25 template<typename ProcessGroup>
Chris@16 26 void do_synchronize(ProcessGroup& pg)
Chris@16 27 {
Chris@16 28 using boost::parallel::synchronize;
Chris@16 29 synchronize(pg);
Chris@16 30 }
Chris@16 31
Chris@16 32 struct remote_set_queued {};
Chris@16 33 struct remote_set_immediate {};
Chris@16 34
Chris@16 35 template<typename ProcessGroup>
Chris@16 36 class remote_set_semantics
Chris@16 37 {
Chris@16 38 BOOST_STATIC_CONSTANT
Chris@16 39 (bool,
Chris@16 40 queued = (is_convertible<
Chris@16 41 typename ProcessGroup::communication_category,
Chris@101 42 boost::parallel::bsp_process_group_tag>::value));
Chris@16 43
Chris@16 44 public:
Chris@16 45 typedef typename mpl::if_c<queued,
Chris@16 46 remote_set_queued,
Chris@16 47 remote_set_immediate>::type type;
Chris@16 48 };
Chris@16 49
Chris@16 50
Chris@16 51 template<typename Derived, typename ProcessGroup, typename Value,
Chris@16 52 typename OwnerMap,
Chris@16 53 typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
Chris@16 54 class remote_update_set;
Chris@16 55
Chris@16 56 /**********************************************************************
Chris@16 57 * Remote updating set that queues messages until synchronization *
Chris@16 58 **********************************************************************/
Chris@16 59 template<typename Derived, typename ProcessGroup, typename Value,
Chris@16 60 typename OwnerMap>
Chris@16 61 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
Chris@16 62 remote_set_queued>
Chris@16 63 {
Chris@16 64 typedef typename property_traits<OwnerMap>::key_type Key;
Chris@16 65 typedef std::vector<std::pair<Key, Value> > Updates;
Chris@16 66 typedef typename Updates::size_type updates_size_type;
Chris@16 67 typedef typename Updates::value_type updates_pair_type;
Chris@16 68
Chris@16 69 public:
Chris@16 70
Chris@16 71 private:
Chris@16 72 typedef typename ProcessGroup::process_id_type process_id_type;
Chris@16 73
Chris@16 74 enum message_kind {
Chris@16 75 /** Message containing the number of updates that will be sent in
Chris@16 76 * a msg_updates message that will immediately follow. This
Chris@16 77 * message will contain a single value of type
Chris@16 78 * updates_size_type.
Chris@16 79 */
Chris@16 80 msg_num_updates,
Chris@16 81
Chris@16 82 /** Contains (key, value) pairs with all of the updates from a
Chris@16 83 * particular source. The number of updates is variable, but will
Chris@16 84 * be provided in a msg_num_updates message that immediately
Chris@16 85 * preceeds this message.
Chris@16 86 *
Chris@16 87 */
Chris@16 88 msg_updates
Chris@16 89 };
Chris@16 90
Chris@16 91 struct handle_messages
Chris@16 92 {
Chris@16 93 explicit
Chris@16 94 handle_messages(remote_update_set* self, const ProcessGroup& pg)
Chris@16 95 : self(self), update_sizes(num_processes(pg), 0) { }
Chris@16 96
Chris@16 97 void operator()(process_id_type source, int tag)
Chris@16 98 {
Chris@16 99 switch(tag) {
Chris@16 100 case msg_num_updates:
Chris@16 101 {
Chris@16 102 // Receive the # of updates
Chris@16 103 updates_size_type num_updates;
Chris@16 104 receive(self->process_group, source, tag, num_updates);
Chris@16 105
Chris@16 106 update_sizes[source] = num_updates;
Chris@16 107 }
Chris@16 108 break;
Chris@16 109
Chris@16 110 case msg_updates:
Chris@16 111 {
Chris@16 112 updates_size_type num_updates = update_sizes[source];
Chris@16 113 BOOST_ASSERT(num_updates);
Chris@16 114
Chris@16 115 // Receive the actual updates
Chris@16 116 std::vector<updates_pair_type> updates(num_updates);
Chris@16 117 receive(self->process_group, source, msg_updates, &updates[0],
Chris@16 118 num_updates);
Chris@16 119
Chris@16 120 // Send updates to derived "receive_update" member
Chris@16 121 Derived* derived = static_cast<Derived*>(self);
Chris@16 122 for (updates_size_type u = 0; u < num_updates; ++u)
Chris@16 123 derived->receive_update(source, updates[u].first, updates[u].second);
Chris@16 124
Chris@16 125 update_sizes[source] = 0;
Chris@16 126 }
Chris@16 127 break;
Chris@16 128 };
Chris@16 129 }
Chris@16 130
Chris@16 131 private:
Chris@16 132 remote_update_set* self;
Chris@16 133 std::vector<updates_size_type> update_sizes;
Chris@16 134 };
Chris@16 135 friend struct handle_messages;
Chris@16 136
Chris@16 137 protected:
Chris@16 138 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
Chris@16 139 : process_group(pg, handle_messages(this, pg)),
Chris@16 140 updates(num_processes(pg)), owner(owner) {
Chris@16 141 }
Chris@16 142
Chris@16 143
Chris@16 144 void update(const Key& key, const Value& value)
Chris@16 145 {
Chris@16 146 if (get(owner, key) == process_id(process_group)) {
Chris@16 147 Derived* derived = static_cast<Derived*>(this);
Chris@16 148 derived->receive_update(get(owner, key), key, value);
Chris@16 149 }
Chris@16 150 else {
Chris@16 151 updates[get(owner, key)].push_back(std::make_pair(key, value));
Chris@16 152 }
Chris@16 153 }
Chris@16 154
Chris@16 155 void collect() { }
Chris@16 156
Chris@16 157 void synchronize()
Chris@16 158 {
Chris@16 159 // Emit all updates and then remove them
Chris@16 160 process_id_type num_processes = updates.size();
Chris@16 161 for (process_id_type p = 0; p < num_processes; ++p) {
Chris@16 162 if (!updates[p].empty()) {
Chris@16 163 send(process_group, p, msg_num_updates, updates[p].size());
Chris@16 164 send(process_group, p, msg_updates,
Chris@16 165 &updates[p].front(), updates[p].size());
Chris@16 166 updates[p].clear();
Chris@16 167 }
Chris@16 168 }
Chris@16 169
Chris@16 170 do_synchronize(process_group);
Chris@16 171 }
Chris@16 172
Chris@16 173 ProcessGroup process_group;
Chris@16 174
Chris@16 175 private:
Chris@16 176 std::vector<Updates> updates;
Chris@16 177 OwnerMap owner;
Chris@16 178 };
Chris@16 179
Chris@16 180 /**********************************************************************
Chris@16 181 * Remote updating set that sends messages immediately *
Chris@16 182 **********************************************************************/
Chris@16 183 template<typename Derived, typename ProcessGroup, typename Value,
Chris@16 184 typename OwnerMap>
Chris@16 185 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
Chris@16 186 remote_set_immediate>
Chris@16 187 {
Chris@16 188 typedef typename property_traits<OwnerMap>::key_type Key;
Chris@16 189 typedef std::pair<Key, Value> update_pair_type;
Chris@16 190 typedef typename std::vector<update_pair_type>::size_type updates_size_type;
Chris@16 191
Chris@16 192 public:
Chris@16 193 typedef typename ProcessGroup::process_id_type process_id_type;
Chris@16 194
Chris@16 195 private:
Chris@16 196 enum message_kind {
Chris@16 197 /** Contains a (key, value) pair that will be updated. */
Chris@16 198 msg_update
Chris@16 199 };
Chris@16 200
Chris@16 201 struct handle_messages
Chris@16 202 {
Chris@16 203 explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
Chris@16 204 : self(self)
Chris@16 205 { update_sizes.resize(num_processes(pg), 0); }
Chris@16 206
Chris@16 207 void operator()(process_id_type source, int tag)
Chris@16 208 {
Chris@16 209 // Receive the # of updates
Chris@16 210 BOOST_ASSERT(tag == msg_update);
Chris@16 211 update_pair_type update;
Chris@16 212 receive(self->process_group, source, tag, update);
Chris@16 213
Chris@16 214 // Send update to derived "receive_update" member
Chris@16 215 Derived* derived = static_cast<Derived*>(self);
Chris@16 216 derived->receive_update(source, update.first, update.second);
Chris@16 217 }
Chris@16 218
Chris@16 219 private:
Chris@16 220 std::vector<updates_size_type> update_sizes;
Chris@16 221 remote_update_set* self;
Chris@16 222 };
Chris@16 223 friend struct handle_messages;
Chris@16 224
Chris@16 225 protected:
Chris@16 226 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
Chris@16 227 : process_group(pg, handle_messages(this, pg)), owner(owner) { }
Chris@16 228
Chris@16 229 void update(const Key& key, const Value& value)
Chris@16 230 {
Chris@16 231 if (get(owner, key) == process_id(process_group)) {
Chris@16 232 Derived* derived = static_cast<Derived*>(this);
Chris@16 233 derived->receive_update(get(owner, key), key, value);
Chris@16 234 }
Chris@16 235 else
Chris@16 236 send(process_group, get(owner, key), msg_update,
Chris@16 237 update_pair_type(key, value));
Chris@16 238 }
Chris@16 239
Chris@16 240 void collect()
Chris@16 241 {
Chris@16 242 typedef std::pair<process_id_type, int> probe_type;
Chris@16 243 handle_messages handler(this, process_group);
Chris@16 244 while (optional<probe_type> stp = probe(process_group))
Chris@16 245 if (stp->second == msg_update) handler(stp->first, stp->second);
Chris@16 246 }
Chris@16 247
Chris@16 248 void synchronize()
Chris@16 249 {
Chris@16 250 do_synchronize(process_group);
Chris@16 251 }
Chris@16 252
Chris@16 253 ProcessGroup process_group;
Chris@16 254 OwnerMap owner;
Chris@16 255 };
Chris@16 256
Chris@16 257 } } } // end namespace boost::graph::detail
Chris@16 258
Chris@16 259 #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP