Accumulators
Accumulators allow the user to collect statistics and metrics during flow execution. The Accumulators API is inspired by the Hadoop/MapReduce counters.
The simplest type of accumulator is a counter, but other, more complex types are available as well. Currently, the following types of accumulators are supported (a rough interface sketch follows the list):
- Counter - Accumulates values of long type, effectively resulting in a sum.
- Histogram - Records the distribution of long values.
- Timer - Records the distribution of time measurements.
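For orientation, the shape of these types, as inferred from the usage examples later on this page, is roughly the following; the exact interfaces in the library may differ.
// Rough shape of the accumulator types, inferred from the usage
// examples below - not necessarily the library's exact interfaces.
public interface Counter {
  void increment();           // add 1 to the running sum
  void increment(long value); // add an arbitrary delta
}

public interface Histogram {
  void add(long value); // record a single long value
}

public interface Timer {
  void add(java.time.Duration duration); // record one time measurement
}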
To enable the accumulators functionality, the Executor needs to be provided with a chosen AccumulatorProvider. There are a few predefined providers, but it is also possible to create a custom provider that sends the metrics to some external system (a sketch follows the snippets below).
Using the predefined providers allows usage of Spark or Flink native accumulators. Results are then visible in the respective web interface. More information can be found in the Spark documentation or the Flink documentation.
// using Spark native accumulators
SparkExecutor exec = new SparkExecutor();
exec.setAccumulatorProvider(SparkNativeAccumulators.Factory.get());

// using Flink native accumulators
FlinkExecutor exec = new FlinkExecutor();
exec.setAccumulatorProvider(FlinkNativeAccumulators.Factory.get());
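A custom provider might look like the minimal sketch below, which simply writes every metric update to stdout. It assumes AccumulatorProvider exposes getCounter, getHistogram and getTimer methods mirroring the Context methods shown later on this page; treat the signatures as illustrative rather than the library's exact contract.
// A minimal sketch of a custom provider - the provider and accumulator
// method signatures are assumed from the usage shown on this page.
public class LoggingAccumulatorProvider implements AccumulatorProvider {

  @Override
  public Counter getCounter(String name) {
    return new Counter() {
      @Override public void increment() { increment(1L); }
      @Override public void increment(long value) {
        System.out.println("counter " + name + " += " + value);
      }
    };
  }

  @Override
  public Histogram getHistogram(String name) {
    return value -> System.out.println("histogram " + name + " <- " + value);
  }

  @Override
  public Timer getTimer(String name) {
    return duration -> System.out.println("timer " + name + " <- " + duration);
  }
}
Depending on the API version, the executor may expect a provider factory rather than a provider instance (note the Factory in the snippets above), in which case a small factory wrapper around this class is needed.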
You can create and use an accumulator instance via the Context or Collector provided inside an operator's user-defined functions.
// MapElements without accumulators doesn't need access to the Context
Dataset<String> words = MapElements.of(dataset)
    .using(s -> s.toLowerCase())
    .output();

// MapElements with access to the Context can use accumulators
Dataset<String> mapped = MapElements.of(dataset)
    .using((input, context) -> {
      // use a simple counter
      context.getCounter("my-counter").increment();
      // use a histogram
      context.getHistogram("my-hist").add(5L);
      // use a timer
      context.getTimer("my-timer").add(Duration.ofSeconds(5L));
      return input.toLowerCase();
    })
    .output();
Generally, accumulators are available in all user-defined functions that have access to a Collector, which provides the same accumulator methods as the Context.
// usage in the FlatMap operator
Dataset<String> queries = FlatMap.of(input)
    .using((elem, collector) -> {
      collector.getCounter("my-counter").increment();
      collector.collect(elem.toLowerCase());
    })
    .output();
// usage in the ReduceByKey operator
ReduceByKey.of(input)
    .keyBy(Pair::getFirst)
    .valueBy(Pair::getSecond)
    .reduceBy((nums, collector) -> {
      long result = 0L;
      for (long n : nums) {
        collector.getCounter("sum").increment(n);
        result += n;
      }
      collector.collect(result);
    })
    .output();
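To tie it together, a hypothetical run could be wired as follows; Flow.create and submit are assumptions about the surrounding API, and the essential point is only that the provider factory is set on the executor before the flow is executed.
// Hypothetical end-to-end wiring - a sketch only; Flow.create and
// submit are assumed names for building and running a pipeline.
Flow flow = Flow.create("accumulators-example");
// build the flow here using the operators shown above

FlinkExecutor executor = new FlinkExecutor();
executor.setAccumulatorProvider(FlinkNativeAccumulators.Factory.get());
executor.submit(flow); // accumulator values then appear in the Flink web interface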