Testing Apache Flink's `DataSet` and `DataStream` APIs with Flinkspector is almost identical. The only difference is that data streams work with time characteristics, so no separate documentation is provided for each API. The chapter on input describes how to define timed input for streaming data flows. Most examples are written for `DataSet` and `DataStream`.
To use the framework, extend one of its JUnit base classes:
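```java
import static java.util.Arrays.asList;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;

// DataSetTestBase and ExpectedRecords are Flinkspector classes (their imports are omitted here).
class Test extends DataSetTestBase {
    @org.junit.Test
    public void myTest() {
        // Input records 1, 2, 3, each incremented by one.
        DataSet<Integer> dataSet = createTestDataSet(asList(1, 2, 3))
                .map((MapFunction<Integer, Integer>) value -> value + 1);
        // Expect 2, 3 and 4 at the endpoint.
        ExpectedRecords<Integer> expected =
                new ExpectedRecords<Integer>().expectAll(asList(2, 3, 4));
        assertDataSet(dataSet, expected);
    }
}
```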
The idea behind Flinkspector is to define a list of input records for each input of a transformation and to specify expectations for each endpoint (output).
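The idea can be illustrated without Flink at all. The following is a minimal plain-Java sketch, not Flinkspector's API: a hypothetical transformation `splitByParity` with two inputs and two endpoints (even and odd values), one input list per input, and one expectation per endpoint. The hypothetical helper `matches` checks that an endpoint produced exactly the expected records, ignoring order; Flinkspector's own expectation types are not modeled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Flink-free illustration: one input list per input, one expectation per endpoint.
// All names are hypothetical and not part of Flinkspector.
class ConceptSketch {
    // Transformation under test: two inputs, two endpoints (evens at index 0, odds at index 1).
    static List<List<Integer>> splitByParity(List<Integer> first, List<Integer> second) {
        List<Integer> evens = new ArrayList<>();
        List<Integer> odds = new ArrayList<>();
        for (List<Integer> input : List.of(first, second)) {
            for (int v : input) {
                (v % 2 == 0 ? evens : odds).add(v);
            }
        }
        return List.of(evens, odds);
    }

    // Order-insensitive check: the endpoint produced exactly the expected records.
    static boolean matches(List<Integer> actual, List<Integer> expected) {
        List<Integer> a = new ArrayList<>(actual);
        List<Integer> e = new ArrayList<>(expected);
        Collections.sort(a);
        Collections.sort(e);
        return a.equals(e);
    }
}
```

For example, with inputs `[1, 2]` and `[3, 4]`, the first endpoint is expected to contain `2` and `4`, and the second `1` and `3`.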
The best strategy for testing a Flink job with Flinkspector is to divide the job's processing logic into smaller processing steps. Do this by bundling several transformations into one method:
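```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public static DataStream<Tuple2<Integer, String>> aggregateViews(
        DataStream<Tuple2<Integer, String>> stream) {
    return stream.timeWindowAll(Time.seconds(20)).sum(0); // sum field 0 per 20 s window
}
```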
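To write expectations for a step like `aggregateViews`, it helps to know which records end up in the same window. The following plain-Java sketch models what a tumbling 20-second window followed by a sum over field 0 computes. It assumes event time with millisecond timestamps and windows aligned to the epoch (no offset); the class `WindowSumModel` and its `View` record type are hypothetical, and the non-aggregated `String` field is not modeled.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of timeWindowAll(Time.seconds(20)).sum(0), assuming event time:
// each record is assigned to the tumbling 20 s window containing its timestamp,
// and field 0 (count) is summed per window.
class WindowSumModel {
    static final long WINDOW_MS = 20_000L;

    // Assumed record shape: event timestamp in ms, the summed field, and a label.
    static final class View {
        final long timestampMs;
        final int count;
        final String label;

        View(long timestampMs, int count, String label) {
            this.timestampMs = timestampMs;
            this.count = count;
            this.label = label;
        }
    }

    // Returns window start (ms) -> sum of counts, ordered by window start.
    static Map<Long, Integer> sumPerWindow(List<View> views) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (View v : views) {
            long windowStart = v.timestampMs - Math.floorMod(v.timestampMs, WINDOW_MS);
            sums.merge(windowStart, v.count, Integer::sum);
        }
        return sums;
    }
}
```

With this model, records at 1 000 ms and 19 999 ms fall into the window starting at 0, while a record at 20 000 ms opens the next window, which is the boundary case worth covering in a test.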
This lets you test processing steps incrementally, before the complete logic of the job has been defined. Additional benefits are more comprehensible code and steps that are easy to recompose.
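The benefit of small, named steps can be shown with a Flink-free analogy in which plain lists stand in for `DataStream`s. In this sketch, the class `Steps` and its methods are hypothetical. Each step is a static method that can be tested on its own, and the job logic is a composition of steps that can be reordered without touching the steps themselves.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Flink-free analogy: each processing step is a named static method
// (in a Flink job it would take and return a DataStream).
class Steps {
    // Step 1: increment every value.
    static List<Integer> increment(List<Integer> in) {
        return in.stream().map(v -> v + 1).collect(Collectors.toList());
    }

    // Step 2: keep only even values.
    static List<Integer> keepEven(List<Integer> in) {
        return in.stream().filter(v -> v % 2 == 0).collect(Collectors.toList());
    }

    static final Function<List<Integer>, List<Integer>> INCREMENT = Steps::increment;
    static final Function<List<Integer>, List<Integer>> KEEP_EVEN = Steps::keepEven;

    // The job logic is a composition of steps...
    static final Function<List<Integer>, List<Integer>> PIPELINE = INCREMENT.andThen(Steps::keepEven);
    // ...and the same steps can be recomposed in a different order.
    static final Function<List<Integer>, List<Integer>> REORDERED = KEEP_EVEN.andThen(Steps::increment);
}
```

Applied to `[1, 2, 3]`, `PIPELINE` yields `[2, 4]` and `REORDERED` yields `[3]`. Because each step is tested separately, a failure in a composed pipeline points directly at the step or the order that is wrong.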
Don't use anonymous or private inner classes for `MapFunction`, `FilterFunction`, etc., even though Flinkspector lets you test them. Test every function you write with separate unit tests.