If you’ve been following recent news in the Big Data world, you’ve probably heard about Apache Flink. This platform for batch and stream processing, which is built on a few significant technical innovations, can become a real game changer and it is starting to compete with existing products like Apache Spark.

In this post, I would like to show how to implement a simple batch processing algorithm using Apache Flink. We will work with a dataset of movie ratings and will produce a distribution of user ratings. In the process, I’ll show few tricks that you can use to improve the performance of your Flink applications.

Get the data

At first, we need to get the data to work with. Here I will use free data from the GroupLens website that has multiple movie rating datasets for development and research. To make things a bit faster I will run all tests on their small development dataset with 100K ratings, but the code that I present here can be used on datasets of any size.

If you are following along, simply download and unpack the dataset. It contains four CSV files:

  • ratings.csv – contains ratings of movies by users on the 1 to 5 scale. It has four fields: user id, movie id, rating, and a timestamp of a rating
  • movies.csv – contains information about the movies rated in the ratings.csv file. It has three fields: movie id, movie name, and a list of genres like “Comedy|Drama|Romance”
  • links.csv – contains links between movies ids in this dataset and ids on the IMDB and TMDb movie databases
  • tags.csv – represents tags assigned by users to movies in this dataset. Contains four fields: user id, movie id, string tag, and a timestamp when tag was added

In this post, we will work only with data in the ratings.csv file, but in the following blog posts I will show other, more sophisticated algorithms that will use other files in this dataset.

If you take a look at the ratings.csv you will find something like this:

userId,movieId,rating,timestamp
1,31,2.5,1260759144
1,1029,3.0,1260759179
...

The first line here means that user with id 1 rated movie 31 with grade 2.5.

Create the project

Creating an Apache Flink project is pretty straightforward. Apache Flink developers created a project template for us, so all we need to do is to use the Maven archetype:generate command

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.flink \
      -DarchetypeArtifactId=flink-quickstart-java \
      -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
      -DarchetypeVersion=1.1.3

It will generate a pom.xml file and several example Flink applications. To write our own, we need to create a new Java class with a main method. It will work in both development mode and on a Flink cluster.

Write the code

As result of our application, we would like to see a distribution of ratings in the movies dataset. In other words, we need to count how many times users have published one-star ratings, how many they’ve published times two-star ratings, and so on.

The first thing that we need to do is to read the file. This can be done with the following:

public class RatingsDistribution {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> file = env.readTextFile("ml-latest-small/ratings.csv");
    }
}

In the first line, we create an execution environment that will give us access to Flink features. If we run this application on a development machine, it will locally start a mini Flink cluster for testing. If we execute this application on a Flink cluster, it will use existing cluster resources to perform data processing.

The second line of the main function will read data from a local filesystem, which is handy for development but Flink also supports reading data from HDFS, S3, and other input sources. To read from a different source, we can use a URL like: “hdfs://path/to/file” or “s3://path/to/file” which instructs Flink to access a distributed filesystem.

At this moment, we have a DataSet: an object that works as a handle for data in Flink. Every item in this dataset represents a single line from the downloaded CSV file. We can use this DataSet instance to perform data transformations like map, filter, groupBy, etc. to implement our data processing algorithm. These operations form sort of a pipeline for data processing.

The next thing that we need to do is convert all items in this dataset into Java objects that we can process. To do this, we will use the flatMap function that converts every input element into zero, one, or multiple output elements.

Flink already has a function to read CSV files, but I just want to show how we can implement it ourselves.

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> file = env.readTextFile("ml-latest-small/ratings.csv");
DataSet<Tuple2<IntValue, Integer>> ratings = file.flatMap(new ExtractRating());
...

private static class ExtractRating implements FlatMapFunction<String, Tuple2<IntValue, Integer>> {
    // Mutable int field to reuse to reduce GC pressure
    IntValue ratingValue = new IntValue();

    // Reuse rating value and result tuple
    Tuple2<IntValue, Integer> result = new Tuple2<>(ratingValue, 1);

    @Override
    public void flatMap(String s, Collector<Tuple2<IntValue, Integer>> collector) throws Exception {
        // Every line contains comma separated values
        // user id | item id | rating | timestamp
        String[] split = s.split(",");
        String ratingStr = split[2];

        // Ignore CSV header
        if (!ratingStr.equals("rating")) {
            int rating = (int) Double.parseDouble(split[2]);
            ratingValue.setValue(rating);

            collector.collect(result);
        }
    }
}

There are a few things to go through here. First of all, we need to provide an instance of a FlatMapFunction that transforms input Strings into tuples. We could convert them into custom POJOs as well, but Flink has some nice utility methods that help to work with Tuples more efficiently, so we will stick to them.

To produce a tuple that can be used by the next stage of the pipeline we need to use collector.collect method. The only case when we do not produce an output value is when we process a header in the CSV file.

Notice that we are reusing the same Tuple2 instance between calls. While we could create a new tuple on every call Flink allows us to reuse return values to reduce GC. To reuse as many objects as possible from call to call we also create an instance of IntValue class and pass a reference to it to result Tuple. IntValue is a mutable counterpart of the Integer class in JDK, and we use it to build a result tuple once, and then we only need to change int value in the existing object.

You can see that on every call we generate a tuple with two elements: rating value and number one. We will use this second element to group by rating value and sum them to get the result distribution.

To do this, we need to call the groupBy method and specify a key to group elements by. Flink provides multiple ways to specify a key, but since we are using tuples, we can simply specify a position of a tuple field to group by. Since rating values are in the first field, we need pass 0:

file.flatMap(new ExtractRating())
    .groupBy(0);

At this stage, we have a grouped dataset, and we can process elements in every group. To do this we can use the reduceGroup method:

file.flatMap(new ExtractRating())
    .groupBy(0)
    .reduceGroup(new SumRatingCount());
...
private static class SumRatingCount implements GroupReduceFunction<Tuple2<IntValue, Integer>, Tuple2<IntValue, Integer>> {
    @Override
    public void reduce(Iterable<Tuple2<IntValue, Integer>> iterable, Collector<Tuple2<IntValue, Integer>> collector) throws Exception {
        IntValue rating = null;
        int ratingsCount = 0;
        for (Tuple2<IntValue, Integer> tuple : iterable) {
            rating = tuple.f0;
            ratingsCount += tuple.f1;
        }

        collector.collect(new Tuple2<>(rating, ratingsCount));
    }
}

The code is pretty straightforward. We need to implement an instance of GroupReduceFunction with a single method reduce that will be called to process a group of elements with a single key. Flink passes an iterable to access elements in the group and a collector (as in the flatMap function) that should be used to produce the final result.

Our pipeline is finished, but Flink won’t do anything at this stage yet. All the code that we’ve written so far only defines the structure of our application. Now we need to instruct it to start execution. There are multiple ways to do it, but the simplest way for development purposes is to call the print function. It will execute a data processing algorithm and output the result to the stdout:

file.flatMap(new ExtractRating())
    .groupBy(0)
    .reduceGroup(new SumRatingCount())
    .print();

Flink is not limited to this and can also output data to filesystems and databases.

Now we can simply execute the main function and inspect the result. It will start a mini cluster and print the result:

14:10:10,299 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 0 registered types and 0 default Kryo serializers
14:10:10,730 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Disabled queryable state server
14:10:10,812 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster.
...
14:10:14,095 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Task manager akka://flink/user/taskmanager_1 is completely shut down.
(3,30602)
(0,1101)
(1,5013)
(5,15095)
(2,11720)
(4,36473)

Simplify the code

Summing values in a group is a common operation in batch processing algorithms, and Flink has a shortcut to do this. Instead of using the reduceGroup function, we can use the sum function that does exactly this. Since we are using tuples, we need to specify a field number to sum, which is the second field in our case:

file.flatMap(new ExtractRating())
    .groupBy(0)
    .sum(1)
    .print();

You can find the result application in my GitHub repo.

Conclusions

In this blog post, we’ve explored the Flink batch processing API and implemented a simple data processing algorithm. In the next blog posts, I’ll show how to implement more complicated examples that use other batch processing techniques.

 

More information

If you want to know more about Apache Flink you can take a look at my Pluralsight course where I cover Apache Flink in more details: Understanding Apache Flink

Here is a short preview of this course:

Posted by Ivan Mushketyk

Principal Software engineer and life-long learner. Creating courses for Pluralsight. Writing for DZone, SitePoint, and SimpleProgrammer.