If you know anything about Apache Flink, you are probably familiar with how to send data to it and how to get results back. But in some cases, we need to send configuration data to the Flink cluster and receive some additional data from it.
In the first part of the article, I’ll describe how to send configuration data to our Flink cluster. There are many things that we want to configure: function parameters, configuration files, machine learning models. Flink provides several different ways to do this, and we will cover how to use them and when to use each one. In the second part of the article, I will describe a non-trivial way of sending data back from a Flink cluster.
This article requires some basic knowledge of Apache Flink. If you are not familiar with it, you can read some of my other articles on the topic: here, here, and here.
Sending data to task managers
Before we dig into the details of how to send data between different components in Apache Flink, let's first talk about what components a Flink cluster has and what we are trying to achieve. The following diagram shows the main moving parts of Flink and how they interact:
When we need to execute a Flink application, we interact with a job manager that stores details about the job it is running, such as an execution graph. It controls task managers, and each task manager contains a portion of the data and executes the data processing functions that we've defined.
In many cases, we would like to configure the behavior of the functions that run in the Flink cluster. Depending on the use case, we may need to set a single variable or submit a file with a static configuration, and we will discuss how Flink supports these and other cases.
Besides sending configuration data to task managers, we will also look at how to return data from our functions in addition to their regular outputs.
Configuring user-defined functions
Let’s say we have an application that reads a list of movies from a CSV file and needs to filter all movies of a particular genre:
```java
// Read a dataset of movies
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
    .ignoreFirstLine()
    .parseQuotedStrings('"')
    .ignoreInvalidLines()
    .types(Long.class, String.class, String.class);

lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> {
    // Genres for a movie are separated by the "|" symbol
    String[] genres = movie.f2.split("\\|");
    // Find all movies that have the "Action" genre
    return Stream.of(genres).anyMatch(g -> g.equals("Action"));
}).print();
```
It is very likely that we would like to extract movies of a different genre, and to do this we need to be able to configure our filter function. When you implement a function like this, the most straightforward way to configure it is to implement a constructor:
```java
// Pass a genre name
lines.filter(new FilterGenre("Action"))
    .print();
...
class FilterGenre implements FilterFunction<Tuple3<Long, String, String>> {
    String genre;

    // Initialize the filter function
    public FilterGenre(String genre) {
        this.genre = genre;
    }

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");
        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}
```
Alternatively, if you are using a lambda function, you can simply use a variable from its closure:
```java
final String genre = "Action";

lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> {
    String[] genres = movie.f2.split("\\|");
    // Use the captured variable
    return Stream.of(genres).anyMatch(g -> g.equals(genre));
}).print();
```
Flink will serialize this variable and send it with the function to the cluster. Keep in mind that anything captured this way has to be serializable, since it is shipped to the task managers together with the function.
All these methods can get annoying if you need to pass a lot of variables to your function. To help with that, Apache Flink provides the `withParameters` method. To use it, you need to implement a `Rich` version of the function you are interested in, so instead of implementing the `MapFunction` interface, you would implement the `RichMapFunction` interface.
Rich functions allow you to pass a number of parameters using the `withParameters` method:
```java
// Class in Flink to store parameters
Configuration configuration = new Configuration();
configuration.setString("genre", "Action");

lines.filter(new FilterGenreWithParameters())
    // Pass parameters to a function
    .withParameters(configuration)
    .print();
```
To read these parameters we need to implement the `open` method and read the parameters in it:
```java
class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> {
    String genre;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the parameter
        genre = parameters.getString("genre", "");
    }

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");
        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}
```
All these options will work, but setting the same parameter for multiple functions can get tedious. To handle this, Flink allows setting global job parameters that are accessible from all task managers.
To do this, you first need to read the arguments from the command line using `ParameterTool.fromArgs`:
```java
public static void main(String... args) {
    // Read command line arguments
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    ...
}
```
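To illustrate how `ParameterTool` parses these arguments, here is a small sketch; the `--genre` argument and its values are hypothetical examples of mine, not from the original code:
```java
// As if the job was submitted with "--genre Action" on the command line
ParameterTool parameterTool = ParameterTool.fromArgs(new String[] {"--genre", "Action"});

// Returns "Action"
String genre = parameterTool.get("genre");

// Returns the default value "Comedy" if no "genre" argument was passed
String withDefault = parameterTool.get("genre", "Comedy");
```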
and then set global job parameters using the `setGlobalJobParameters` method:
```java
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameterTool);
...
// This function will be able to read these global parameters
lines.filter(new FilterGenreWithGlobalEnv())
    .print();
```
Now we can implement a function that will read these parameters. As before, it should be a rich function:
```java
class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> {
    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");
        // Get global parameters
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext()
            .getExecutionConfig()
            .getGlobalJobParameters();
        // Read the parameter
        String genre = parameterTool.get("genre");
        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}
```
To read a parameter we need to call the `getGlobalJobParameters` method to get all global parameters and then use the `get` method to get the parameter we are interested in.
Broadcast variables
All the methods we've discussed so far will suit you if you want to send data from a client to task managers, but what if the data exists in task managers in the form of a dataset? In this case, it's better to use another Flink feature called broadcast variables. It allows you to send a dataset to the task managers that will execute your functions.
Let’s say we have a dataset that contains words that we should ignore when we do text processing, and we want to pass it to our function. To set a broadcast variable for a single function, we need to use the `withBroadcastSet` method and pass a dataset to it.
```java
// Get a dataset with words to ignore
DataSet<String> wordsToIgnore = ...

data.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
    // A collection to store words. This will be stored in memory
    // of a task manager
    Collection<String> wordsToIgnore;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read a collection of words to ignore
        wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore");
    }

    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            // Skip words from the ignore list
            if (!wordsToIgnore.contains(word)) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
// Pass a dataset via a broadcast variable
}).withBroadcastSet(wordsToIgnore, "wordsToIgnore");
```
You should keep in mind that if you use broadcast variables, a dataset will be stored in a task manager’s memory, so you should only use them for small datasets. Also note that the name passed to `withBroadcastSet` must match the name used to read the broadcast variable in the `open` method.
If you want to send more data to each task manager and do not want to store this data in memory, you can send a static file to task managers using Flink’s distributed cache. To use it, you first need to store a file in a distributed file system like HDFS and then register this file in the cache:
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register a file from HDFS
env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel");
...
env.execute();
```
To access the distributed cache, we again need to implement a rich function:
```java
class MyClassifier extends RichMapFunction<String, Integer> {
    @Override
    public void open(Configuration config) {
        File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel");
        ...
    }

    @Override
    public Integer map(String value) throws Exception {
        ...
    }
}
```
Notice that to access a file in the distributed cache we need to use the same key that we used to register it.
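To make this more concrete, here is a sketch of what the `open` method could do with the cached file. The parsing here is hypothetical, my addition rather than the article's code, and assumes the registered file is a plain-text model description:
```java
// A sketch assuming the cached file is plain text; uses java.nio.file.Files
class MyClassifier extends RichMapFunction<String, Integer> {
    private List<String> modelLines;

    @Override
    public void open(Configuration config) throws Exception {
        // Look up the file using the same key it was registered with
        File machineLearningModel = getRuntimeContext()
            .getDistributedCache()
            .getFile("machineLearningModel");
        // Hypothetical parsing: read the model description line by line
        modelLines = Files.readAllLines(machineLearningModel.toPath());
    }

    @Override
    public Integer map(String value) throws Exception {
        // Hypothetical use of the loaded "model"
        return modelLines.contains(value) ? 1 : 0;
    }
}
```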
Accumulators
We’ve covered how to send data to task managers, but now let’s talk about how to send data back from them. You may wonder why we need to do anything special. After all, Apache Flink is all about building data processing pipelines that read input data, process it, and return a result.
To clarify what else we could possibly want, let’s take a look at an example. Say we need to count how many times each word occurs in a text, and at the same time we want to know how many lines the text has:
```java
// Text to process
DataSet<String> lines = ...

// Word count algorithm
lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
})
.groupBy(0)
.sum(1)
.print();

// Count the number of lines in the text to process
long linesCount = lines.count();
System.out.println(linesCount);
```
The problem is that if we run this application as it is, Flink will execute two jobs: the first to compute the word count and the second to count the number of lines.
This is definitely inefficient, but how can we avoid it? One way is to use accumulators. They allow you to send data from task managers, and this data is aggregated using a predefined function. Flink has the following built-in accumulators:
- IntCounter, LongCounter, DoubleCounter – allow summing together int, long, and double values sent from task managers
- AverageAccumulator – calculates an average of double values
- LongMaximum, LongMinimum, IntMaximum, IntMinimum, DoubleMaximum, DoubleMinimum – determine maximum and minimum values for different types
- Histogram – computes a distribution of values sent from task managers (see the sketch below)
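For instance, the built-in `Histogram` accumulator could be used like this; the sketch below is my addition, not from the original article, and records the distribution of line lengths:
```java
lines.map(new RichMapFunction<String, String>() {
    // Histogram counts how many times each int value was added
    private final Histogram lineLengths = new Histogram();

    @Override
    public void open(Configuration parameters) throws Exception {
        getRuntimeContext().addAccumulator("lineLengths", lineLengths);
    }

    @Override
    public String map(String line) throws Exception {
        // Record the length of every processed line
        lineLengths.add(line.length());
        return line;
    }
});
```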
To use an accumulator, we need to create and register it in a user-defined function and then read the result on the client side. Here is how we can do this:
```java
lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
    // Create an accumulator
    private IntCounter linesNum = new IntCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the accumulator
        getRuntimeContext().addAccumulator("linesNum", linesNum);
    }

    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
        // Increment after each line is processed
        linesNum.add(1);
    }
})
.groupBy(0)
.sum(1)
.print();

// Get the accumulator result
int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
System.out.println(linesNum);
```
This allows us to count how many times each word occurs in the input text and how many lines it has, all in a single Flink job.
If you need a custom accumulator, you can implement your own using the `Accumulator` or `SimpleAccumulator` interfaces.
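For example, here is a minimal sketch (my addition, not from the original article) of a custom accumulator built on the `SimpleAccumulator` interface that keeps the longest line seen by any task manager:
```java
// SimpleAccumulator<T> is a simplified Accumulator whose input and
// result types are the same
public class LongestLineAccumulator implements SimpleAccumulator<String> {
    private String longest = "";

    @Override
    public void add(String value) {
        if (value.length() > longest.length()) {
            longest = value;
        }
    }

    @Override
    public String getLocalValue() {
        return longest;
    }

    @Override
    public void resetLocal() {
        longest = "";
    }

    @Override
    public void merge(Accumulator<String, String> other) {
        // Called when Flink combines partial results from task managers
        add(other.getLocalValue());
    }

    @Override
    public Accumulator<String, String> clone() {
        LongestLineAccumulator copy = new LongestLineAccumulator();
        copy.longest = longest;
        return copy;
    }
}
```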
More information
I hope you liked this article and found it useful. You can find the source code for this article in my git repository with other Apache Flink examples.
I will write more articles about Flink in the near future, so stay tuned! You can read my other articles here, or you can take a look at my Pluralsight course where I cover Apache Flink in more detail: Understanding Apache Flink. Here is a short preview of the course.