GraphLab is Graph-Parallel
Designing and implementing efficient, bug free parallel and distributed algorithms can be very challenging. To address this challenge high-level data-parallel abstractions like Map-Reduce expose a simple computational pattern that isolates users form the complexities of large-scale parallel and distribute system design. Unfortunately, many important computational tasks are not inherently data-parallel and cannot be efficiently or intuitively expressed in data-parallel abstractions.
GraphLab is a high-level graph-parallel abstraction that efficiently and intuitively expresses computational dependencies. Unlike Map-Reduce where computation is applied to independent records, computation in GraphLab is applied to dependent records which are stored as vertices in a large distributed data-graph. Computation in GraphLab is expressed as a vertex-programs which are executed in parallel on each vertex and can interact with neighboring vertices. In contrast to the more general message passing and actor models, GraphLab constrains the interaction of vertex-programs to the graph structure enabling a wide range of system optimizations. GraphLab programs interact by directly reading the state of neighboring vertices and by modifying the state of adjacent edges. In addition, vertex-programs can signal neighboring vertex-programs causing them to be rerun at some point in the future.
PageRank Example
The easiest way to explain the GraphLab abstraction is through an example. The PageRank algorithm is a popular technique for analyzing the relative “importance” of webpages based on the hyperlink structure. At a high-level the PageRank algorithm captures the modeling assumption that a link to a webpage is a vote of confidence and that votes from important pages are more valuable. The PageRank of the webpage i is given by the recurrence equation:
where α is the random reset probability and N is the number of webpages. Because the PageRank of page i depends on the PageRank of the pages that link to page i, we iteratively apply the equation until the PageRank of each page converges. We can express this algorithm as the following vertex program:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
PageRank_vertex_program(vertex i) { // (Gather) Compute the sum of my neighbors rank double sum = 0; foreach(vertex j : in_neighbors(i)) { sum = sum + j.rank / num_out_neighbors(j); } // (Apply) Update my Rank (i) i.old_rank = i.rank; i.rank = (1-ALPHA)/num_vertices + ALPHA*sum; // (Scatter) If necessary signal my neighbors to recompute their rank if(abs(i.old_rank - i.rank) > EPSILON) { foreach(vertex j : out_neighbors(i)) signal(j); } } |
The above vertex-program is the basis of the original GraphLab1 abstraction. Notice that the vertex program can directly access the neighboring vertices:
|
1 2 3 |
foreach(vertex j : in_neighbors(i)) { sum = sum + j.rank / num_out_neighbors(j); } |
Using the resulting sum the program directly updates the value of its vertex:
|
1 2 |
i.old_rank = i.rank; i.rank = (1-ALPHA)/num_vertices + ALPHA*sum; |
Finally, using one of the more powerful features of the GraphLab abstraction, the vertex-program only triggers its neighboring vertices to recompute their value when the PageRank has changed significantly.
|
1 2 3 |
if(abs(i.old_rank - i.rank) > EPSILON) { foreach(vertex j : out_neighbors(i)) signal(j); } |
The signal command tells the GraphLab computational engine to run the vertex-program on the neighboring vertices.
GraphLab2: Cooking with GAS
The original GraphLab1 abstraction encoded the vertex-program as a single function in the underlying language (C++/Java). This made programming simple but limited the potential parallelism as well as the ability for the GraphLab runtime to apply some optimizations needed to really tackle massive problems. For example, in many real world applications a small set of vertices with have enormous neighborhoods (e.g., celebrities in a social network). These high degree vertices lead to performance bottlenecks since their vertex programs are executed sequentially on a single machine. Even worse, because the GraphLab engine cannot choose the order in which neighbors are touched or move parts of the vertex-program to the machines that store the data, certain critical optimizations are not possible.
Through our research in graph-parallel algorithms we discovered a common pattern. The vast majority of vertex-programs can be further decomposed into three phases: Gather, Apply, and Scatter (GAS). During the gather phase the vertex-program typically collects information about its neighborhoods. More importantly, the the computation in the gather phase typically resembles a micro map-reduce job over the neighborhood of the vertex. The result of the gather phase is then passed on to the apply phase in which the vertex program updates the value of the vertex. Finally, during the scatter phase the vertex program typically signals adjacent vertices.
The GraphLab2 abstraction refines the GraphLab1 abstraction by exploiting the GAS decomposition and requiring the user to explicitly break their program into gather, apply, and scatter functions. This allows GraphLab to parallelize and distribute the gather and scatter phases and employ sophisticated new techniques for data layout and caching.
PageRank in GraphLab2:
We can easily decompose the PageRank algorithm into the Gather, Apply, and Scatter phases:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
PageRank_vertex_program { // Gather Phase edge_dir gather_edges(vertex i) { return in_edges; } double gather(vertex i, edge e) { return e.source.rank / e.source.num_out_edges; } double gather_sum(double r1, double r2) { return r1 + r2; } // Apply Phase void apply(vertex i, double sum) { i.old_rank = i.rank; i.rank = (1-ALPHA)/num_vertices + ALPHA*sum; } // Scatter Phase edge_dir scatter_edges(vertex i) { return out_edges; } void scatter(vertex i, edge e) { if(abs(e.source.old_rank - e.source.rank ) > EPSILON) { signal(e.target); } } } |
Actual C++ Code:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
#include <graphlab.hpp> float RESET_PROB = 0.15; float TOLERANCE = 1.0E-2; // The vertex data is just the pagerank value (a float). // Extends POD type to enable serialization as "plain old data" struct vertex_data : public graphlab::IS_POD_TYPE { float rank; vertex_data() : rank(1) { } }; // There is no edge data in the pagerank application typedef graphlab::empty edge_data; // The graph type is determined by the vertex and edge data types typedef graphlab::distributed_graph<vertex_data, edge_data> graph_type; // PageRank Vertex Program class pagerank : public graphlab::ivertex_program<graph_type, float>, public graphlab::IS_POD_TYPE { float last_change; public: /* Gather the weighted rank of the adjacent page */ float gather(icontext_type& context, const vertex_type& vertex, edge_type& edge) const { return ((1.0 - RESET_PROB) / edge.source().num_out_edges()) * edge.source().data().rank; } /* Use the total rank of adjacent pages to update this page */ void apply(icontext_type& context, vertex_type& vertex, const gather_type& total) { const double newval = total + RESET_PROB; last_change = std::fabs(newval - vertex.data().rank); vertex.data().rank = newval; } /* The scatter edges depend on whether the pagerank has converged */ edge_dir_type scatter_edges(icontext_type& context, const vertex_type& vertex) const { if (last_change > TOLERANCE) return graphlab::OUT_EDGES; else return graphlab::NO_EDGES; } /* The scatter function just signal adjacent pages */ void scatter(icontext_type& context, const vertex_type& vertex, edge_type& edge) const { context.signal(edge.target()); } }; // end of factorized_pagerank update functor int main(int argc, char** argv) { // Initialize control plain using mpi graphlab::mpi_tools::init(argc, argv); graphlab::distributed_control dc; // Parse command line options ----------------------------------------------- graphlab::command_line_options clopts("PageRank algorithm."); std::string graph_dir; clopts.attach_option("graph", graph_dir, "The graph file. Required "); if(!clopts.parse(argc, argv)) { dc.cout() << "Error in parsing command line arguments." << std::endl; return EXIT_FAILURE; } if (graph_dir == "") { dc.cout() << "Graph not specified. Cannot continue"; return EXIT_FAILURE; } // Build the graph ---------------------------------------------------------- graph_type graph(dc, clopts); graph.load_format(graph_dir, "tsv"); // Running The Engine ------------------------------------------------------- graphlab::omni_engine<pagerank> engine(dc, graph, "synchronous", clopts); engine.signal_all(); engine.start(); const float runtime = engine.elapsed_seconds(); dc.cout() << "Finished Running engine in " << runtime << " seconds." << std::endl; // Tear-down communication layer and quit ----------------------------------- graphlab::mpi_tools::finalize(); return EXIT_SUCCESS; } // End of main |


