JDK 8 has introduced a lot of long-anticipated features to Java language. Among those, the most notable was the introduction of lambda functions. They allowed adding new frameworks such as Java 8 Streams, as well as, new features to existing frameworks like JUnit 5.

Apache Flink also supports lambda functions, and in this post, I’ll show how to enable them and how to use them in your applications.

Let’s say we need to implement a Flink application to count how often every word appears in our text. Using Java 7 we would implement it like:

DataSource<String> lines = env.fromElements(
    "Apache Flink is a community-driven open source framework for distributed big data analytics,",
    "like Hadoop and Spark. The core of Apache Flink is a distributed streaming dataflow engine written",
            ...
);

lines.flatMap(new FlatMapFunction<String, Object>() {
    @Override
    public void flatMap(String line, Collector<Object> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
})
.groupBy(0)
.sum(1)
.print();

The code is pretty straightforward. At first, it splits every line in the input dataset into separate words and emits tuples with word and number one. Then the applications groups all emitted tuple by word (0th position in emitted tuples) and sums up all ones (1st position in emitted tuples).

If you use Java 8 the first thing that you would like to change is to replace the FlatMapFunction in the previous example with a modern lambda expressions like this:

lines.flatMap((line, out) -> {
    String[] words = line.split("\\W+");
    for (String word : words) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.groupBy(0)
.sum(1)
.print();

Unfortunately, if we try to compile this class with javac and execute it on a Flink cluster we will get the following error:

$ bin/flink run wordCount.jar
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
...
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. 

It turned out that Flink is using generics types in user defined functions to generate serializers. When we use anonymous functions this information is preserved, but lambda expressions are not anonymous classes. javac treats them differently and do not store generics types in the class file.

In the following sections of this post, I’ll demonstrate several different ways how you overcome this issue.

Use Eclipse JDT compiler

To solve this problem, one of the Flink committers contributed patches to Eclipse JDT and OpenJDK compilers. Eclipse JDT has accepted the patch, and as a result, it now preserves generics information for lambdas and can be used to implement Flink applications with lambda expressions.

To set Eclipse JDT as a compiler in our Maven project you need to add the following plugin to your pom.xml:

<plugins>
    <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <compilerId>jdt</compilerId>
        </configuration>
        <dependencies>
            <dependency>
                <groupId>org.eclipse.tycho</groupId>
                <artifactId>tycho-compiler-jdt</artifactId>
                <version>0.21.0</version>
            </dependency>
        </dependencies>
    </plugin>
</plugins>

After this you can compile and execute your application with lambda functions using Flink without any changes:

$ mvn clean package -Pbuild-jar
$ bin/flink run target/codes.brewing.flinkexamples-1.0-SNAPSHOT.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
...
(systems,2)
(the,3)
(to,1)
(written,1)

Provide type hints

If you don’t want to use Eclipse JDT compiler for some reason and want to stick with javac you can use a different, but less pretty approach. Apache Flink has a notion of type hints that you can use to, as the name suggests, hint the framework what type is used. To set the return type of the flatMap we can use the returns method.

lines.flatMap((line, out) -> {
    String[] words = line.split("\\W+");
    for (String word : words) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.returns(new TupleTypeInfo(TypeInformation.of(String.class), TypeInformation.of(Integer.class)))
.groupBy(0)
.sum(1)
.print();

The return method accepts an instance of the TypeInformation class that is used in Flink to represent information about Java types. It has multiple implementations such as BasicTypeInfo to represent primitive types, EnumTypeInfo for enums, CompositeType to represent composite types like POJOs, and some others. In this case we use TupleTypeInfo that represents type information of a Flink tuples. To create TupleTypeInfo we pass an array of primitive types that specifies the tuple’s structure.

The last code snippet, even if compiled with javac, will work just fine.

Don’t use lambdas everywhere

If you don’t want to bother either with the Eclipse compiler or with type hints you still have one more option: to use lambdas only to define a subset of operators. It turns out that Flink only need type information if your function has an Iterable or a Collector as one of its arguments. If you only use lambdas that don’t have such arguments and these functions return types without generics you don’t need to change anything in your code.

As an example, the following snippet can be compiled with javac:

lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(word);
        }
    }
})
.map(word -> new WordCount(word, 1))
.groupBy(wordCount -> wordCount.getWord())
.reduceGroup(new GroupReduceFunction<WordCount, WordCount>() {
    @Override
    public void reduce(Iterable<WordCount> values, Collector<WordCount> out) throws Exception {
        String word = null;
        int count = 0;

        for (WordCount wordCount : values) {
            word = wordCount.getWord();
            count += wordCount.getCount();
        }
        out.collect(new WordCount(word, count));
    }
})
.print();

Notice that in this code snippet we can use lambda function in the map and groupBy functions. In this case we also cannot use tuples (as in previous examples) since they have generics and they were replaced with a WordCount class.

Source code

You can find a full version of the source code from this post in my GitHub repository.

More information

If you want to know more about Apache Flink you can take a look at my Pluralsight course where I cover Apache Flink in more details: Understanding Apache Flink

Here is a short preview of this course:

Posted by Ivan Mushketyk

Principal Software engineer and life-long learner. Creating courses for Pluralsight. Writing for DZone, SitePoint, and SimpleProgrammer.