Chris@16: // Copyright (C) 2005-2006 The Trustees of Indiana University. Chris@16: Chris@16: // Use, modification and distribution is subject to the Boost Software Chris@16: // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: // Authors: Douglas Gregor Chris@16: // Andrew Lumsdaine Chris@16: Chris@16: // Chris@16: // Implements redistribution of vertices for a distributed adjacency Chris@16: // list. This file should not be included by users. It will be Chris@16: // included by the distributed adjacency list header. Chris@16: // Chris@16: Chris@16: #ifndef BOOST_GRAPH_USE_MPI Chris@16: #error "Parallel BGL files should not be included unless has been included" Chris@16: #endif Chris@16: Chris@16: #include Chris@16: Chris@16: namespace boost { namespace detail { namespace parallel { Chris@16: Chris@16: /* This structure contains a (vertex or edge) descriptor that is being Chris@16: moved from one processor to another. It contains the properties for Chris@16: that descriptor (if any). Chris@16: */ Chris@16: template Chris@16: struct redistributed_descriptor : maybe_store_property Chris@16: { Chris@16: typedef maybe_store_property inherited; Chris@16: Chris@16: redistributed_descriptor() { } Chris@16: Chris@16: redistributed_descriptor(const Descriptor& v, const DescriptorProperty& p) Chris@16: : inherited(p), descriptor(v) { } Chris@16: Chris@16: Descriptor descriptor; Chris@16: Chris@16: private: Chris@16: friend class boost::serialization::access; Chris@16: Chris@16: template Chris@16: void serialize(Archiver& ar, unsigned int /*version*/) Chris@16: { Chris@16: ar & boost::serialization::base_object(*this) Chris@16: & unsafe_serialize(descriptor); Chris@16: } Chris@16: }; Chris@16: Chris@16: /* Predicate that returns true if the target has migrated. */ Chris@16: template Chris@16: struct target_migrated_t Chris@16: { Chris@16: typedef typename graph_traits::vertex_descriptor Vertex; Chris@16: typedef typename graph_traits::edge_descriptor Edge; Chris@16: Chris@16: target_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g) Chris@16: : vertex_to_processor(vertex_to_processor), g(g) { } Chris@16: Chris@16: bool operator()(Edge e) const Chris@16: { Chris@16: typedef global_descriptor DVertex; Chris@16: processor_id_type owner = get(edge_target_processor_id, g, e); Chris@16: return get(vertex_to_processor, DVertex(owner, target(e, g))) != owner; Chris@16: } Chris@16: Chris@16: private: Chris@16: VertexProcessorMap vertex_to_processor; Chris@16: const Graph& g; Chris@16: }; Chris@16: Chris@16: template Chris@16: inline target_migrated_t Chris@16: target_migrated(VertexProcessorMap vertex_to_processor, const Graph& g) Chris@16: { return target_migrated_t(vertex_to_processor, g); } Chris@16: Chris@16: /* Predicate that returns true if the source of an in-edge has migrated. */ Chris@16: template Chris@16: struct source_migrated_t Chris@16: { Chris@16: typedef typename graph_traits::vertex_descriptor Vertex; Chris@16: typedef typename graph_traits::edge_descriptor Edge; Chris@16: Chris@16: source_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g) Chris@16: : vertex_to_processor(vertex_to_processor), g(g) { } Chris@16: Chris@16: bool operator()(stored_in_edge e) const Chris@16: { Chris@16: return get(vertex_to_processor, DVertex(e.source_processor, source(e.e, g))) Chris@16: != e.source_processor; Chris@16: } Chris@16: Chris@16: private: Chris@16: VertexProcessorMap vertex_to_processor; Chris@16: const Graph& g; Chris@16: }; Chris@16: Chris@16: template Chris@16: inline source_migrated_t Chris@16: source_migrated(VertexProcessorMap vertex_to_processor, const Graph& g) Chris@16: { return source_migrated_t(vertex_to_processor, g); } Chris@16: Chris@16: /* Predicate that returns true if the target has migrated. */ Chris@16: template Chris@16: struct source_or_target_migrated_t Chris@16: { Chris@16: typedef typename graph_traits::edge_descriptor Edge; Chris@16: Chris@16: source_or_target_migrated_t(VertexProcessorMap vertex_to_processor, Chris@16: const Graph& g) Chris@16: : vertex_to_processor(vertex_to_processor), g(g) { } Chris@16: Chris@16: bool operator()(Edge e) const Chris@16: { Chris@16: return get(vertex_to_processor, source(e, g)) != source(e, g).owner Chris@16: || get(vertex_to_processor, target(e, g)) != target(e, g).owner; Chris@16: } Chris@16: Chris@16: private: Chris@16: VertexProcessorMap vertex_to_processor; Chris@16: const Graph& g; Chris@16: }; Chris@16: Chris@16: template Chris@16: inline source_or_target_migrated_t Chris@16: source_or_target_migrated(VertexProcessorMap vertex_to_processor, Chris@16: const Graph& g) Chris@16: { Chris@16: typedef source_or_target_migrated_t result_type; Chris@16: return result_type(vertex_to_processor, g); Chris@16: } Chris@16: Chris@16: } } // end of namespace detail::parallel Chris@16: Chris@16: template Chris@16: template Chris@16: void Chris@16: PBGL_DISTRIB_ADJLIST_TYPE Chris@16: ::request_in_neighbors(vertex_descriptor v, Chris@16: VertexProcessorMap vertex_to_processor, Chris@16: bidirectionalS) Chris@16: { Chris@16: BGL_FORALL_INEDGES_T(v, e, *this, graph_type) Chris@16: request(vertex_to_processor, source(e, *this)); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void Chris@16: PBGL_DISTRIB_ADJLIST_TYPE Chris@16: ::remove_migrated_in_edges(vertex_descriptor v, Chris@16: VertexProcessorMap vertex_to_processor, Chris@16: bidirectionalS) Chris@16: { Chris@16: graph_detail::erase_if(get(vertex_in_edges, base())[v.local], Chris@16: source_migrated(vertex_to_processor, base())); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void Chris@16: PBGL_DISTRIB_ADJLIST_TYPE Chris@16: ::redistribute(VertexProcessorMap vertex_to_processor) Chris@16: { Chris@16: using boost::parallel::inplace_all_to_all; Chris@16: Chris@16: // When we have stable descriptors, we only move those descriptors Chris@16: // that actually need to be moved. Otherwise, we essentially have to Chris@16: // regenerate the entire graph. Chris@16: const bool has_stable_descriptors = Chris@16: is_same::value Chris@16: || is_same::value Chris@16: || is_same::value; Chris@16: Chris@16: typedef detail::parallel::redistributed_descriptor Chris@16: redistributed_vertex; Chris@16: typedef detail::parallel::redistributed_descriptor Chris@16: redistributed_edge; Chris@16: Chris@16: vertex_iterator vi, vi_end; Chris@16: edge_iterator ei, ei_end; Chris@16: Chris@16: process_group_type pg = process_group(); Chris@16: Chris@16: // Initial synchronization makes sure that we have all of our ducks Chris@16: // in a row. We don't want any outstanding add/remove messages Chris@16: // coming in mid-redistribution! Chris@16: synchronize(process_group_); Chris@16: Chris@16: // We cannot cope with eviction of ghost cells Chris@16: vertex_to_processor.set_max_ghost_cells(0); Chris@16: Chris@16: process_id_type p = num_processes(pg); Chris@16: Chris@16: // Send vertices and edges to the processor where they will Chris@16: // actually reside. This requires O(|V| + |E|) communication Chris@16: std::vector > redistributed_vertices(p); Chris@16: std::vector > redistributed_edges(p); Chris@16: Chris@16: // Build the sets of relocated vertices for each process and then do Chris@16: // an all-to-all transfer. Chris@16: for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; ++vi) { Chris@16: if (!has_stable_descriptors Chris@16: || get(vertex_to_processor, *vi) != vi->owner) { Chris@16: redistributed_vertices[get(vertex_to_processor, *vi)] Chris@16: .push_back(redistributed_vertex(*vi, get(vertex_all_t(), base(), Chris@16: vi->local))); Chris@16: } Chris@16: Chris@16: // When our descriptors are stable, we need to determine which Chris@16: // adjacent descriptors are stable to determine which edges will Chris@16: // be removed. Chris@16: if (has_stable_descriptors) { Chris@16: BGL_FORALL_OUTEDGES_T(*vi, e, *this, graph_type) Chris@16: request(vertex_to_processor, target(e, *this)); Chris@16: request_in_neighbors(*vi, vertex_to_processor, directed_selector()); Chris@16: } Chris@16: } Chris@16: Chris@16: inplace_all_to_all(pg, redistributed_vertices); Chris@16: Chris@16: // If we have stable descriptors, we need to know where our neighbor Chris@16: // vertices are moving. Chris@16: if (has_stable_descriptors) Chris@16: synchronize(vertex_to_processor); Chris@16: Chris@16: // Build the sets of relocated edges for each process and then do Chris@16: // an all-to-all transfer. Chris@16: for (boost::tie(ei, ei_end) = edges(*this); ei != ei_end; ++ei) { Chris@16: vertex_descriptor src = source(*ei, *this); Chris@16: vertex_descriptor tgt = target(*ei, *this); Chris@16: if (!has_stable_descriptors Chris@16: || get(vertex_to_processor, src) != src.owner Chris@16: || get(vertex_to_processor, tgt) != tgt.owner) Chris@16: redistributed_edges[get(vertex_to_processor, source(*ei, *this))] Chris@16: .push_back(redistributed_edge(*ei, split_edge_property(get(edge_all_t(), base(), Chris@16: ei->local)))); Chris@16: } Chris@16: inplace_all_to_all(pg, redistributed_edges); Chris@16: Chris@16: // A mapping from old vertex descriptors to new vertex Chris@16: // descriptors. This is an STL map partly because I'm too lazy to Chris@16: // build a real property map (which is hard in the general case) but Chris@16: // also because it won't try to look in the graph itself, because Chris@16: // the keys are all vertex descriptors that have been invalidated. Chris@16: std::map old_to_new_vertex_map; Chris@16: Chris@16: if (has_stable_descriptors) { Chris@16: // Clear out all vertices and edges that will have moved. There Chris@16: // are several stages to this. Chris@16: Chris@16: // First, eliminate all outgoing edges from the (local) vertices Chris@16: // that have been moved or whose targets have been moved. Chris@16: BGL_FORALL_VERTICES_T(v, *this, graph_type) { Chris@16: if (get(vertex_to_processor, v) != v.owner) { Chris@16: clear_out_edges(v.local, base()); Chris@16: clear_in_edges_local(v, directed_selector()); Chris@16: } else { Chris@16: remove_out_edge_if(v.local, Chris@16: target_migrated(vertex_to_processor, base()), Chris@16: base()); Chris@16: remove_migrated_in_edges(v, vertex_to_processor, directed_selector()); Chris@16: } Chris@16: } Chris@16: Chris@16: // Next, eliminate locally-stored edges that have migrated (for Chris@16: // undirected graphs). Chris@16: graph_detail::erase_if(local_edges_, Chris@16: source_or_target_migrated(vertex_to_processor, *this)); Chris@16: Chris@16: // Eliminate vertices that have migrated Chris@16: for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; /* in loop */) { Chris@16: if (get(vertex_to_processor, *vi) != vi->owner) Chris@16: remove_vertex((*vi++).local, base()); Chris@16: else { Chris@16: // Add the identity relation for vertices that have not migrated Chris@16: old_to_new_vertex_map[*vi] = *vi; Chris@16: ++vi; Chris@16: } Chris@16: } Chris@16: } else { Chris@16: // Clear out the local graph: the entire graph is in transit Chris@16: clear(); Chris@16: } Chris@16: Chris@16: // Add the new vertices to the graph. When we do so, update the old Chris@16: // -> new vertex mapping both locally and for the owner of the "old" Chris@16: // vertex. Chris@16: { Chris@16: typedef std::pair mapping_pair; Chris@16: std::vector > mappings(p); Chris@16: Chris@16: for (process_id_type src = 0; src < p; ++src) { Chris@16: for (typename std::vector::iterator vi = Chris@16: redistributed_vertices[src].begin(); Chris@16: vi != redistributed_vertices[src].end(); ++vi) { Chris@16: vertex_descriptor new_vertex = Chris@16: add_vertex(vi->get_property(), *this); Chris@16: old_to_new_vertex_map[vi->descriptor] = new_vertex; Chris@16: mappings[vi->descriptor.owner].push_back(mapping_pair(vi->descriptor, Chris@16: new_vertex)); Chris@16: } Chris@16: Chris@16: redistributed_vertices[src].clear(); Chris@16: } Chris@16: Chris@16: inplace_all_to_all(pg, mappings); Chris@16: Chris@16: // Add the mappings we were sent into the old->new map. Chris@16: for (process_id_type src = 0; src < p; ++src) Chris@16: old_to_new_vertex_map.insert(mappings[src].begin(), mappings[src].end()); Chris@16: } Chris@16: Chris@16: // Get old->new vertex mappings for all of the vertices we need to Chris@16: // know about. Chris@16: Chris@16: // TBD: An optimization here might involve sending the Chris@16: // request-response pairs without an explicit request step (for Chris@16: // bidirectional and undirected graphs). However, it may not matter Chris@16: // all that much given the cost of redistribution. Chris@16: { Chris@16: std::vector > vertex_map_requests(p); Chris@16: std::vector > vertex_map_responses(p); Chris@16: Chris@16: // We need to know about all of the vertices incident on edges Chris@16: // that have been relocated to this processor. Tell each processor Chris@16: // what each other processor needs to know. Chris@16: for (process_id_type src = 0; src < p; ++src) Chris@16: for (typename std::vector::iterator ei = Chris@16: redistributed_edges[src].begin(); Chris@16: ei != redistributed_edges[src].end(); ++ei) { Chris@16: vertex_descriptor need_vertex = target(ei->descriptor, *this); Chris@16: if (old_to_new_vertex_map.find(need_vertex) Chris@16: == old_to_new_vertex_map.end()) Chris@16: { Chris@16: old_to_new_vertex_map[need_vertex] = need_vertex; Chris@16: vertex_map_requests[need_vertex.owner].push_back(need_vertex); Chris@16: } Chris@16: } Chris@16: inplace_all_to_all(pg, Chris@16: vertex_map_requests, Chris@16: vertex_map_responses); Chris@16: Chris@16: // Process the requests made for vertices we own. Then perform yet Chris@16: // another all-to-all swap. This one matches the requests we've Chris@16: // made to the responses we were given. Chris@16: for (process_id_type src = 0; src < p; ++src) Chris@16: for (typename std::vector::iterator vi = Chris@16: vertex_map_responses[src].begin(); Chris@16: vi != vertex_map_responses[src].end(); ++vi) Chris@16: *vi = old_to_new_vertex_map[*vi]; Chris@16: inplace_all_to_all(pg, vertex_map_responses); Chris@16: Chris@16: // Matching the requests to the responses, update the old->new Chris@16: // vertex map for all of the vertices we will need to know. Chris@16: for (process_id_type src = 0; src < p; ++src) { Chris@16: typedef typename std::vector::size_type size_type; Chris@16: for (size_type i = 0; i < vertex_map_requests[src].size(); ++i) { Chris@16: old_to_new_vertex_map[vertex_map_requests[src][i]] = Chris@16: vertex_map_responses[src][i]; Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: // Add edges to the graph by mapping the source and target. Chris@16: for (process_id_type src = 0; src < p; ++src) { Chris@16: for (typename std::vector::iterator ei = Chris@16: redistributed_edges[src].begin(); Chris@16: ei != redistributed_edges[src].end(); ++ei) { Chris@16: add_edge(old_to_new_vertex_map[source(ei->descriptor, *this)], Chris@16: old_to_new_vertex_map[target(ei->descriptor, *this)], Chris@16: ei->get_property(), Chris@16: *this); Chris@16: } Chris@16: Chris@16: redistributed_edges[src].clear(); Chris@16: } Chris@16: Chris@16: // Be sure that edge-addition messages are received now, completing Chris@16: // the graph. Chris@16: synchronize(process_group_); Chris@16: Chris@16: this->distribution().clear(); Chris@16: Chris@16: detail::parallel::maybe_initialize_vertex_indices(vertices(base()), Chris@16: get(vertex_index, base())); Chris@16: } Chris@16: Chris@16: } // end namespace boost