Apache Flink has a versatile set of connectors for external data sources. It can read and write data from databases and from local and distributed file systems. However, sometimes what Flink provides is not enough, and we need to read some uncommon data format.

In this article, I will show you how to implement a custom connector for reading a dataset in Flink.

Data format

Before we start writing code, let’s take a look at what we are going to read. As an example, I selected the Twitter social graph from the Stanford Network Analysis Project. The format is pretty straightforward and describes “follows” relationships between Twitter users. If we unpack the archive with the social graph from the Stanford website, we will find a number of files with the .edges extension. Every file contains a long list of id pairs like this:

$ cat 100318079.edges
214328887 34428380
17116707 28465635
380580781 18996905
221036078 153460275
107830991 17868918
151338729 222261763

Every line in these files represents a “follows” relationship. The first number is the follower’s Twitter user id, and the second number is the id of the user who is being followed.
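
To make the line format concrete, here is a minimal, Flink-free sketch that parses a single line. The class name EdgeLineParser is hypothetical and not part of the article's code. Using long for the ids is a defensive assumption of this sketch; the TwitterFollower class shown later in the article uses Integer.

```java
class EdgeLineParser {

    // Parses one line of a .edges file: "<follower id> <followed user id>".
    // Returns a two-element array: {followerId, followedUserId}.
    static long[] parse(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected two ids, got: " + line);
        }
        return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
    }

    public static void main(String[] args) {
        long[] edge = parse("214328887 34428380");
        // prints "214328887 follows 34428380"
        System.out.println(edge[0] + " follows " + edge[1]);
    }
}
```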

If you want to find a Twitter page by ids from these files just use the following URL:

https://twitter.com/intent/user?user_id=[user-id]

End goal

Let’s start with the end goal in mind. We would like to point Flink to a directory with all the edges files in it and let it create a dataset of edges for us. This should look like this:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<TwitterFollower> twitterFollowers = env.createInput(new StanfordTweetsDataSetInputFormat("/path/to/twitter/dataset"));

To read a dataset, we need to use the createInput function from the ExecutionEnvironment class. createInput takes a single argument of type InputFormat, which defines how to read a dataset for data processing. We will implement this interface in this post.

TwitterFollower is a type that I defined specifically for this InputFormat. It has two fields: the user id and the follower id. Our InputFormat will return a dataset of these objects.

public class TwitterFollower extends Tuple2<Integer, Integer> {

    public TwitterFollower() {
        super(null, null);
    }

    public TwitterFollower(int user, int follower) {
        super(user, follower);
    }

    public Integer getUser() {
        return f0;
    }

    public Integer getFollower() {
        return f1;
    }

    public void setUser(int user) {
        this.f0 = user;
    }

    public void setFollower(int follower) {
        this.f1 = follower;
    }
}

As you may have noticed, it extends Flink’s Tuple2 class. This is not mandatory, but it allows users of this class to use the more efficient versions of group-by and join operations in Flink, where we only need to specify a field index in a dataset.

This is a common pattern that you can find in the Flink sources. For example, the graph edge class in Flink’s Gelly library follows the same approach.

InputFormat interface

To understand how to implement the InputFormat interface, we first need to understand how it is used. When Flink reads data from a data source, it first calls the InputFormat implementation to divide the input data into chunks of work called splits. Then Flink reads these splits in parallel.

These two steps are represented by two groups of methods in the InputFormat interface, and we will implement them one by one.

Splitting input data

The first group of methods is used to split input data into separate chunks that can be read in parallel:

  • configure – method that is called to configure an InputFormat
  • createInputSplits – this method defines how to split reading of the input data into independent chunks
  • getStatistics – get statistics about the input data

Let’s start with the configure method.


public class StanfordTweetsDataSetInputFormat extends RichInputFormat<TwitterFollower, TweetFileInputSplit> {
    
    private transient FileSystem fileSystem;
    private transient BufferedReader reader;
    private final String inputPath;
    private String nextLine;

    public StanfordTweetsDataSetInputFormat(String path) {
        this.inputPath = path;
    }

    @Override
    public void configure(Configuration parameters) {
    }
    ...
}

Our implementation does not need it, but other InputFormat implementations use it to read task-specific configuration. For example, FileInputFormat uses this method to read the configuration value that enables recursive file reading:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<String> logs = env.readTextFile("file:///directory/path")
			  .withParameters(parameters);

The next method that we will implement is createInputSplits, which, as the name suggests, creates an array of input splits that can be read in parallel.


private transient FileSystem fileSystem;

@Override
public TweetFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    FileSystem fileSystem = getFileSystem();
    // Get all files in the input directory
    FileStatus[] statuses = fileSystem.listStatus(new Path(inputPath));

    List<TweetFileInputSplit> splits = new ArrayList<>();
    for (int i = 0; i < statuses.length; i++) {
        FileStatus status = statuses[i];
        String fileName = status.getPath().getName();
        // Ignore other auxiliary files
        if (fileName.endsWith("edges")) {
            // Create an input split to read one file
            splits.add(new TweetFileInputSplit(i, status.getPath()));
        }
    }

    return splits.toArray(new TweetFileInputSplit[splits.size()]);
}

private FileSystem getFileSystem() throws IOException {
    // Lazy initialization since FileSystem is not serializable
    if (fileSystem == null) {
        try {
            fileSystem = FileSystem.get(new URI(inputPath));
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }
    return fileSystem;
}

// Input split to read one input file
class TweetFileInputSplit implements InputSplit {

    private final int splitNumber;
    private final Path path;

    public TweetFileInputSplit(int splitNumber, Path path) {
        this.splitNumber = splitNumber;
        this.path = path;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }

    public Path getPath() {
        return path;
    }
}

Let’s walk through this code. First, we obtain an instance of Flink’s FileSystem class, which allows us to perform operations on local or remote file systems. For example, if we pass a URI like “hdfs://dir/”, Flink will create an HDFS-specific implementation. We then list the files in the input directory and create one input split for every file whose name ends with “edges”.

Since we control how each split is read, we have a lot of leeway in how to split the input data. In this code, every split refers to a single file, and the files are read in parallel, but we could also split files into blocks or group several files together in a single split.
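
As an illustration of the "group several files together" option, here is a minimal sketch that does not use any Flink classes. The SplitGrouping class and its maxSplits parameter are hypothetical; it distributes file names round-robin so that each resulting group could back one input split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitGrouping {

    // Distributes file names round-robin into at most maxSplits groups.
    // Each group stands for one input split that would read several files.
    static List<List<String>> group(List<String> files, int maxSplits) {
        if (maxSplits < 1) {
            throw new IllegalArgumentException("maxSplits must be positive");
        }
        // Never create more groups than there are files
        int groups = Math.min(maxSplits, files.size());
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            result.get(i % groups).add(files.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
                "a.edges", "b.edges", "c.edges", "d.edges", "e.edges");
        // prints "[[a.edges, c.edges, e.edges], [b.edges, d.edges]]"
        System.out.println(group(files, 2));
    }
}
```

Round-robin ignores file sizes; balancing groups by size would be a natural refinement if the input files differ a lot.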

The last method in this group is getStatistics, which returns information about the data that we are about to read. The statistics contain three fields: the total size of all files in bytes, the number of records, and the average record width. In this case we can only provide the total size of the files:

@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
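    FileSystem fileSystem = getFileSystem();
    FileStatus[] statuses = fileSystem.listStatus(new Path(inputPath));
    // Sum up the sizes of all files in the input directory
    long totalSize = 0;
    for (FileStatus status : statuses) {
        totalSize += status.getLen();
    }
    return new GraphStatistics(totalSize);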
}

private class GraphStatistics implements BaseStatistics {

    private long totalInputSize;

    public GraphStatistics(long totalInputSize) {
        this.totalInputSize = totalInputSize;
    }

    @Override
    public long getTotalInputSize() {
        return totalInputSize;
    }

    @Override
    public long getNumberOfRecords() {
        return BaseStatistics.NUM_RECORDS_UNKNOWN;
    }

    @Override
    public float getAverageRecordWidth() {
        return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
    }
}

If we cannot provide any useful information about data to read we can simply return null from the getStatistics method.

One optimization that we could make is to use the cached statistics that are passed to getStatistics. To make use of them, we need to track the time when the previous statistics were calculated and recalculate them only if the file system has been updated since then.
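
The caching idea can be sketched without Flink as follows. Everything here is an assumption made for illustration: the FileInfo and Stats classes are hypothetical stand-ins for the file metadata and the statistics object, and the check simply compares the newest modification time against the one recorded in the cached statistics.

```java
import java.util.Arrays;
import java.util.List;

class StatisticsCache {

    // Hypothetical stand-in for the metadata of one file in a directory listing.
    static final class FileInfo {
        final long modificationTime; // milliseconds since the epoch
        final long size;             // bytes

        FileInfo(long modificationTime, long size) {
            this.modificationTime = modificationTime;
            this.size = size;
        }
    }

    // Hypothetical statistics object that remembers how fresh it is.
    static final class Stats {
        final long newestModificationTime;
        final long totalSize;

        Stats(long newestModificationTime, long totalSize) {
            this.newestModificationTime = newestModificationTime;
            this.totalSize = totalSize;
        }
    }

    // Returns the cached statistics if no file is newer than the cache,
    // otherwise recomputes them from the listing.
    static Stats refresh(Stats cached, List<FileInfo> files) {
        long newest = 0;
        for (FileInfo file : files) {
            newest = Math.max(newest, file.modificationTime);
        }
        if (cached != null && newest <= cached.newestModificationTime) {
            return cached;
        }
        long total = 0;
        for (FileInfo file : files) {
            total += file.size;
        }
        return new Stats(newest, total);
    }

    public static void main(String[] args) {
        List<FileInfo> files = Arrays.asList(new FileInfo(100L, 10L), new FileInfo(200L, 30L));
        Stats first = refresh(null, files);
        Stats second = refresh(first, files);
        System.out.println("reused cached statistics: " + (first == second));
    }
}
```

Note that a check based only on modification times does not notice deleted files, so a real implementation may also want to compare the number of files.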

Reading data

The second group of methods in the InputFormat interface is used for reading a single input split. In our case, it should read a single file with edges and produce an instance of TwitterFollower for every line.

To do this, we need to implement four methods:

  • open – called to open a single input split and prepare to read records from it
  • reachedEnd – called to check if there are more records to read in a single input split
  • nextRecord – called to read the next record (TwitterFollower in our case)
  • close – called to close all resources allocated for reading an input split

The open method in this implementation is quite simple. We just need to create a reader for a text file:

@Override
public void open(TweetFileInputSplit split) throws IOException {
    FileSystem fileSystem = getFileSystem();
    this.reader = new BufferedReader(new InputStreamReader(fileSystem.open(split.getPath())));
    // Pre-read next line to easily check if we've reached the end of an input split
    this.nextLine = reader.readLine();
}

As before, we use Flink’s FileSystem type to work with files. Its open method returns an InputStream for a file on a file system. Then we pre-read the first line from the opened file. We do this so that we can use it in the reachedEnd method, which Flink calls to check whether there are more records to read from the current split. All we need to do is check whether we have reached the end of the file:

@Override
public boolean reachedEnd() throws IOException {
    return nextLine == null;
}

The close method simply closes the file reader:

@Override
public void close() throws IOException {
    if (reader != null) {
        reader.close();
    }
}

Now the only thing that is left is to read records from the file. To do this, we need to implement the nextRecord method that reads a single line and converts it into a single TwitterFollower instance:

@Override
public TwitterFollower nextRecord(TwitterFollower reuse) throws IOException {
    String[] split = nextLine.split(" ");
    int userId = Integer.parseInt(split[1]);
    int followerId = Integer.parseInt(split[0]);

    reuse.setUser(userId);
    reuse.setFollower(followerId);
    nextLine = reader.readLine();

    return reuse;
}
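
To see how the four methods work together, here is a rough, Flink-free sketch of the call order: open, then reachedEnd and nextRecord in a loop, then close. The EdgeReaderSketch class and its readAll helper are hypothetical, and the loop only models the order of the calls. The actual Flink runtime is more involved.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class EdgeReaderSketch {

    private BufferedReader reader;
    private String nextLine;

    void open(Reader source) throws IOException {
        reader = new BufferedReader(source);
        // Pre-read the first line so that reachedEnd() needs no I/O
        nextLine = reader.readLine();
    }

    boolean reachedEnd() {
        return nextLine == null;
    }

    // Returns {userId, followerId}, mirroring the field order of TwitterFollower
    int[] nextRecord() throws IOException {
        String[] parts = nextLine.split(" ");
        int[] record = {Integer.parseInt(parts[1]), Integer.parseInt(parts[0])};
        nextLine = reader.readLine();
        return record;
    }

    void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }

    // Models the order in which a caller drives the reading methods
    static List<int[]> readAll(String content) {
        EdgeReaderSketch format = new EdgeReaderSketch();
        List<int[]> records = new ArrayList<>();
        try {
            format.open(new StringReader(content));
            try {
                while (!format.reachedEnd()) {
                    records.add(format.nextRecord());
                }
            } finally {
                format.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(String[] args) {
        for (int[] record : readAll("214328887 34428380\n17116707 28465635\n")) {
            System.out.println(record[1] + " follows " + record[0]);
        }
    }
}
```

Because the next line is always read one step ahead, an empty file is handled naturally: reachedEnd returns true right after open and nextRecord is never called.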

Using built-in Flink classes

If you need to read data from a file system in Flink, I would not suggest implementing it yourself from scratch. Of course, it is interesting to understand how it can be implemented and useful to be able to implement one yourself, but Flink provides an abstract class called FileInputFormat that can be a good starting point. It can be extended to read data from file systems and supports more advanced features, such as reading data from folders recursively and more advanced splitting that tries to balance the sizes of input splits.

If FileInputFormat is too generic, you can use one of the more specific implementations, such as BinaryInputFormat for reading formats that use binary blocks of fixed size, or DelimitedInputFormat, which you can extend if you need to implement a format with delimiter-separated records.

Conclusions

In this post, you have learned how to read data from a custom source in Flink, and now you can implement one yourself.

If you want to read the final version of the InputFormat from this article, you can find it in my GitHub repo.

More information

If you want to know more about Apache Flink, you can take a look at my Pluralsight course, where I cover Apache Flink in more detail: Understanding Apache Flink

Posted by Ivan Mushketyk

Principal Software engineer and life-long learner. Creating courses for Pluralsight. Writing for DZone, SitePoint, and SimpleProgrammer.