Graphs are everywhere. The Internet, maps, and social networks, to name just a few, are all examples of massive graphs that contain vast amounts of useful information. Since these networks keep growing and processing them is becoming more and more common, we need better tools to do the job.
In this article, I’ll describe how we can use the Flink Gelly library to process large graphs and provide a simple example of how to find the shortest path between two users in the Twitter graph.
Introduction to Gelly
In a nutshell, Flink Gelly is a library for graph processing implemented on top of the batch processing API in Apache Flink.
It allows us to process huge graphs in a distributed fashion using the Apache Flink API.
You may wonder why we need one more graph library. Since there are other graph processing systems (e.g. Apache Giraph) and general-purpose Big Data systems that can be used for some intermediate graph processing, a new graph processing library may seem superfluous.
Gelly still has one important advantage over other systems. Since it is a part of Apache Flink, it allows us to preprocess graph data, process graphs, and transform the resulting graphs using one system and one API. This is convenient from both a development and an operational standpoint, since we only have one API to learn and one system to operate.
Graph intro
As you probably know, a graph is a set of vertices connected by edges. Flink represents graphs as two datasets: a dataset of vertices and a dataset of edges.
/**
* @param <K> the key type for edge and vertex identifiers
* @param <VV> the value type for vertices
* @param <EV> the value type for edges
*/
public class Graph<K, VV, EV> {
private final DataSet<Vertex<K, VV>> vertices;
private final DataSet<Edge<K, EV>> edges;
...
}
The Graph class has three generic type arguments: the type of the keys that uniquely identify vertices, the type of the values associated with vertices, and the type of the values associated with edges. So the following Graph definition:
Graph<Long, String, Integer> graph = ...
represents a graph with vertex keys of type Long, vertex values of type String, and edge values of type Integer.
The Vertex in Gelly is essentially a tuple with two values: the id of a vertex and an optional value associated with it. Similarly, the Edge is a tuple with three elements: the ids of the source and target vertices and a value associated with the edge.
Since the DataSet is immutable, the Graph class is immutable as well, and all operations that change a graph create a new instance. Notice that a Graph in Gelly is always directed.
Creating Graph
We can create a graph in one of three ways:
- Generate a graph using one of the existing graph generators
- Create a graph from one or two DataSet instances
- Read a graph from a CSV file
Let’s see how we can create a graph in Gelly in different ways.
Graph generators
Flink supports a number of graph generators to generate star graphs, cycle graphs, complete graphs and so on. Here is an example of how to generate a complete graph where every vertex is connected to all other vertices:
Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, vertexCount)
.generate();
Graph generators are useful for testing and allow you to quickly create a graph for your experiments.
Create a graph from datasets
The most common way to create a graph with Gelly is from one or several DataSet instances. They are usually read from an external system, such as a distributed filesystem or a database.
Gelly allows a lot of freedom here. The simplest way is to create a dataset of vertices and a dataset of edges and pass them to the Graph.fromDataSet method:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Vertex<Long, NullValue>> vertices = ...
DataSet<Edge<Long, NullValue>> edges = ...
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
For testing purposes we can use the fromCollection method, which creates small graphs from in-memory collections:
List<Vertex<Long, NullValue>> vertices = ...
List<Edge<Long, NullValue>> edges = ...
Graph<Long, NullValue, NullValue> graph = Graph.fromCollection(vertices, edges, env);
We can also create a graph using only a dataset of edges:
DataSet<Edge<Long, String>> edges = ...
Graph<Long, NullValue, String> graph = Graph.fromDataSet(edges, env);
In this case Gelly extracts the vertex keys from the edges and assigns no values to the vertices.
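To make the idea of deriving vertices from edges concrete, here is a minimal plain-Java sketch. It does not use Gelly or Flink at all: the class name VerticesFromEdges and the representation of an edge as a {sourceId, targetId} array are assumptions made purely for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Plain-Java illustration only; this is not Gelly code.
class VerticesFromEdges {

    // Each edge is a {sourceId, targetId} pair; edge values are omitted for brevity.
    static SortedSet<Long> vertexIds(List<long[]> edges) {
        SortedSet<Long> ids = new TreeSet<>();
        for (long[] edge : edges) {
            ids.add(edge[0]); // source vertex id
            ids.add(edge[1]); // target vertex id
        }
        return ids;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
                new long[]{1L, 2L},
                new long[]{2L, 3L},
                new long[]{3L, 1L});
        System.out.println(vertexIds(edges)); // prints [1, 2, 3]
    }
}
```

Every id that appears as a source or a target ends up in the vertex set exactly once, which is why a vertex with no edges at all cannot be recovered this way.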
Gelly provides more methods to create a graph, including variations that accept datasets of Tuples and collections. You can find them in the official documentation.
Reading graph from CSV files
Of course, we can create a DataSet instance from a CSV file and then create a Graph instance out of it. But since this is a common case, Gelly has special support for it: the Graph.fromCsvReader method reads vertex and edge data from two CSV files:
Graph<Long, Double, String> graph =
Graph.fromCsvReader("vertices.csv", "edges.csv", env)
.types(Long.class, Double.class, String.class);
The types method is used to specify the types of keys and values in the graph.
As before, Gelly can create a graph from edges only:
Graph<String, NullValue, NullValue> simpleGraph =
Graph.fromCsvReader("edges.csv", env)
.keyType(String.class);
Working with graphs
Once we have a graph, it would be handy to be able to process it in some way. Let’s briefly go through what we can do with Gelly graphs:
Graph properties
These are the most straightforward methods. They let us query basic graph properties, such as the number of vertices, the number of edges, in-degrees, etc.:
Graph<Long, Double, String> graph = ...
// Get graph vertices
DataSet<Vertex<Long, Double>> vertices = graph.getVertices();
// Get graph edges
DataSet<Edge<Long, String>> edges = graph.getEdges();
// Get the IDs of the vertices as a DataSet
DataSet<Long> vertexIds = graph.getVertexIds();
// Get a DataSet of <vertex ID, in-degree> pairs for all vertices
DataSet<Tuple2<Long, LongValue>> inDegrees = graph.inDegrees();
// Get a DataSet of <vertex ID, out-degree> pairs for all vertices
DataSet<Tuple2<Long, LongValue>> outDegrees = graph.outDegrees();
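If in-degrees and out-degrees are unfamiliar, the following plain-Java sketch shows what these pairs mean for a directed graph. It is not Gelly code: the class DegreeSketch, the {sourceId, targetId} edge arrays, and the choice to report a zero for vertices without matching edges are all assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not Gelly code.
class DegreeSketch {

    // Number of incoming edges per vertex; edges are {sourceId, targetId} pairs.
    static Map<Long, Long> inDegrees(List<long[]> edges) {
        Map<Long, Long> degrees = new TreeMap<>();
        for (long[] edge : edges) {
            degrees.putIfAbsent(edge[0], 0L);      // make sure the source has an entry
            degrees.merge(edge[1], 1L, Long::sum); // one more edge arrives at the target
        }
        return degrees;
    }

    // Number of outgoing edges per vertex.
    static Map<Long, Long> outDegrees(List<long[]> edges) {
        Map<Long, Long> degrees = new TreeMap<>();
        for (long[] edge : edges) {
            degrees.merge(edge[0], 1L, Long::sum); // one more edge leaves the source
            degrees.putIfAbsent(edge[1], 0L);      // make sure the target has an entry
        }
        return degrees;
    }

    public static void main(String[] args) {
        // Directed edges: 1->2, 1->3, 2->3
        List<long[]> edges = Arrays.asList(
                new long[]{1L, 2L}, new long[]{1L, 3L}, new long[]{2L, 3L});
        System.out.println(inDegrees(edges));  // prints {1=0, 2=1, 3=2}
        System.out.println(outDegrees(edges)); // prints {1=2, 2=1, 3=0}
    }
}
```

Because the graph is directed, the two maps differ: vertex 1 has two outgoing edges but none coming in.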
Graph Transformations
With these methods, we can update vertices or edges. The group includes methods like:
- map – changes values associated with edges or vertices
- filter – keeps only the edges and vertices that match a given predicate
- reverse – creates a graph where the direction of edges is reversed
- union – merges two graphs together
- difference – keeps only the vertices and edges that do not exist in another graph
Here is an example of filtering edges that keeps only edges where source and target vertices are different:
Graph<Integer, NullValue, NullValue> graph = ...
Graph<Integer, NullValue, NullValue> filteredGraph = graph.filterOnEdges(new FilterFunction<Edge<Integer, NullValue>>() {
@Override
public boolean filter(Edge<Integer, NullValue> edge) throws Exception {
// Keep only edges where source and target are different
return !edge.getSource().equals(edge.getTarget());
}
});
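As a rough mental model of two of these transformations, here is a plain-Java sketch of reversing edges and of dropping self-loops. It is not Gelly code: the class EdgeTransformSketch, its method names, and the {sourceId, targetId} edge arrays are assumptions for illustration. Like the Gelly methods, each function returns a new collection and leaves its input untouched.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration only; this is not Gelly code.
class EdgeTransformSketch {

    // Swaps source and target of every edge, producing a new list.
    static List<long[]> reverse(List<long[]> edges) {
        return edges.stream()
                .map(e -> new long[]{e[1], e[0]})
                .collect(Collectors.toList());
    }

    // Keeps only edges whose source and target differ.
    static List<long[]> withoutSelfLoops(List<long[]> edges) {
        return edges.stream()
                .filter(e -> e[0] != e[1])
                .collect(Collectors.toList());
    }

    // Renders edges as "source->target" for printing.
    static String format(List<long[]> edges) {
        return edges.stream()
                .map(e -> e[0] + "->" + e[1])
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
                new long[]{1L, 2L}, new long[]{2L, 2L}, new long[]{2L, 3L});
        System.out.println(format(withoutSelfLoops(edges))); // prints 1->2, 2->3
        System.out.println(format(reverse(edges)));          // prints 2->1, 2->2, 3->2
    }
}
```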
Graph Mutations
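This group includes methods that add or remove edges and vertices. Since the Graph class is immutable, each of them returns a new graph instead of modifying the existing one:
Graph<Integer, Double, String> graph = ...
// Add an edge to the graph; addEdge takes the source vertex, the target vertex and the edge value
// (the vertex values 1.0 and 2.0 are arbitrary example values)
Graph<Integer, Double, String> withEdge = graph.addEdge(
new Vertex<Integer, Double>(1, 1.0), new Vertex<Integer, Double>(2, 2.0), "1-2");
// Add a vertex to the graph
Graph<Integer, Double, String> withVertex = graph.addVertex(new Vertex<Integer, Double>(1, 4.2));
// Remove an edge from the graph
Graph<Integer, Double, String> withoutEdge = withEdge.removeEdge(new Edge<Integer, String>(1, 2, "1-2"));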
Shortest Path in Twitter
Let’s get more practical. In the following example, I’ll show how to find the shortest path between two users in the Twitter social graph. To do this, I’ll use the Graph methods mentioned above and the SingleSourceShortestPaths algorithm, which calculates the path length from a source vertex to all other vertices in a graph.
Instead of reading data from Twitter, I use the Stanford Twitter Dataset. If you want to know more about the data format and how to read it, please refer to my previous post. All that is important to know here is that before processing, I load the data into a dataset of TwitterFollower objects:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<TwitterFollower> twitterFollowers = env.createInput(new StanfordTweetsDataSetInputFormat("/Users/ivanmushketyk/Flink/twitter"));
Every TwitterFollower object contains a pair of Twitter users: a follower and the user they follow.
To process this data we first need to convert it into a dataset of Edges that we will use later to create a graph instance. To do this, we can use the map method:
DataSet<Edge<Integer, NullValue>> twitterEdges = twitterFollowers
.map(new MapFunction<TwitterFollower, Edge<Integer, NullValue>>() {
@Override
public Edge<Integer, NullValue> map(TwitterFollower value) throws Exception {
// The edge points from the follower to the user being followed.
// NullValue.getInstance() is set explicitly so that the value field is not left as null.
return new Edge<>(value.getFollower(), value.getUser(), NullValue.getInstance());
}
});
Graph<Integer, NullValue, NullValue> followersGraph = Graph.fromDataSet(twitterEdges, env);
When we create a graph from a dataset of edges, Gelly populates the dataset of vertices using the keys specified in the input edges.
To calculate the shortest path we will use the SingleSourceShortestPaths algorithm, which calculates the shortest path from a source vertex to all other vertices in the graph. The problem is that SingleSourceShortestPaths only works on a weighted graph, meaning that every edge should have a Double value associated with it. Since our graph so far has no values associated with its edges, we need to add them first.
To do this we can use the mapEdges method, which updates all edges in the graph. In this case we simply set 1.0 as the weight of every edge:
Graph<Integer, NullValue, Double> weightedFollowersGraph = followersGraph.mapEdges(new MapFunction<Edge<Integer, NullValue>, Double>() {
@Override
public Double map(Edge<Integer, NullValue> edge) throws Exception {
return 1.0;
}
});
Now that we have a weighted graph, we can use the SingleSourceShortestPaths algorithm.
The process is pretty straightforward. First, we need to initialize the algorithm with a starting node, and a maximum number of iterations the algorithm will do before it returns a result:
// @fourzerotwo
int sourceVertex = 3359851;
int maxIterations = 10;
SingleSourceShortestPaths<Integer, NullValue> singleSourceShortestPaths = new SingleSourceShortestPaths<>(sourceVertex, maxIterations);
DataSet<Vertex<Integer, Double>> result = singleSourceShortestPaths.run(weightedFollowersGraph);
Once we have an instance of SingleSourceShortestPaths, we need to call the run method and pass the weighted graph to it. This method returns a dataset of vertices. The ids of these vertices correspond to the vertex ids of the original graph, while the values represent distances from the source vertex.
The SingleSourceShortestPaths algorithm calculates the shortest path to all vertices in the network. Just out of curiosity, we can display the path length from the source vertex to some other Twitter user:
// @soulpancake
int targetVertex = 19636959;
result.filter(vertex -> vertex.getId().equals(targetVertex))
.print();
Here is the output that I got:
(19636959,3.0)
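Because every edge has weight 1.0, the distance reported above is simply the number of "follows" hops between the two users. The plain-Java sketch below illustrates that interpretation with a breadth-first search on a tiny directed graph. It is not how Gelly implements the algorithm: the class UnitWeightShortestPaths, the {sourceId, targetId} edge arrays, and the maxHops parameter (only loosely analogous to the iteration limit) are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Plain-Java illustration only; this is not Gelly code.
class UnitWeightShortestPaths {

    // Returns the hop distance from source to every vertex reachable within maxHops.
    // Vertices that cannot be reached are simply absent from the result.
    static Map<Integer, Double> distances(List<int[]> edges, int source, int maxHops) {
        // Build adjacency lists that follow the edge direction (source -> target).
        Map<Integer, List<Integer>> adjacency = new HashMap<>();
        for (int[] edge : edges) {
            adjacency.computeIfAbsent(edge[0], k -> new ArrayList<>()).add(edge[1]);
        }
        Map<Integer, Double> distance = new HashMap<>();
        distance.put(source, 0.0);
        Queue<Integer> frontier = new ArrayDeque<>();
        frontier.add(source);
        while (!frontier.isEmpty()) {
            int current = frontier.remove();
            double d = distance.get(current);
            if (d >= maxHops) {
                continue; // do not expand beyond the hop limit
            }
            List<Integer> neighbours = adjacency.getOrDefault(current, Collections.<Integer>emptyList());
            for (int next : neighbours) {
                if (!distance.containsKey(next)) { // first visit is the shortest with unit weights
                    distance.put(next, d + 1.0);
                    frontier.add(next);
                }
            }
        }
        return distance;
    }

    public static void main(String[] args) {
        // Directed edges: 1->2, 2->3, 3->4, 1->3, 5->1
        List<int[]> edges = Arrays.asList(
                new int[]{1, 2}, new int[]{2, 3}, new int[]{3, 4},
                new int[]{1, 3}, new int[]{5, 1});
        System.out.println(distances(edges, 1, 10).get(4)); // prints 2.0
    }
}
```

Note that vertex 5 is not reachable from vertex 1 even though an edge connects them, because that edge points the other way. The same applies to the Twitter graph: a path exists only along the direction of the "follows" edges.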
Conclusions
Graph processing is ubiquitous and applies to many different domains. Gelly lets us use the power of the Flink API to process large-scale graphs. It provides a simple API to create and edit graphs and has a plethora of handy algorithms for different graph processing tasks.
You can find the full code of the example from this post in my Git repository with other Flink examples.
More information
If you want to know more about Apache Flink, you can take a look at my Pluralsight course, where I cover Apache Flink in more detail: Understanding Apache Flink