Chris@16: // Copyright (C) 2005-2006 The Trustees of Indiana University. Chris@16: Chris@16: // Use, modification and distribution is subject to the Boost Software Chris@16: // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: // Authors: Douglas Gregor Chris@16: // Andrew Lumsdaine Chris@16: #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP Chris@16: #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP Chris@16: Chris@16: #ifndef BOOST_GRAPH_USE_MPI Chris@16: #error "Parallel BGL files should not be included unless has been included" Chris@16: #endif Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: namespace boost { namespace graph { namespace detail { Chris@16: Chris@16: template Chris@16: void do_synchronize(ProcessGroup& pg) Chris@16: { Chris@16: using boost::parallel::synchronize; Chris@16: synchronize(pg); Chris@16: } Chris@16: Chris@16: struct remote_set_queued {}; Chris@16: struct remote_set_immediate {}; Chris@16: Chris@16: template Chris@16: class remote_set_semantics Chris@16: { Chris@16: BOOST_STATIC_CONSTANT Chris@16: (bool, Chris@16: queued = (is_convertible< Chris@16: typename ProcessGroup::communication_category, Chris@101: boost::parallel::bsp_process_group_tag>::value)); Chris@16: Chris@16: public: Chris@16: typedef typename mpl::if_c::type type; Chris@16: }; Chris@16: Chris@16: Chris@16: template::type> Chris@16: class remote_update_set; Chris@16: Chris@16: /********************************************************************** Chris@16: * Remote updating set that queues messages until synchronization * Chris@16: **********************************************************************/ Chris@16: template Chris@16: class remote_update_set Chris@16: { Chris@16: typedef typename property_traits::key_type Key; Chris@16: typedef std::vector > Updates; Chris@16: typedef typename Updates::size_type updates_size_type; Chris@16: typedef typename Updates::value_type updates_pair_type; Chris@16: Chris@16: public: Chris@16: Chris@16: private: Chris@16: typedef typename ProcessGroup::process_id_type process_id_type; Chris@16: Chris@16: enum message_kind { Chris@16: /** Message containing the number of updates that will be sent in Chris@16: * a msg_updates message that will immediately follow. This Chris@16: * message will contain a single value of type Chris@16: * updates_size_type. Chris@16: */ Chris@16: msg_num_updates, Chris@16: Chris@16: /** Contains (key, value) pairs with all of the updates from a Chris@16: * particular source. The number of updates is variable, but will Chris@16: * be provided in a msg_num_updates message that immediately Chris@16: * preceeds this message. Chris@16: * Chris@16: */ Chris@16: msg_updates Chris@16: }; Chris@16: Chris@16: struct handle_messages Chris@16: { Chris@16: explicit Chris@16: handle_messages(remote_update_set* self, const ProcessGroup& pg) Chris@16: : self(self), update_sizes(num_processes(pg), 0) { } Chris@16: Chris@16: void operator()(process_id_type source, int tag) Chris@16: { Chris@16: switch(tag) { Chris@16: case msg_num_updates: Chris@16: { Chris@16: // Receive the # of updates Chris@16: updates_size_type num_updates; Chris@16: receive(self->process_group, source, tag, num_updates); Chris@16: Chris@16: update_sizes[source] = num_updates; Chris@16: } Chris@16: break; Chris@16: Chris@16: case msg_updates: Chris@16: { Chris@16: updates_size_type num_updates = update_sizes[source]; Chris@16: BOOST_ASSERT(num_updates); Chris@16: Chris@16: // Receive the actual updates Chris@16: std::vector updates(num_updates); Chris@16: receive(self->process_group, source, msg_updates, &updates[0], Chris@16: num_updates); Chris@16: Chris@16: // Send updates to derived "receive_update" member Chris@16: Derived* derived = static_cast(self); Chris@16: for (updates_size_type u = 0; u < num_updates; ++u) Chris@16: derived->receive_update(source, updates[u].first, updates[u].second); Chris@16: Chris@16: update_sizes[source] = 0; Chris@16: } Chris@16: break; Chris@16: }; Chris@16: } Chris@16: Chris@16: private: Chris@16: remote_update_set* self; Chris@16: std::vector update_sizes; Chris@16: }; Chris@16: friend struct handle_messages; Chris@16: Chris@16: protected: Chris@16: remote_update_set(const ProcessGroup& pg, const OwnerMap& owner) Chris@16: : process_group(pg, handle_messages(this, pg)), Chris@16: updates(num_processes(pg)), owner(owner) { Chris@16: } Chris@16: Chris@16: Chris@16: void update(const Key& key, const Value& value) Chris@16: { Chris@16: if (get(owner, key) == process_id(process_group)) { Chris@16: Derived* derived = static_cast(this); Chris@16: derived->receive_update(get(owner, key), key, value); Chris@16: } Chris@16: else { Chris@16: updates[get(owner, key)].push_back(std::make_pair(key, value)); Chris@16: } Chris@16: } Chris@16: Chris@16: void collect() { } Chris@16: Chris@16: void synchronize() Chris@16: { Chris@16: // Emit all updates and then remove them Chris@16: process_id_type num_processes = updates.size(); Chris@16: for (process_id_type p = 0; p < num_processes; ++p) { Chris@16: if (!updates[p].empty()) { Chris@16: send(process_group, p, msg_num_updates, updates[p].size()); Chris@16: send(process_group, p, msg_updates, Chris@16: &updates[p].front(), updates[p].size()); Chris@16: updates[p].clear(); Chris@16: } Chris@16: } Chris@16: Chris@16: do_synchronize(process_group); Chris@16: } Chris@16: Chris@16: ProcessGroup process_group; Chris@16: Chris@16: private: Chris@16: std::vector updates; Chris@16: OwnerMap owner; Chris@16: }; Chris@16: Chris@16: /********************************************************************** Chris@16: * Remote updating set that sends messages immediately * Chris@16: **********************************************************************/ Chris@16: template Chris@16: class remote_update_set Chris@16: { Chris@16: typedef typename property_traits::key_type Key; Chris@16: typedef std::pair update_pair_type; Chris@16: typedef typename std::vector::size_type updates_size_type; Chris@16: Chris@16: public: Chris@16: typedef typename ProcessGroup::process_id_type process_id_type; Chris@16: Chris@16: private: Chris@16: enum message_kind { Chris@16: /** Contains a (key, value) pair that will be updated. */ Chris@16: msg_update Chris@16: }; Chris@16: Chris@16: struct handle_messages Chris@16: { Chris@16: explicit handle_messages(remote_update_set* self, const ProcessGroup& pg) Chris@16: : self(self) Chris@16: { update_sizes.resize(num_processes(pg), 0); } Chris@16: Chris@16: void operator()(process_id_type source, int tag) Chris@16: { Chris@16: // Receive the # of updates Chris@16: BOOST_ASSERT(tag == msg_update); Chris@16: update_pair_type update; Chris@16: receive(self->process_group, source, tag, update); Chris@16: Chris@16: // Send update to derived "receive_update" member Chris@16: Derived* derived = static_cast(self); Chris@16: derived->receive_update(source, update.first, update.second); Chris@16: } Chris@16: Chris@16: private: Chris@16: std::vector update_sizes; Chris@16: remote_update_set* self; Chris@16: }; Chris@16: friend struct handle_messages; Chris@16: Chris@16: protected: Chris@16: remote_update_set(const ProcessGroup& pg, const OwnerMap& owner) Chris@16: : process_group(pg, handle_messages(this, pg)), owner(owner) { } Chris@16: Chris@16: void update(const Key& key, const Value& value) Chris@16: { Chris@16: if (get(owner, key) == process_id(process_group)) { Chris@16: Derived* derived = static_cast(this); Chris@16: derived->receive_update(get(owner, key), key, value); Chris@16: } Chris@16: else Chris@16: send(process_group, get(owner, key), msg_update, Chris@16: update_pair_type(key, value)); Chris@16: } Chris@16: Chris@16: void collect() Chris@16: { Chris@16: typedef std::pair probe_type; Chris@16: handle_messages handler(this, process_group); Chris@16: while (optional stp = probe(process_group)) Chris@16: if (stp->second == msg_update) handler(stp->first, stp->second); Chris@16: } Chris@16: Chris@16: void synchronize() Chris@16: { Chris@16: do_synchronize(process_group); Chris@16: } Chris@16: Chris@16: ProcessGroup process_group; Chris@16: OwnerMap owner; Chris@16: }; Chris@16: Chris@16: } } } // end namespace boost::graph::detail Chris@16: Chris@16: #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP