Chris@16: // Copyright (C) 2004-2006 The Trustees of Indiana University. Chris@16: Chris@16: // Use, modification and distribution is subject to the Boost Software Chris@16: // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: // Authors: Brian Barrett Chris@16: // Douglas Gregor Chris@16: // Andrew Lumsdaine Chris@16: #ifndef BOOST_GRAPH_PARALLEL_CC_PS_HPP Chris@16: #define BOOST_GRAPH_PARALLEL_CC_PS_HPP Chris@16: Chris@16: #ifndef BOOST_GRAPH_USE_MPI Chris@16: #error "Parallel BGL files should not be included unless has been included" Chris@16: #endif Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: Chris@16: // Connected components algorithm based on a parallel search. Chris@16: // Chris@16: // Every N nodes starts a parallel search from the first vertex in Chris@16: // their local vertex list during the first superstep (the other nodes Chris@16: // remain idle during the first superstep to reduce the number of Chris@16: // conflicts in numbering the components). At each superstep, all new Chris@16: // component mappings from remote nodes are handled. If there is no Chris@16: // work from remote updates, a new vertex is removed from the local Chris@16: // list and added to the work queue. Chris@16: // Chris@16: // Components are allocated from the component_value_allocator object, Chris@16: // which ensures that a given component number is unique in the Chris@16: // system, currently by using the rank and number of processes to Chris@16: // stride allocations. Chris@16: // Chris@16: // When two components are discovered to actually be the same Chris@16: // component, a mapping is created in the collisions object. The Chris@16: // lower component number is prefered in the resolution, so component Chris@16: // numbering resolution is consistent. After the search has exhausted Chris@16: // all vertices in the graph, the mapping is shared with all Chris@16: // processes, and they independently resolve the comonent mapping (so Chris@16: // O((N * NP) + (V * NP)) work, in O(N + V) time, where N is the Chris@16: // number of mappings and V is the number of local vertices). This Chris@16: // phase can likely be significantly sped up if a clever algorithm for Chris@16: // the reduction can be found. Chris@16: namespace boost { namespace graph { namespace distributed { Chris@16: namespace cc_ps_detail { Chris@16: // Local object for allocating component numbers. There are two Chris@16: // places this happens in the code, and I was getting sick of them Chris@16: // getting out of sync. Components are not tightly packed in Chris@16: // numbering, but are numbered to ensure each rank has its own Chris@16: // independent sets of numberings. Chris@16: template Chris@16: class component_value_allocator { Chris@16: public: Chris@16: component_value_allocator(int num, int size) : Chris@16: last(0), num(num), size(size) Chris@16: { Chris@16: } Chris@16: Chris@16: component_value_type allocate(void) Chris@16: { Chris@16: component_value_type ret = num + (last * size); Chris@16: last++; Chris@16: return ret; Chris@16: } Chris@16: Chris@16: private: Chris@16: component_value_type last; Chris@16: int num; Chris@16: int size; Chris@16: }; Chris@16: Chris@16: Chris@16: // Map of the "collisions" between component names in the global Chris@16: // component mapping. TO make cleanup easier, component numbers Chris@16: // are added, pointing to themselves, when a new component is Chris@16: // found. In order to make the results deterministic, the lower Chris@16: // component number is always taken. The resolver will drill Chris@16: // through the map until it finds a component entry that points to Chris@16: // itself as the next value, allowing some cleanup to happen at Chris@16: // update() time. Attempts are also made to update the mapping Chris@16: // when new entries are created. Chris@16: // Chris@16: // Note that there's an assumption that the entire mapping is Chris@16: // shared during the end of the algorithm, but before component Chris@16: // name resolution. Chris@16: template Chris@16: class collision_map { Chris@16: public: Chris@16: collision_map() : num_unique(0) Chris@16: { Chris@16: } Chris@16: Chris@16: // add new component mapping first time component is used. Own Chris@16: // function only so that we can sanity check there isn't already Chris@16: // a mapping for that component number (which would be bad) Chris@16: void add(const component_value_type &a) Chris@16: { Chris@16: BOOST_ASSERT(collisions.count(a) == 0); Chris@16: collisions[a] = a; Chris@16: } Chris@16: Chris@16: // add a mapping between component values saying they're the Chris@16: // same component Chris@16: void add(const component_value_type &a, const component_value_type &b) Chris@16: { Chris@16: component_value_type high, low, tmp; Chris@16: if (a > b) { Chris@16: high = a; Chris@16: low = b; Chris@16: } else { Chris@16: high = b; Chris@16: low = a; Chris@16: } Chris@16: Chris@16: if (collisions.count(high) != 0 && collisions[high] != low) { Chris@16: tmp = collisions[high]; Chris@16: if (tmp > low) { Chris@16: collisions[tmp] = low; Chris@16: collisions[high] = low; Chris@16: } else { Chris@16: collisions[low] = tmp; Chris@16: collisions[high] = tmp; Chris@16: } Chris@16: } else { Chris@16: collisions[high] = low; Chris@16: } Chris@16: Chris@16: } Chris@16: Chris@16: // get the "real" component number for the given component. Chris@16: // Used to resolve mapping at end of run. Chris@16: component_value_type update(component_value_type a) Chris@16: { Chris@16: BOOST_ASSERT(num_unique > 0); Chris@16: BOOST_ASSERT(collisions.count(a) != 0); Chris@16: return collisions[a]; Chris@16: } Chris@16: Chris@16: // collapse the collisions tree, so that update is a one lookup Chris@16: // operation. Count unique components at the same time. Chris@16: void uniqify(void) Chris@16: { Chris@16: typename std::map::iterator i, end; Chris@16: Chris@16: end = collisions.end(); Chris@16: for (i = collisions.begin() ; i != end ; ++i) { Chris@16: if (i->first == i->second) { Chris@16: num_unique++; Chris@16: } else { Chris@16: i->second = collisions[i->second]; Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: // get the number of component entries that have an associated Chris@16: // component number of themselves, which are the real components Chris@16: // used in the final mapping. This is the number of unique Chris@16: // components in the graph. Chris@16: int unique(void) Chris@16: { Chris@16: BOOST_ASSERT(num_unique > 0); Chris@16: return num_unique; Chris@16: } Chris@16: Chris@16: // "serialize" into a vector for communication. Chris@16: std::vector serialize(void) Chris@16: { Chris@16: std::vector ret; Chris@16: typename std::map::iterator i, end; Chris@16: Chris@16: end = collisions.end(); Chris@16: for (i = collisions.begin() ; i != end ; ++i) { Chris@16: ret.push_back(i->first); Chris@16: ret.push_back(i->second); Chris@16: } Chris@16: Chris@16: return ret; Chris@16: } Chris@16: Chris@16: private: Chris@16: std::map collisions; Chris@16: int num_unique; Chris@16: }; Chris@16: Chris@16: Chris@16: // resolver to handle remote updates. The resolver will add Chris@16: // entries into the collisions map if required, and if it is the Chris@16: // first time the vertex has been touched, it will add the vertex Chris@16: // to the remote queue. Note that local updates are handled Chris@16: // differently, in the main loop (below). Chris@16: Chris@16: // BWB - FIX ME - don't need graph anymore - can pull from key value of Component Map. Chris@16: template Chris@16: struct update_reducer { Chris@16: BOOST_STATIC_CONSTANT(bool, non_default_resolver = false); Chris@16: Chris@16: typedef typename property_traits::value_type component_value_type; Chris@16: typedef typename property_traits::key_type vertex_descriptor; Chris@16: Chris@16: update_reducer(work_queue *q, Chris@16: cc_ps_detail::collision_map *collisions, Chris@16: processor_id_type pg_id) : Chris@16: q(q), collisions(collisions), pg_id(pg_id) Chris@16: { Chris@16: } Chris@16: Chris@16: // ghost cell initialization routine. This should never be Chris@16: // called in this imlementation. Chris@16: template Chris@16: component_value_type operator()(const K&) const Chris@16: { Chris@16: return component_value_type(0); Chris@16: } Chris@16: Chris@16: // resolver for remote updates. I'm not entirely sure why, but Chris@16: // I decided to not change the value of the vertex if it's Chris@16: // already non-infinite. It doesn't matter in the end, as we'll Chris@16: // touch every vertex in the cleanup phase anyway. If the Chris@16: // component is currently infinite, set to the new component Chris@16: // number and add the vertex to the work queue. If it's not Chris@16: // infinite, we've touched it already so don't add it to the Chris@16: // work queue. Do add a collision entry so that we know the two Chris@16: // components are the same. Chris@16: component_value_type operator()(const vertex_descriptor &v, Chris@16: const component_value_type& current, Chris@16: const component_value_type& update) const Chris@16: { Chris@16: const component_value_type max = (std::numeric_limits::max)(); Chris@16: component_value_type ret = current; Chris@16: Chris@16: if (max == current) { Chris@16: q->push(v); Chris@16: ret = update; Chris@16: } else if (current != update) { Chris@16: collisions->add(current, update); Chris@16: } Chris@16: Chris@16: return ret; Chris@16: } Chris@16: Chris@16: // So for whatever reason, the property map can in theory call Chris@16: // the resolver with a local descriptor in addition to the Chris@16: // standard global descriptor. As far as I can tell, this code Chris@16: // path is never taken in this implementation, but I need to Chris@16: // have this code here to make it compile. We just make a Chris@16: // global descriptor and call the "real" operator(). Chris@16: template Chris@16: component_value_type operator()(const K& v, Chris@16: const component_value_type& current, Chris@16: const component_value_type& update) const Chris@16: { Chris@16: return (*this)(vertex_descriptor(pg_id, v), current, update); Chris@16: } Chris@16: Chris@16: private: Chris@16: work_queue *q; Chris@16: collision_map *collisions; Chris@16: boost::processor_id_type pg_id; Chris@16: }; Chris@16: Chris@16: } // namespace cc_ps_detail Chris@16: Chris@16: Chris@16: template Chris@16: typename property_traits::value_type Chris@16: connected_components_ps(const Graph& g, ComponentMap c) Chris@16: { Chris@16: using boost::graph::parallel::process_group; Chris@16: Chris@16: typedef typename property_traits::value_type component_value_type; Chris@16: typedef typename graph_traits::vertex_iterator vertex_iterator; Chris@16: typedef typename graph_traits::vertex_descriptor vertex_descriptor; Chris@16: typedef typename boost::graph::parallel::process_group_type Chris@16: ::type process_group_type; Chris@16: typedef typename process_group_type::process_id_type process_id_type; Chris@16: typedef std::queue work_queue; Chris@16: Chris@16: static const component_value_type max_component = Chris@16: (std::numeric_limits::max)(); Chris@16: typename property_map::const_type Chris@16: owner = get(vertex_owner, g); Chris@16: Chris@16: // standard who am i? stuff Chris@16: process_group_type pg = process_group(g); Chris@16: process_id_type id = process_id(pg); Chris@16: Chris@16: // Initialize every vertex to have infinite component number Chris@16: BGL_FORALL_VERTICES_T(v, g, Graph) put(c, v, max_component); Chris@16: Chris@16: vertex_iterator current, end; Chris@16: boost::tie(current, end) = vertices(g); Chris@16: Chris@16: cc_ps_detail::component_value_allocator cva(process_id(pg), num_processes(pg)); Chris@16: cc_ps_detail::collision_map collisions; Chris@16: work_queue q; // this is intentionally a local data structure Chris@16: c.set_reduce(cc_ps_detail::update_reducer(&q, &collisions, id)); Chris@16: Chris@16: // add starting work Chris@16: while (true) { Chris@16: bool useful_found = false; Chris@16: component_value_type val = cva.allocate(); Chris@16: put(c, *current, val); Chris@16: collisions.add(val); Chris@16: q.push(*current); Chris@16: if (0 != out_degree(*current, g)) useful_found = true; Chris@16: ++current; Chris@16: if (useful_found) break; Chris@16: } Chris@16: Chris@16: // Run the loop until everyone in the system is done Chris@16: bool global_done = false; Chris@16: while (!global_done) { Chris@16: Chris@16: // drain queue of work for this superstep Chris@16: while (!q.empty()) { Chris@16: vertex_descriptor v = q.front(); Chris@16: q.pop(); Chris@16: // iterate through outedges of the vertex currently being Chris@16: // examined, setting their component to our component. There Chris@16: // is no way to end up in the queue without having a component Chris@16: // number already. Chris@16: Chris@16: BGL_FORALL_ADJ_T(v, peer, g, Graph) { Chris@16: component_value_type my_component = get(c, v); Chris@16: Chris@16: // update other vertex with our component information. Chris@16: // Resolver will handle remote collisions as well as whether Chris@16: // to put the vertex on the work queue or not. We have to Chris@16: // handle local collisions and work queue management Chris@16: if (id == get(owner, peer)) { Chris@16: if (max_component == get(c, peer)) { Chris@16: put(c, peer, my_component); Chris@16: q.push(peer); Chris@16: } else if (my_component != get(c, peer)) { Chris@16: collisions.add(my_component, get(c, peer)); Chris@16: } Chris@16: } else { Chris@16: put(c, peer, my_component); Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: // synchronize / start a new superstep. Chris@16: synchronize(pg); Chris@16: global_done = all_reduce(pg, (q.empty() && (current == end)), boost::parallel::minimum()); Chris@16: Chris@16: // If the queue is currently empty, add something to do to start Chris@16: // the current superstep (supersteps start at the sync, not at Chris@16: // the top of the while loop as one might expect). Down at the Chris@16: // bottom of the while loop so that not everyone starts the Chris@16: // algorithm with something to do, to try to reduce component Chris@16: // name conflicts Chris@16: if (q.empty()) { Chris@16: bool useful_found = false; Chris@16: for ( ; current != end && !useful_found ; ++current) { Chris@16: if (max_component == get(c, *current)) { Chris@16: component_value_type val = cva.allocate(); Chris@16: put(c, *current, val); Chris@16: collisions.add(val); Chris@16: q.push(*current); Chris@16: if (0 != out_degree(*current, g)) useful_found = true; Chris@16: } Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: // share component mappings Chris@16: std::vector global; Chris@16: std::vector mine = collisions.serialize(); Chris@16: all_gather(pg, mine.begin(), mine.end(), global); Chris@16: for (size_t i = 0 ; i < global.size() ; i += 2) { Chris@16: collisions.add(global[i], global[i + 1]); Chris@16: } Chris@16: collisions.uniqify(); Chris@16: Chris@16: // update the component mappings Chris@16: BGL_FORALL_VERTICES_T(v, g, Graph) { Chris@16: put(c, v, collisions.update(get(c, v))); Chris@16: } Chris@16: Chris@16: return collisions.unique(); Chris@16: } Chris@16: Chris@16: } // end namespace distributed Chris@16: Chris@16: } // end namespace graph Chris@16: Chris@16: } // end namespace boost Chris@16: Chris@16: #endif // BOOST_GRAPH_PARALLEL_CC_HPP