
Accumulators


Accumulators allow the user to collect statistics and metrics during flow execution. The accumulators API is inspired by Hadoop/MapReduce counters.

The simplest type of accumulator is the counter, but other, more complex types exist as well. The following accumulator types are currently supported:

  • Counter - Accumulates long values, effectively resulting in a sum.
  • Histogram - Records the distribution of long values.
  • Timer - Records the distribution of time measurements.
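
Each type is manipulated through a small set of operations. The compact sketch below uses the same calls as the examples later on this page; the context object is the operator Context described in the API section.

// inside a user-defined function with access to the Context
context.getCounter("my-counter").increment();             // add 1
context.getCounter("my-counter").increment(10L);          // add an arbitrary long
context.getHistogram("my-hist").add(5L);                  // record one long value
context.getTimer("my-timer").add(Duration.ofSeconds(5L)); // record one time measurement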

How to use

To enable accumulator functionality, the Executor needs to be provided with a chosen AccumulatorProvider. There are a few predefined providers, but it is also possible to create a custom provider that sends the metrics to an external system; a sketch of such a provider follows the snippets below.

Using the predefined providers makes it possible to use Spark or Flink native accumulators. Results are then visible in the respective web interface; see the Spark or Flink documentation for details.

// using Spark native accumulators
SparkExecutor exec = new SparkExecutor();
exec.setAccumulatorProvider(SparkNativeAccumulators.Factory.get());

// using Flink native accumulators
FlinkExecutor exec = new FlinkExecutor();
exec.setAccumulatorProvider(FlinkNativeAccumulators.Factory.get());
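
A custom provider only needs to hand out the three accumulator types and expose a factory the Executor can use. The following is a minimal sketch, not the definitive API: the class name LoggingAccumulatorProvider is illustrative, and the exact interface shapes (getCounter/getHistogram/getTimer on AccumulatorProvider, a Factory taking Settings, and single-method accumulator interfaces) are assumptions inferred from this page, so verify them against the Euphoria sources.

// hedged sketch of a custom provider that logs metrics; the interface
// shapes here are assumptions inferred from this page, not verified API
public class LoggingAccumulatorProvider implements AccumulatorProvider {

  @Override
  public Counter getCounter(String name) {
    return new Counter() {
      private long sum;

      @Override
      public void increment() {
        increment(1L);
      }

      @Override
      public void increment(long value) {
        sum += value;
        report(name, sum);
      }
    };
  }

  @Override
  public Histogram getHistogram(String name) {
    return new Histogram() {
      @Override
      public void add(long value) {
        report(name, value);
      }
    };
  }

  @Override
  public Timer getTimer(String name) {
    return new Timer() {
      @Override
      public void add(Duration duration) {
        report(name, duration.toMillis());
      }
    };
  }

  private static void report(String name, long value) {
    // stand-in for a call to a real metrics client (StatsD, Graphite, ...)
    System.out.println(name + " -> " + value);
  }

  // assumed Factory contract: create(Settings) returning the provider
  public static class Factory implements AccumulatorProvider.Factory {
    @Override
    public AccumulatorProvider create(Settings settings) {
      return new LoggingAccumulatorProvider();
    }
  }
}

The custom provider is then registered the same way as the predefined ones:

exec.setAccumulatorProvider(new LoggingAccumulatorProvider.Factory());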

API

You can create and use accumulator instances via the Context or Collector provided inside an operator's user-defined functions.

// MapElements without accumulators doesn't need access to the Context
Dataset<String> words = MapElements.of(dataset)
    .using(s -> s.toLowerCase())
    .output();

// MapElements with access to the Context can use accumulators
Dataset<String> mapped = MapElements.of(dataset)
    .using((input, context) -> {
      // use a simple counter
      context.getCounter("my-counter").increment();

      // use a histogram
      context.getHistogram("my-hist").add(5L);

      // use a timer
      context.getTimer("my-timer").add(Duration.ofSeconds(5L));

      return input.toLowerCase();
    })
    .output();

Generally, accumulators are available in all user-defined functions that have access to the Collector.

// usage in the FlatMap operator
Dataset<String> queries = FlatMap.of(input)
    .using((elem, collector) -> {
      collector.getCounter("my-counter").increment();
      collector.collect(elem.toLowerCase());
    })
    .output();

// usage in the ReduceByKey operator
ReduceByKey.of(input)
    .keyBy(Pair::getFirst)
    .valueBy(Pair::getSecond)
    .reduceBy((nums, collector) -> {
      long result = 0L;
      for (long n : nums) {
        collector.getCounter("sum").increment(n);
        result += n;
      }
      collector.collect(result);
    })
    .output();
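
To see accumulators end to end, the snippets above can be wired into a complete flow. This is a hedged sketch: Flow.create, flow.createInput, and executor.submit are assumed here to follow the usual Euphoria lifecycle, and readLines is a hypothetical stand-in for a real data source.

// end-to-end sketch; readLines(...) is a hypothetical data source helper
Flow flow = Flow.create("accumulator-demo");
Dataset<String> lines = flow.createInput(readLines("hdfs:///logs"));

Dataset<String> lowered = MapElements.of(lines)
    .using((line, context) -> {
      context.getCounter("lines-seen").increment();
      return line.toLowerCase();
    })
    .output();

// ... attach a sink for `lowered`, then run
SparkExecutor exec = new SparkExecutor();
exec.setAccumulatorProvider(SparkNativeAccumulators.Factory.get());
exec.submit(flow).get(); // accumulator values then appear in Spark's web UI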