Chris@16: // Copyright Daniel Wallin 2007. Use, modification and distribution is Chris@16: // subject to the Boost Software License, Version 1.0. (See accompanying Chris@16: // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: #ifndef BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP Chris@16: #define BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP Chris@16: Chris@16: #ifndef BOOST_GRAPH_USE_MPI Chris@16: #error "Parallel BGL files should not be included unless has been included" Chris@16: #endif Chris@16: Chris@16: # include Chris@16: # include Chris@16: # include Chris@16: # include Chris@16: # include Chris@16: # include Chris@16: # include Chris@16: Chris@16: namespace boost { Chris@16: Chris@16: namespace detail { namespace parallel Chris@16: { Chris@16: Chris@16: // Wraps a local descriptor, making it serializable. Chris@16: template Chris@16: struct serializable_local_descriptor Chris@16: { Chris@16: serializable_local_descriptor() Chris@16: {} Chris@16: Chris@16: serializable_local_descriptor(Local local) Chris@16: : local(local) Chris@16: {} Chris@16: Chris@16: operator Local const&() const Chris@16: { Chris@16: return local; Chris@16: } Chris@16: Chris@16: bool operator==(serializable_local_descriptor const& other) const Chris@16: { Chris@16: return local == other.local; Chris@16: } Chris@16: Chris@16: bool operator<(serializable_local_descriptor const& other) const Chris@16: { Chris@16: return local < other.local; Chris@16: } Chris@16: Chris@16: template Chris@16: void serialize(Archive& ar, const unsigned int /*version*/) Chris@16: { Chris@16: ar & unsafe_serialize(local); Chris@16: } Chris@16: Chris@16: Local local; Chris@16: }; Chris@16: Chris@16: template Chris@16: struct pending_edge Chris@16: { Chris@16: pending_edge( Chris@16: Vertex source, Vertex target Chris@16: , Properties properties, void* property_ptr Chris@16: ) Chris@16: : source(source) Chris@16: , target(target) Chris@16: , properties(properties) Chris@16: , property_ptr(property_ptr) Chris@16: {} Chris@16: Chris@16: Vertex source; Chris@16: Vertex target; Chris@16: Properties properties; Chris@16: void* property_ptr; Chris@16: }; Chris@16: Chris@16: inline bool is_digit(char c) Chris@16: { Chris@16: return (bool)std::isdigit(c); Chris@16: } Chris@16: Chris@16: inline std::vector Chris@16: available_process_files(std::string const& filename) Chris@16: { Chris@16: if (!filesystem::exists(filename)) Chris@16: return std::vector(); Chris@16: Chris@16: std::vector result; Chris@16: Chris@16: for (filesystem::directory_iterator i(filename), end; i != end; ++i) Chris@16: { Chris@16: if (!filesystem::is_regular(*i)) Chris@16: boost::throw_exception(std::runtime_error("directory contains non-regular entries")); Chris@16: Chris@16: std::string process_name = i->path().filename().string(); Chris@16: for (std::string::size_type i = 0; i < process_name.size(); ++i) Chris@16: if (!is_digit(process_name[i])) Chris@16: boost::throw_exception(std::runtime_error("directory contains files with invalid names")); Chris@16: Chris@16: result.push_back(boost::lexical_cast(process_name)); Chris@16: } Chris@16: Chris@16: return result; Chris@16: } Chris@16: Chris@16: template Chris@16: void maybe_load_properties( Chris@16: Archive& ar, char const* name, property& properties) Chris@16: { Chris@16: ar >> serialization::make_nvp(name, get_property_value(properties, Tag())); Chris@16: maybe_load_properties(ar, name, static_cast(properties)); Chris@16: } Chris@16: Chris@16: template Chris@16: void maybe_load_properties( Chris@16: Archive&, char const*, no_property&) Chris@16: {} Chris@16: Chris@16: template Chris@16: void maybe_load_properties( Chris@16: Archive& ar, char const* name, Bundle& bundle) Chris@16: { Chris@16: ar >> serialization::make_nvp(name, bundle); Chris@16: no_property prop; Chris@16: maybe_load_properties(ar, name, prop); Chris@16: } Chris@16: Chris@16: Chris@16: Chris@16: Chris@16: Chris@16: Chris@16: template Chris@16: struct graph_loader Chris@16: { Chris@16: typedef typename Graph::vertex_descriptor vertex_descriptor; Chris@16: typedef typename Graph::local_vertex_descriptor local_vertex_descriptor; Chris@16: typedef typename Graph::vertex_property_type vertex_property_type; Chris@16: typedef typename Graph::edge_descriptor edge_descriptor; Chris@16: typedef typename Graph::local_edge_descriptor local_edge_descriptor; Chris@16: typedef typename Graph::edge_property_type edge_property_type; Chris@16: typedef typename Graph::process_group_type process_group_type; Chris@16: typedef typename process_group_type::process_id_type process_id_type; Chris@16: typedef typename Graph::directed_selector directed_selector; Chris@16: typedef typename mpl::if_< Chris@16: is_same, vecS, VertexListS Chris@16: >::type vertex_list_selector; Chris@16: typedef pending_edge Chris@16: pending_edge_type; Chris@16: typedef serializable_local_descriptor Chris@16: serializable_vertex_descriptor; Chris@16: Chris@16: graph_loader(Graph& g, Archive& ar) Chris@16: : m_g(g) Chris@16: , m_ar(ar) Chris@16: , m_pg(g.process_group()) Chris@16: , m_requested_vertices(num_processes(m_pg)) Chris@16: , m_remote_vertices(num_processes(m_pg)) Chris@16: , m_property_ptrs(num_processes(m_pg)) Chris@16: { Chris@16: g.clear(); Chris@16: load_prefix(); Chris@16: load_vertices(); Chris@16: load_edges(); Chris@16: ar >> make_nvp("distribution", m_g.distribution()); Chris@16: } Chris@16: Chris@16: private: Chris@16: struct pending_in_edge Chris@16: { Chris@16: pending_in_edge( Chris@16: vertex_descriptor u, vertex_descriptor v, void* property_ptr Chris@16: ) Chris@16: : u(u) Chris@16: , v(v) Chris@16: , property_ptr(property_ptr) Chris@16: {} Chris@16: Chris@16: vertex_descriptor u; Chris@16: vertex_descriptor v; Chris@16: void* property_ptr; Chris@16: }; Chris@16: Chris@16: bool is_root() const Chris@16: { Chris@16: return process_id(m_pg) == 0; Chris@16: } Chris@16: Chris@16: template Chris@16: serialization::nvp const make_nvp(char const* name, T& value) const Chris@16: { Chris@16: return serialization::nvp(name, value); Chris@16: } Chris@16: Chris@16: void load_prefix(); Chris@16: void load_vertices(); Chris@16: Chris@16: template Chris@16: void maybe_load_and_store_local_vertex(Anything); Chris@16: void maybe_load_and_store_local_vertex(vecS); Chris@16: Chris@16: void load_edges(); Chris@16: void load_in_edges(bidirectionalS); Chris@16: void load_in_edges(directedS); Chris@16: void add_pending_in_edge( Chris@16: vertex_descriptor u, vertex_descriptor v, void* property_ptr, vecS); Chris@16: template Chris@16: void add_pending_in_edge( Chris@16: vertex_descriptor u, vertex_descriptor v, void* property_ptr, Anything); Chris@16: template Chris@16: void add_edge( Chris@16: vertex_descriptor u, vertex_descriptor v Chris@16: , edge_property_type const& property, void* property_ptr, Anything); Chris@16: void add_edge( Chris@16: vertex_descriptor u, vertex_descriptor v Chris@16: , edge_property_type const& property, void* property_ptr, vecS); Chris@16: void add_remote_vertex_request( Chris@16: vertex_descriptor u, vertex_descriptor v, directedS); Chris@16: void add_remote_vertex_request( Chris@16: vertex_descriptor u, vertex_descriptor v, bidirectionalS); Chris@16: void add_in_edge( Chris@16: edge_descriptor const&, void*, directedS); Chris@16: void add_in_edge( Chris@16: edge_descriptor const& edge, void* old_property_ptr, bidirectionalS); Chris@16: Chris@16: void resolve_remote_vertices(directedS); Chris@16: void resolve_remote_vertices(bidirectionalS); Chris@16: vertex_descriptor resolve_remote_vertex(vertex_descriptor u) const; Chris@16: vertex_descriptor resolve_remote_vertex(vertex_descriptor u, vecS) const; Chris@16: template Chris@16: vertex_descriptor resolve_remote_vertex(vertex_descriptor u, Anything) const; Chris@16: Chris@16: void resolve_property_ptrs(); Chris@16: Chris@16: void commit_pending_edges(vecS); Chris@16: template Chris@16: void commit_pending_edges(Anything); Chris@16: void commit_pending_in_edges(directedS); Chris@16: void commit_pending_in_edges(bidirectionalS); Chris@16: Chris@16: void* maybe_load_property_ptr(directedS) { return 0; } Chris@16: void* maybe_load_property_ptr(bidirectionalS); Chris@16: Chris@16: Graph& m_g; Chris@16: Archive& m_ar; Chris@16: process_group_type m_pg; Chris@16: Chris@16: std::vector m_id_mapping; Chris@16: Chris@16: // Maps local vertices as loaded from the archive to Chris@16: // the ones actually added to the graph. Only used Chris@16: // when !vecS. Chris@16: std::map m_local_vertices; Chris@16: Chris@16: // This is the list of remote vertex descriptors that we Chris@16: // are going to receive from other processes. This is Chris@16: // kept sorted so that we can determine the position of Chris@16: // the matching vertex descriptor in m_remote_vertices. Chris@16: std::vector > m_requested_vertices; Chris@16: Chris@16: // This is the list of remote vertex descriptors that Chris@16: // we send and receive from other processes. Chris@16: std::vector > m_remote_vertices; Chris@16: Chris@16: // ... Chris@16: std::vector m_pending_edges; Chris@16: Chris@16: // The pending in-edges that will be added in the commit step, after Chris@16: // the remote vertex descriptors has been resolved. Only used Chris@16: // when bidirectionalS and !vecS. Chris@16: std::vector m_pending_in_edges; Chris@16: Chris@16: std::vector > > m_property_ptrs; Chris@16: }; Chris@16: Chris@16: template Chris@16: void graph_loader::load_prefix() Chris@16: { Chris@16: typename process_group_type::process_size_type num_processes_; Chris@16: m_ar >> make_nvp("num_processes", num_processes_); Chris@16: Chris@16: if (num_processes_ != num_processes(m_pg)) Chris@16: boost::throw_exception(std::runtime_error("number of processes mismatch")); Chris@16: Chris@16: process_id_type old_id; Chris@16: m_ar >> make_nvp("id", old_id); Chris@16: Chris@16: std::vector mapping; Chris@16: m_ar >> make_nvp("mapping", mapping); Chris@16: Chris@16: // Fetch all the old id's from the other processes. Chris@16: std::vector old_ids; Chris@16: all_gather(m_pg, &old_id, &old_id+1, old_ids); Chris@16: Chris@16: m_id_mapping.resize(num_processes(m_pg), -1); Chris@16: Chris@16: for (process_id_type i = 0; i < num_processes(m_pg); ++i) Chris@16: { Chris@16: # ifdef PBGL_SERIALIZE_DEBUG Chris@16: if (is_root()) Chris@16: std::cout << i << " used to be " << old_ids[i] << "\n"; Chris@16: # endif Chris@16: BOOST_ASSERT(m_id_mapping[old_ids[i]] == -1); Chris@16: m_id_mapping[old_ids[i]] = i; Chris@16: } Chris@16: Chris@16: std::vector new_mapping( Chris@16: mapping.size()); Chris@16: Chris@16: for (int i = 0; i < num_processes(m_pg); ++i) Chris@16: { Chris@16: new_mapping[mapping[old_ids[i]]] = i; Chris@16: } Chris@16: Chris@16: m_g.distribution().assign_mapping( Chris@16: new_mapping.begin(), new_mapping.end()); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::load_vertices() Chris@16: { Chris@16: int V; Chris@16: m_ar >> BOOST_SERIALIZATION_NVP(V); Chris@16: Chris@16: # ifdef PBGL_SERIALIZE_DEBUG Chris@16: if (is_root()) Chris@16: std::cout << "Loading vertices\n"; Chris@16: # endif Chris@16: Chris@16: for (int i = 0; i < V; ++i) Chris@16: { Chris@16: maybe_load_and_store_local_vertex(vertex_list_selector()); Chris@16: } Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void graph_loader::maybe_load_and_store_local_vertex(Anything) Chris@16: { Chris@16: // Load the original vertex descriptor Chris@16: local_vertex_descriptor local; Chris@16: m_ar >> make_nvp("local", unsafe_serialize(local)); Chris@16: Chris@16: // Load the properties Chris@16: vertex_property_type property; Chris@16: detail::parallel::maybe_load_properties(m_ar, "vertex_property", Chris@16: property); Chris@16: Chris@16: // Add the vertex Chris@16: vertex_descriptor v(process_id(m_pg), add_vertex(property, m_g.base())); Chris@16: Chris@16: if (m_g.on_add_vertex) Chris@16: m_g.on_add_vertex(v, m_g); Chris@16: Chris@16: // Create the mapping from the "old" local descriptor to the new Chris@16: // local descriptor. Chris@16: m_local_vertices[local] = v.local; Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::maybe_load_and_store_local_vertex(vecS) Chris@16: { Chris@16: // Load the properties Chris@16: vertex_property_type property; Chris@16: detail::parallel::maybe_load_properties(m_ar, "vertex_property", Chris@16: property); Chris@16: Chris@16: // Add the vertex Chris@16: vertex_descriptor v(process_id(m_pg), Chris@16: add_vertex(m_g.build_vertex_property(property), Chris@16: m_g.base())); Chris@16: Chris@16: if (m_g.on_add_vertex) Chris@16: m_g.on_add_vertex(v, m_g); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::load_edges() Chris@16: { Chris@16: int E; Chris@16: m_ar >> BOOST_SERIALIZATION_NVP(E); Chris@16: Chris@16: # ifdef PBGL_SERIALIZE_DEBUG Chris@16: if (is_root()) Chris@16: std::cout << "Loading edges\n"; Chris@16: # endif Chris@16: Chris@16: for (int i = 0; i < E; ++i) Chris@16: { Chris@16: local_vertex_descriptor local_src; Chris@16: process_id_type target_owner; Chris@16: local_vertex_descriptor local_tgt; Chris@16: Chris@16: m_ar >> make_nvp("source", unsafe_serialize(local_src)); Chris@16: m_ar >> make_nvp("target_owner", target_owner); Chris@16: m_ar >> make_nvp("target", unsafe_serialize(local_tgt)); Chris@16: Chris@16: process_id_type new_src_owner = process_id(m_pg); Chris@16: process_id_type new_tgt_owner = m_id_mapping[target_owner]; Chris@16: Chris@16: vertex_descriptor source(new_src_owner, local_src); Chris@16: vertex_descriptor target(new_tgt_owner, local_tgt); Chris@16: Chris@16: edge_property_type properties; Chris@16: detail::parallel::maybe_load_properties(m_ar, "edge_property", properties); Chris@16: Chris@16: void* property_ptr = maybe_load_property_ptr(directed_selector()); Chris@16: add_edge(source, target, properties, property_ptr, vertex_list_selector()); Chris@16: } Chris@16: Chris@16: load_in_edges(directed_selector()); Chris@16: commit_pending_edges(vertex_list_selector()); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::load_in_edges(bidirectionalS) Chris@16: { Chris@16: std::size_t I; Chris@16: m_ar >> BOOST_SERIALIZATION_NVP(I); Chris@16: Chris@16: # ifdef PBGL_SERIALIZE_DEBUG Chris@16: if (is_root()) Chris@16: std::cout << "Loading in-edges\n"; Chris@16: # endif Chris@16: Chris@16: for (int i = 0; i < I; ++i) Chris@16: { Chris@16: process_id_type src_owner; Chris@16: local_vertex_descriptor local_src; Chris@16: local_vertex_descriptor local_target; Chris@16: void* property_ptr; Chris@16: Chris@16: m_ar >> make_nvp("src_owner", src_owner); Chris@16: m_ar >> make_nvp("source", unsafe_serialize(local_src)); Chris@16: m_ar >> make_nvp("target", unsafe_serialize(local_target)); Chris@16: m_ar >> make_nvp("property_ptr", unsafe_serialize(property_ptr)); Chris@16: Chris@16: src_owner = m_id_mapping[src_owner]; Chris@16: Chris@16: vertex_descriptor u(src_owner, local_src); Chris@16: vertex_descriptor v(process_id(m_pg), local_target); Chris@16: Chris@16: add_pending_in_edge(u, v, property_ptr, vertex_list_selector()); Chris@16: } Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::load_in_edges(directedS) Chris@16: {} Chris@16: Chris@16: template Chris@16: void graph_loader::add_pending_in_edge( Chris@16: vertex_descriptor u, vertex_descriptor v, void* property_ptr, vecS) Chris@16: { Chris@16: m_pending_in_edges.push_back(pending_in_edge(u,v,property_ptr)); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void graph_loader::add_pending_in_edge( Chris@16: vertex_descriptor u, vertex_descriptor v, void* property_ptr, Anything) Chris@16: { Chris@16: // u and v represent the out-edge here, meaning v is local Chris@16: // to us, and u is always remote. Chris@16: m_pending_in_edges.push_back(pending_in_edge(u,v,property_ptr)); Chris@16: add_remote_vertex_request(v, u, bidirectionalS()); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void graph_loader::add_edge( Chris@16: vertex_descriptor u, vertex_descriptor v Chris@16: , edge_property_type const& property, void* property_ptr, Anything) Chris@16: { Chris@16: m_pending_edges.push_back(pending_edge_type(u, v, property, property_ptr)); Chris@16: add_remote_vertex_request(u, v, directed_selector()); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::add_remote_vertex_request( Chris@16: vertex_descriptor u, vertex_descriptor v, directedS) Chris@16: { Chris@16: // We have to request the remote vertex. Chris@16: m_requested_vertices[owner(v)].push_back(local(v)); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::add_remote_vertex_request( Chris@16: vertex_descriptor u, vertex_descriptor v, bidirectionalS) Chris@16: { Chris@16: // If the edge spans to another process, we know Chris@16: // that that process has a matching in-edge, so Chris@16: // we can just send our vertex. No requests Chris@16: // necessary. Chris@16: if (owner(v) != m_g.processor()) Chris@16: { Chris@16: m_remote_vertices[owner(v)].push_back(local(u)); Chris@16: m_requested_vertices[owner(v)].push_back(local(v)); Chris@16: } Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::add_edge( Chris@16: vertex_descriptor u, vertex_descriptor v Chris@16: , edge_property_type const& property, void* property_ptr, vecS) Chris@16: { Chris@16: std::pair inserted = Chris@16: detail::parallel::add_local_edge( Chris@16: local(u), local(v) Chris@16: , m_g.build_edge_property(property), m_g.base()); Chris@16: BOOST_ASSERT(inserted.second); Chris@16: put(edge_target_processor_id, m_g.base(), inserted.first, owner(v)); Chris@16: Chris@16: edge_descriptor e(owner(u), owner(v), true, inserted.first); Chris@16: Chris@16: if (inserted.second && m_g.on_add_edge) Chris@16: m_g.on_add_edge(e, m_g); Chris@16: Chris@16: add_in_edge(e, property_ptr, directed_selector()); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::add_in_edge( Chris@16: edge_descriptor const&, void*, directedS) Chris@16: {} Chris@16: Chris@16: template Chris@16: void graph_loader::add_in_edge( Chris@16: edge_descriptor const& edge, void* old_property_ptr, bidirectionalS) Chris@16: { Chris@16: if (owner(target(edge, m_g)) == m_g.processor()) Chris@16: { Chris@16: detail::parallel::stored_in_edge Chris@16: e(m_g.processor(), local(edge)); Chris@16: boost::graph_detail::push(get( Chris@16: vertex_in_edges, m_g.base())[local(target(edge, m_g))], e); Chris@16: } Chris@16: else Chris@16: { Chris@16: // We send the (old,new) property pointer pair to Chris@16: // the remote process. This could be optimized to Chris@16: // only send the new one -- the ordering can be Chris@16: // made implicit because the old pointer value is Chris@16: // stored on the remote process. Chris@16: // Chris@16: // Doing that is a little bit more complicated, but Chris@16: // in case it turns out it's important we can do it. Chris@16: void* property_ptr = local(edge).get_property(); Chris@16: m_property_ptrs[owner(target(edge, m_g))].push_back( Chris@16: unsafe_pair(old_property_ptr, property_ptr)); Chris@16: } Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::resolve_property_ptrs() Chris@16: { Chris@16: # ifdef PBGL_SERIALIZE_DEBUG Chris@16: if (is_root()) Chris@16: std::cout << "Resolving property pointers\n"; Chris@16: # endif Chris@16: Chris@16: for (int i = 0; i < num_processes(m_pg); ++i) Chris@16: { Chris@16: std::sort( Chris@16: m_property_ptrs[i].begin(), m_property_ptrs[i].end()); Chris@16: } Chris@16: Chris@16: boost::parallel::inplace_all_to_all(m_pg, m_property_ptrs); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::resolve_remote_vertices(directedS) Chris@16: { Chris@16: for (int i = 0; i < num_processes(m_pg); ++i) Chris@16: { Chris@16: std::sort(m_requested_vertices[i].begin(), m_requested_vertices[i].end()); Chris@16: } Chris@16: Chris@16: boost::parallel::inplace_all_to_all( Chris@16: m_pg, m_requested_vertices, m_remote_vertices); Chris@16: Chris@16: for (int i = 0; i < num_processes(m_pg); ++i) Chris@16: { Chris@16: BOOST_FOREACH(serializable_vertex_descriptor& u, m_remote_vertices[i]) Chris@16: { Chris@16: u = m_local_vertices[u]; Chris@16: } Chris@16: } Chris@16: Chris@16: boost::parallel::inplace_all_to_all(m_pg, m_remote_vertices); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::resolve_remote_vertices(bidirectionalS) Chris@16: { Chris@16: # ifdef PBGL_SERIALIZE_DEBUG Chris@16: if (is_root()) Chris@16: std::cout << "Resolving remote vertices\n"; Chris@16: # endif Chris@16: Chris@16: for (int i = 0; i < num_processes(m_pg); ++i) Chris@16: { Chris@16: std::sort(m_requested_vertices[i].begin(), m_requested_vertices[i].end()); Chris@16: std::sort(m_remote_vertices[i].begin(), m_remote_vertices[i].end()); Chris@16: Chris@16: BOOST_FOREACH(serializable_vertex_descriptor& u, m_remote_vertices[i]) Chris@16: { Chris@16: u = m_local_vertices[u]; Chris@16: } Chris@16: } Chris@16: Chris@16: boost::parallel::inplace_all_to_all(m_pg, m_remote_vertices); Chris@16: Chris@16: for (int i = 0; i < num_processes(m_pg); ++i) Chris@16: BOOST_ASSERT(m_remote_vertices[i].size() == m_requested_vertices[i].size()); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::commit_pending_edges(vecS) Chris@16: { Chris@16: commit_pending_in_edges(directed_selector()); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void graph_loader::commit_pending_edges(Anything) Chris@16: { Chris@16: resolve_remote_vertices(directed_selector()); Chris@16: Chris@16: BOOST_FOREACH(pending_edge_type const& e, m_pending_edges) Chris@16: { Chris@16: vertex_descriptor u = resolve_remote_vertex(e.source); Chris@16: vertex_descriptor v = resolve_remote_vertex(e.target); Chris@16: add_edge(u, v, e.properties, e.property_ptr, vecS()); Chris@16: } Chris@16: Chris@16: commit_pending_in_edges(directed_selector()); Chris@16: } Chris@16: Chris@16: template Chris@16: void graph_loader::commit_pending_in_edges(directedS) Chris@16: {} Chris@16: Chris@16: template Chris@16: void graph_loader::commit_pending_in_edges(bidirectionalS) Chris@16: { Chris@16: resolve_property_ptrs(); Chris@16: Chris@16: BOOST_FOREACH(pending_in_edge const& e, m_pending_in_edges) Chris@16: { Chris@16: vertex_descriptor u = resolve_remote_vertex(e.u, vertex_list_selector()); Chris@16: vertex_descriptor v = resolve_remote_vertex(e.v, vertex_list_selector()); Chris@16: Chris@16: typedef detail::parallel::stored_in_edge stored_edge; Chris@16: Chris@16: std::vector >::iterator i = std::lower_bound( Chris@16: m_property_ptrs[owner(u)].begin() Chris@16: , m_property_ptrs[owner(u)].end() Chris@16: , unsafe_pair(e.property_ptr, 0) Chris@16: ); Chris@16: Chris@16: if (i == m_property_ptrs[owner(u)].end() Chris@16: || i->first != e.property_ptr) Chris@16: { Chris@16: BOOST_ASSERT(false); Chris@16: } Chris@16: Chris@16: local_edge_descriptor local_edge(local(u), local(v), i->second); Chris@16: stored_edge edge(owner(u), local_edge); Chris@16: boost::graph_detail::push( Chris@16: get(vertex_in_edges, m_g.base())[local(v)], edge); Chris@16: } Chris@16: } Chris@16: Chris@16: template Chris@16: typename graph_loader::vertex_descriptor Chris@16: graph_loader::resolve_remote_vertex( Chris@16: vertex_descriptor u) const Chris@16: { Chris@16: if (owner(u) == process_id(m_pg)) Chris@16: { Chris@16: return vertex_descriptor( Chris@16: process_id(m_pg), m_local_vertices.find(local(u))->second); Chris@16: } Chris@16: Chris@16: typename std::vector::const_iterator Chris@16: i = std::lower_bound( Chris@16: m_requested_vertices[owner(u)].begin() Chris@16: , m_requested_vertices[owner(u)].end() Chris@16: , serializable_vertex_descriptor(local(u)) Chris@16: ); Chris@16: Chris@16: if (i == m_requested_vertices[owner(u)].end() Chris@16: || *i != local(u)) Chris@16: { Chris@16: BOOST_ASSERT(false); Chris@16: } Chris@16: Chris@16: local_vertex_descriptor local = Chris@16: m_remote_vertices[owner(u)][m_requested_vertices[owner(u)].end() - i]; Chris@16: return vertex_descriptor(owner(u), local); Chris@16: } Chris@16: Chris@16: template Chris@16: typename graph_loader::vertex_descriptor Chris@16: graph_loader::resolve_remote_vertex( Chris@16: vertex_descriptor u, vecS) const Chris@16: { Chris@16: return u; Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: typename graph_loader::vertex_descriptor Chris@16: graph_loader::resolve_remote_vertex( Chris@16: vertex_descriptor u, Anything) const Chris@16: { Chris@16: return resolve_remote_vertex(u); Chris@16: } Chris@16: Chris@16: template Chris@16: void* Chris@16: graph_loader::maybe_load_property_ptr(bidirectionalS) Chris@16: { Chris@16: void* ptr; Chris@16: m_ar >> make_nvp("property_ptr", unsafe_serialize(ptr)); Chris@16: return ptr; Chris@16: } Chris@16: Chris@16: template Chris@16: void maybe_save_local_descriptor(Archive& ar, D const&, vecS) Chris@16: {} Chris@16: Chris@16: template Chris@16: void maybe_save_local_descriptor(Archive& ar, D const& d, NotVecS) Chris@16: { Chris@16: ar << serialization::make_nvp( Chris@16: "local", unsafe_serialize(const_cast(d))); Chris@16: } Chris@16: Chris@16: template Chris@16: void maybe_save_properties( Chris@16: Archive&, char const*, no_property const&) Chris@16: {} Chris@16: Chris@16: template Chris@16: void maybe_save_properties( Chris@16: Archive& ar, char const* name, property const& properties) Chris@16: { Chris@16: ar & serialization::make_nvp(name, get_property_value(properties, Tag())); Chris@16: maybe_save_properties(ar, name, static_cast(properties)); Chris@16: } Chris@16: Chris@16: template Chris@16: void save_in_edges(Archive& ar, Graph const& g, directedS) Chris@16: {} Chris@16: Chris@16: // We need to save the edges in the base edge Chris@16: // list, and the in_edges that are stored in the Chris@16: // vertex_in_edges vertex property. Chris@16: template Chris@16: void save_in_edges(Archive& ar, Graph const& g, bidirectionalS) Chris@16: { Chris@16: typedef typename Graph::process_group_type Chris@16: process_group_type; Chris@16: typedef typename process_group_type::process_id_type Chris@16: process_id_type; Chris@16: typedef typename graph_traits< Chris@16: Graph>::vertex_descriptor vertex_descriptor; Chris@16: typedef typename vertex_descriptor::local_descriptor_type Chris@16: local_vertex_descriptor; Chris@16: typedef typename graph_traits< Chris@16: Graph>::edge_descriptor edge_descriptor; Chris@16: Chris@16: process_id_type id = g.processor(); Chris@16: Chris@16: std::vector saved_in_edges; Chris@16: Chris@16: BGL_FORALL_VERTICES_T(v, g, Graph) Chris@16: { Chris@16: BOOST_FOREACH(edge_descriptor const& e, in_edges(v, g)) Chris@16: { Chris@16: // Only save the in_edges that isn't owned by this process. Chris@16: if (owner(e) == id) Chris@16: continue; Chris@16: Chris@16: saved_in_edges.push_back(e); Chris@16: } Chris@16: } Chris@16: Chris@16: std::size_t I = saved_in_edges.size(); Chris@16: ar << BOOST_SERIALIZATION_NVP(I); Chris@16: Chris@16: BOOST_FOREACH(edge_descriptor const& e, saved_in_edges) Chris@16: { Chris@16: process_id_type src_owner = owner(source(e,g)); Chris@16: local_vertex_descriptor local_src = local(source(e,g)); Chris@16: local_vertex_descriptor local_target = local(target(e,g)); Chris@16: void* property_ptr = local(e).get_property(); Chris@16: Chris@16: using serialization::make_nvp; Chris@16: Chris@16: ar << make_nvp("src_owner", src_owner); Chris@16: ar << make_nvp("source", unsafe_serialize(local_src)); Chris@16: ar << make_nvp("target", unsafe_serialize(local_target)); Chris@16: ar << make_nvp("property_ptr", unsafe_serialize(property_ptr)); Chris@16: } Chris@16: } Chris@16: Chris@16: template Chris@16: void maybe_save_property_ptr(Archive&, Edge const&, directedS) Chris@16: {} Chris@16: Chris@16: template Chris@16: void maybe_save_property_ptr(Archive& ar, Edge const& e, bidirectionalS) Chris@16: { Chris@16: void* ptr = local(e).get_property(); Chris@16: ar << serialization::make_nvp("property_ptr", unsafe_serialize(ptr)); Chris@16: } Chris@16: Chris@16: template Chris@16: void save_edges(Archive& ar, Graph const& g, DirectedS) Chris@16: { Chris@16: typedef typename Graph::process_group_type Chris@16: process_group_type; Chris@16: typedef typename process_group_type::process_id_type Chris@16: process_id_type; Chris@16: typedef typename graph_traits< Chris@16: Graph>::vertex_descriptor vertex_descriptor; Chris@16: Chris@16: typedef typename Graph::edge_property_type edge_property_type; Chris@16: Chris@16: int E = num_edges(g); Chris@16: ar << BOOST_SERIALIZATION_NVP(E); Chris@16: Chris@16: // For *directed* graphs, we can just save Chris@16: // the edge list and be done. Chris@16: // Chris@16: // For *bidirectional* graphs, we need to also Chris@16: // save the "vertex_in_edges" property map, Chris@16: // because it might contain in-edges that Chris@16: // are not locally owned. Chris@16: BGL_FORALL_EDGES_T(e, g, Graph) Chris@16: { Chris@16: vertex_descriptor src(source(e, g)); Chris@16: vertex_descriptor tgt(target(e, g)); Chris@16: Chris@16: typename vertex_descriptor::local_descriptor_type Chris@16: local_u(local(src)); Chris@16: typename vertex_descriptor::local_descriptor_type Chris@16: local_v(local(tgt)); Chris@16: Chris@16: process_id_type target_owner = owner(tgt); Chris@16: Chris@16: using serialization::make_nvp; Chris@16: Chris@16: ar << make_nvp("source", unsafe_serialize(local_u)); Chris@16: ar << make_nvp("target_owner", target_owner); Chris@16: ar << make_nvp("target", unsafe_serialize(local_v)); Chris@16: Chris@16: maybe_save_properties( Chris@16: ar, "edge_property" Chris@16: , static_cast(get(edge_all_t(), g, e)) Chris@16: ); Chris@16: Chris@16: maybe_save_property_ptr(ar, e, DirectedS()); Chris@16: } Chris@16: Chris@16: save_in_edges(ar, g, DirectedS()); Chris@16: } Chris@16: Chris@16: }} // namespace detail::parallel Chris@16: Chris@16: template Chris@16: template Chris@16: void PBGL_DISTRIB_ADJLIST_TYPE::load(std::string const& filename) Chris@16: { Chris@16: process_group_type pg = process_group(); Chris@16: process_id_type id = process_id(pg); Chris@16: Chris@16: synchronize(pg); Chris@16: Chris@16: std::vector disk_files = detail::parallel::available_process_files(filename); Chris@16: std::sort(disk_files.begin(), disk_files.end()); Chris@16: Chris@16: // Negotiate which process gets which file. Serialized. Chris@16: std::vector consumed_files; Chris@16: int picked_file = -1; Chris@16: Chris@16: if (id > 0) Chris@16: receive_oob(pg, id-1, 0, consumed_files); Chris@16: Chris@16: std::sort(consumed_files.begin(), consumed_files.end()); Chris@16: std::vector available_files; Chris@16: std::set_difference( Chris@16: disk_files.begin(), disk_files.end() Chris@16: , consumed_files.begin(), consumed_files.end() Chris@16: , std::back_inserter(available_files) Chris@16: ); Chris@16: Chris@16: if (available_files.empty()) Chris@16: boost::throw_exception(std::runtime_error("no file available")); Chris@16: Chris@16: // back() used for debug purposes. Making sure the Chris@16: // ranks are shuffled. Chris@16: picked_file = available_files.back(); Chris@16: Chris@16: # ifdef PBGL_SERIALIZE_DEBUG Chris@16: std::cout << id << " picked " << picked_file << "\n"; Chris@16: # endif Chris@16: Chris@16: consumed_files.push_back(picked_file); Chris@16: Chris@16: if (id < num_processes(pg) - 1) Chris@16: send_oob(pg, id+1, 0, consumed_files); Chris@16: Chris@16: std::string local_filename = filename + "/" + Chris@16: lexical_cast(picked_file); Chris@16: Chris@16: std::ifstream in(local_filename.c_str(), std::ios_base::binary); Chris@16: IStreamConstructibleArchive ar(in); Chris@16: Chris@16: detail::parallel::graph_loader< Chris@16: graph_type, IStreamConstructibleArchive, InVertexListS Chris@16: > loader(*this, ar); Chris@16: Chris@16: # ifdef PBGL_SERIALIZE_DEBUG Chris@16: std::cout << "Process " << id << " done loading.\n"; Chris@16: # endif Chris@16: Chris@16: synchronize(pg); Chris@16: } Chris@16: Chris@16: template Chris@16: template Chris@16: void PBGL_DISTRIB_ADJLIST_TYPE::save(std::string const& filename) const Chris@16: { Chris@16: typedef typename config_type::VertexListS vertex_list_selector; Chris@16: Chris@16: process_group_type pg = process_group(); Chris@16: process_id_type id = process_id(pg); Chris@16: Chris@16: if (filesystem::exists(filename) && !filesystem::is_directory(filename)) Chris@16: boost::throw_exception(std::runtime_error("entry exists, but is not a directory")); Chris@16: Chris@16: filesystem::remove_all(filename); Chris@16: filesystem::create_directory(filename); Chris@16: Chris@16: synchronize(pg); Chris@16: Chris@16: std::string local_filename = filename + "/" + Chris@16: lexical_cast(id); Chris@16: Chris@16: std::ofstream out(local_filename.c_str(), std::ios_base::binary); Chris@16: OStreamConstructibleArchive ar(out); Chris@16: Chris@16: using serialization::make_nvp; Chris@16: Chris@16: typename process_group_type::process_size_type num_processes_ = num_processes(pg); Chris@16: ar << make_nvp("num_processes", num_processes_); Chris@16: ar << BOOST_SERIALIZATION_NVP(id); Chris@16: ar << make_nvp("mapping", this->distribution().mapping()); Chris@16: Chris@16: int V = num_vertices(*this); Chris@16: ar << BOOST_SERIALIZATION_NVP(V); Chris@16: Chris@16: BGL_FORALL_VERTICES_T(v, *this, graph_type) Chris@16: { Chris@16: local_vertex_descriptor local_descriptor(local(v)); Chris@16: detail::parallel::maybe_save_local_descriptor( Chris@16: ar, local_descriptor, vertex_list_selector()); Chris@16: detail::parallel::maybe_save_properties( Chris@16: ar, "vertex_property" Chris@16: , static_cast(get(vertex_all_t(), *this, v)) Chris@16: ); Chris@16: } Chris@16: Chris@16: detail::parallel::save_edges(ar, *this, directed_selector()); Chris@16: Chris@16: ar << make_nvp("distribution", this->distribution()); Chris@16: } Chris@16: Chris@16: } // namespace boost Chris@16: Chris@16: #endif // BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP Chris@16: