Chris@16: // Copyright (C) 2007 Douglas Gregor Chris@16: Chris@16: // Use, modification and distribution is subject to the Boost Software Chris@16: // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: // This file contains code for the distributed adjacency list's Chris@16: // initializations. It should not be included directly by users. Chris@16: Chris@16: #ifndef BOOST_GRAPH_DISTRIBUTED_ADJLIST_INITIALIZE_HPP Chris@16: #define BOOST_GRAPH_DISTRIBUTED_ADJLIST_INITIALIZE_HPP Chris@16: Chris@16: #ifndef BOOST_GRAPH_USE_MPI Chris@16: #error "Parallel BGL files should not be included unless has been included" Chris@16: #endif Chris@16: Chris@16: namespace boost { Chris@16: Chris@16: template Chris@16: template Chris@16: void Chris@16: PBGL_DISTRIB_ADJLIST_TYPE:: Chris@16: initialize(EdgeIterator first, EdgeIterator last, Chris@16: vertices_size_type, const base_distribution_type& distribution, Chris@16: vecS) Chris@16: { Chris@16: process_id_type id = process_id(process_group_); Chris@16: while (first != last) { Chris@16: if ((process_id_type)distribution(first->first) == id) { Chris@16: vertex_descriptor source(id, distribution.local(first->first)); Chris@16: vertex_descriptor target(distribution(first->second), Chris@16: distribution.local(first->second)); Chris@16: add_edge(source, target, *this); Chris@16: } Chris@16: ++first; Chris@16: } Chris@16: Chris@16: synchronize(process_group_); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void Chris@16: PBGL_DISTRIB_ADJLIST_TYPE:: Chris@16: initialize(EdgeIterator first, EdgeIterator last, Chris@16: EdgePropertyIterator ep_iter, Chris@16: vertices_size_type, const base_distribution_type& distribution, Chris@16: vecS) Chris@16: { Chris@16: process_id_type id = process_id(process_group_); Chris@16: while (first != last) { Chris@16: if (static_cast(distribution(first->first)) == id) { Chris@16: vertex_descriptor source(id, distribution.local(first->first)); Chris@16: vertex_descriptor target(distribution(first->second), Chris@16: distribution.local(first->second)); Chris@16: add_edge(source, target, *ep_iter, *this); Chris@16: } Chris@16: ++first; Chris@16: ++ep_iter; Chris@16: } Chris@16: Chris@16: synchronize(process_group_); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void Chris@16: PBGL_DISTRIB_ADJLIST_TYPE:: Chris@16: initialize(EdgeIterator first, EdgeIterator last, Chris@16: EdgePropertyIterator ep_iter, Chris@16: vertices_size_type n, const base_distribution_type& distribution, Chris@16: VertexListS) Chris@16: { Chris@16: using boost::parallel::inplace_all_to_all; Chris@16: Chris@16: typedef vertices_size_type vertex_number_t; Chris@16: typedef typename std::iterator_traits::value_type Chris@16: edge_property_init_t; Chris@16: Chris@16: typedef std::pair Chris@16: st_pair; Chris@16: typedef std::pair delayed_edge_t; Chris@16: Chris@16: process_group_type pg = process_group(); Chris@16: process_id_type id = process_id(pg); Chris@16: Chris@16: // Vertex indices Chris@16: std::vector index_to_vertex; Chris@16: index_to_vertex.reserve(num_vertices(*this)); Chris@16: BGL_FORALL_VERTICES_T(v, base(), inherited) Chris@16: index_to_vertex.push_back(v); Chris@16: Chris@16: // The list of edges we can't add immediately. Chris@16: std::vector delayed_edges; Chris@16: Chris@16: std::vector > descriptor_requests; Chris@16: descriptor_requests.resize(num_processes(pg)); Chris@16: Chris@16: // Add all of the edges we can, up to the point where we run Chris@16: // into a descriptor we don't know. Chris@16: while (first != last) { Chris@16: if (distribution(first->first) == id) { Chris@16: if (distribution(first->second) != id) break; Chris@16: vertex_descriptor source Chris@16: (id, index_to_vertex[distribution.local(first->first)]); Chris@16: vertex_descriptor target Chris@16: (distribution(first->second), Chris@16: index_to_vertex[distribution.local(first->second)]); Chris@16: add_edge(source, target, *ep_iter, *this); Chris@16: } Chris@16: ++first; Chris@16: ++ep_iter; Chris@16: } Chris@16: Chris@16: // Queue all of the remaining edges and determine the set of Chris@16: // descriptors we need to know about. Chris@16: while (first != last) { Chris@16: if (distribution(first->first) == id) { Chris@16: vertex_descriptor source Chris@16: (id, index_to_vertex[distribution.local(first->first)]); Chris@16: process_id_type dest = distribution(first->second); Chris@16: if (dest != id) { Chris@16: descriptor_requests[dest] Chris@16: .push_back(distribution.local(first->second)); Chris@16: // Compact request list if we need to Chris@16: if (descriptor_requests[dest].size() > Chris@16: distribution.block_size(dest, n)) { Chris@16: std::sort(descriptor_requests[dest].begin(), Chris@16: descriptor_requests[dest].end()); Chris@16: descriptor_requests[dest].erase( Chris@16: std::unique(descriptor_requests[dest].begin(), Chris@16: descriptor_requests[dest].end()), Chris@16: descriptor_requests[dest].end()); Chris@16: } Chris@16: } Chris@16: Chris@16: // Save the edge for later Chris@16: delayed_edges.push_back Chris@16: (delayed_edge_t(st_pair(source, first->second), *ep_iter)); Chris@16: } Chris@16: ++first; Chris@16: ++ep_iter; Chris@16: } Chris@16: Chris@16: // Compact descriptor requests Chris@16: for (process_id_type dest = 0; dest < num_processes(pg); ++dest) { Chris@16: std::sort(descriptor_requests[dest].begin(), Chris@16: descriptor_requests[dest].end()); Chris@16: descriptor_requests[dest].erase( Chris@16: std::unique(descriptor_requests[dest].begin(), Chris@16: descriptor_requests[dest].end()), Chris@16: descriptor_requests[dest].end()); Chris@16: } Chris@16: Chris@16: // Send out all of the descriptor requests Chris@16: std::vector > in_descriptor_requests; Chris@16: in_descriptor_requests.resize(num_processes(pg)); Chris@16: inplace_all_to_all(pg, descriptor_requests, in_descriptor_requests); Chris@16: Chris@16: // Reply to all of the descriptor requests Chris@16: std::vector > Chris@16: descriptor_responses; Chris@16: descriptor_responses.resize(num_processes(pg)); Chris@16: for (process_id_type dest = 0; dest < num_processes(pg); ++dest) { Chris@16: for (std::size_t i = 0; i < in_descriptor_requests[dest].size(); ++i) { Chris@16: local_vertex_descriptor v = Chris@16: index_to_vertex[in_descriptor_requests[dest][i]]; Chris@16: descriptor_responses[dest].push_back(v); Chris@16: } Chris@16: in_descriptor_requests[dest].clear(); Chris@16: } Chris@16: in_descriptor_requests.clear(); Chris@16: inplace_all_to_all(pg, descriptor_responses); Chris@16: Chris@16: // Add the queued edges Chris@16: for(typename std::vector::iterator i Chris@16: = delayed_edges.begin(); i != delayed_edges.end(); ++i) { Chris@16: process_id_type dest = distribution(i->first.second); Chris@16: local_vertex_descriptor tgt_local; Chris@16: if (dest == id) { Chris@16: tgt_local = index_to_vertex[distribution.local(i->first.second)]; Chris@16: } else { Chris@16: std::vector& requests = descriptor_requests[dest]; Chris@16: typename std::vector::iterator pos = Chris@16: std::lower_bound(requests.begin(), requests.end(), Chris@16: distribution.local(i->first.second)); Chris@16: tgt_local = descriptor_responses[dest][pos - requests.begin()]; Chris@16: } Chris@16: add_edge(i->first.first, vertex_descriptor(dest, tgt_local), Chris@16: i->second, *this); Chris@16: } Chris@16: synchronize(process_group_); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void Chris@16: PBGL_DISTRIB_ADJLIST_TYPE:: Chris@16: initialize(EdgeIterator first, EdgeIterator last, Chris@16: vertices_size_type n, const base_distribution_type& distribution, Chris@16: VertexListS) Chris@16: { Chris@16: using boost::parallel::inplace_all_to_all; Chris@16: Chris@16: typedef vertices_size_type vertex_number_t; Chris@16: Chris@16: typedef std::pair delayed_edge_t; Chris@16: Chris@16: process_group_type pg = process_group(); Chris@16: process_id_type id = process_id(pg); Chris@16: Chris@16: // Vertex indices Chris@16: std::vector index_to_vertex; Chris@16: index_to_vertex.reserve(num_vertices(*this)); Chris@16: BGL_FORALL_VERTICES_T(v, base(), inherited) Chris@16: index_to_vertex.push_back(v); Chris@16: Chris@16: // The list of edges we can't add immediately. Chris@16: std::vector delayed_edges; Chris@16: Chris@16: std::vector > descriptor_requests; Chris@16: descriptor_requests.resize(num_processes(pg)); Chris@16: Chris@16: // Add all of the edges we can, up to the point where we run Chris@16: // into a descriptor we don't know. Chris@16: while (first != last) { Chris@16: if (distribution(first->first) == id) { Chris@16: if (distribution(first->second) != id) break; Chris@16: vertex_descriptor source Chris@16: (id, index_to_vertex[distribution.local(first->first)]); Chris@16: vertex_descriptor target Chris@16: (distribution(first->second), Chris@16: index_to_vertex[distribution.local(first->second)]); Chris@16: add_edge(source, target, *this); Chris@16: } Chris@16: ++first; Chris@16: } Chris@16: Chris@16: // Queue all of the remaining edges and determine the set of Chris@16: // descriptors we need to know about. Chris@16: while (first != last) { Chris@16: if (distribution(first->first) == id) { Chris@16: vertex_descriptor source Chris@16: (id, index_to_vertex[distribution.local(first->first)]); Chris@16: process_id_type dest = distribution(first->second); Chris@16: if (dest != id) { Chris@16: descriptor_requests[dest] Chris@16: .push_back(distribution.local(first->second)); Chris@16: // Compact request list if we need to Chris@16: if (descriptor_requests[dest].size() > Chris@16: distribution.block_size(dest, n)) { Chris@16: std::sort(descriptor_requests[dest].begin(), Chris@16: descriptor_requests[dest].end()); Chris@16: descriptor_requests[dest].erase( Chris@16: std::unique(descriptor_requests[dest].begin(), Chris@16: descriptor_requests[dest].end()), Chris@16: descriptor_requests[dest].end()); Chris@16: } Chris@16: } Chris@16: Chris@16: // Save the edge for later Chris@16: delayed_edges.push_back(delayed_edge_t(source, first->second)); Chris@16: } Chris@16: ++first; Chris@16: } Chris@16: Chris@16: // Compact descriptor requests Chris@16: for (process_id_type dest = 0; dest < num_processes(pg); ++dest) { Chris@16: std::sort(descriptor_requests[dest].begin(), Chris@16: descriptor_requests[dest].end()); Chris@16: descriptor_requests[dest].erase( Chris@16: std::unique(descriptor_requests[dest].begin(), Chris@16: descriptor_requests[dest].end()), Chris@16: descriptor_requests[dest].end()); Chris@16: } Chris@16: Chris@16: // Send out all of the descriptor requests Chris@16: std::vector > in_descriptor_requests; Chris@16: in_descriptor_requests.resize(num_processes(pg)); Chris@16: inplace_all_to_all(pg, descriptor_requests, in_descriptor_requests); Chris@16: Chris@16: // Reply to all of the descriptor requests Chris@16: std::vector > Chris@16: descriptor_responses; Chris@16: descriptor_responses.resize(num_processes(pg)); Chris@16: for (process_id_type dest = 0; dest < num_processes(pg); ++dest) { Chris@16: for (std::size_t i = 0; i < in_descriptor_requests[dest].size(); ++i) { Chris@16: local_vertex_descriptor v = Chris@16: index_to_vertex[in_descriptor_requests[dest][i]]; Chris@16: descriptor_responses[dest].push_back(v); Chris@16: } Chris@16: in_descriptor_requests[dest].clear(); Chris@16: } Chris@16: in_descriptor_requests.clear(); Chris@16: inplace_all_to_all(pg, descriptor_responses); Chris@16: Chris@16: // Add the queued edges Chris@16: for(typename std::vector::iterator i Chris@16: = delayed_edges.begin(); i != delayed_edges.end(); ++i) { Chris@16: process_id_type dest = distribution(i->second); Chris@16: local_vertex_descriptor tgt_local; Chris@16: if (dest == id) { Chris@16: tgt_local = index_to_vertex[distribution.local(i->second)]; Chris@16: } else { Chris@16: std::vector& requests = descriptor_requests[dest]; Chris@16: typename std::vector::iterator pos = Chris@16: std::lower_bound(requests.begin(), requests.end(), Chris@16: distribution.local(i->second)); Chris@16: tgt_local = descriptor_responses[dest][pos - requests.begin()]; Chris@16: } Chris@16: add_edge(i->first, vertex_descriptor(dest, tgt_local), *this); Chris@16: } Chris@16: synchronize(process_group_); Chris@16: } Chris@16: Chris@16: } // end namespace boost Chris@16: Chris@16: #endif // BOOST_GRAPH_DISTRIBUTED_ADJLIST_INITIALIZE_HPP