feat(new transform): Add new incremental_to_absolute transform #23374

Open
wants to merge 9 commits into master

Conversation

@GreyLilac09 GreyLilac09 commented Jul 14, 2025

Summary

Create new incremental_to_absolute transform

Useful for:

  • avoiding duplicate metrics cache creation at the sink level
  • creating a historical record of metrics to account for lossy connections/file-based back-filling

Problem it solves: #23018

Vector configuration

transforms:
  incremental_to_absolute:
    type: incremental_to_absolute
    expire_metrics_secs: 120 # default

How did you test this PR?

Example 1
Example configuration:

data_dir: ./vector-data-dir
sources:
  s0:
    type: static_metrics
    interval_secs: 1
    metrics:
      - name: response_time
        kind: incremental
        value:
          counter:
            value: 1
        tags: {}

transforms:
  t0:
    type: incremental_to_absolute
    inputs:
      - s0
    expire_metrics_secs: 120
sinks:
  console:
    type: console
    inputs:
      - t0
    target: stdout
    encoding:
      codec: json
      json:
        pretty: true

Example output:

{
  "name": "response_time",
  "namespace": "static",
  "timestamp": "2025-07-16T04:02:06.446891Z",
  "kind": "absolute",
  "counter": {
    "value": 1.0
  }
}
{
  "name": "response_time",
  "namespace": "static",
  "timestamp": "2025-07-16T04:02:07.447752Z",
  "kind": "absolute",
  "counter": {
    "value": 2.0
  }
}
{
  "name": "response_time",
  "namespace": "static",
  "timestamp": "2025-07-16T04:02:08.447934Z",
  "kind": "absolute",
  "counter": {
    "value": 3.0
  }
}
{
  "name": "response_time",
  "namespace": "static",
  "timestamp": "2025-07-16T04:02:09.447506Z",
  "kind": "absolute",
  "counter": {
    "value": 4.0
  }
}

Example 2
If the metric interval exceeds expire_metrics_secs (e.g. for sparse metrics), the counters are reset:

data_dir: ./vector-data-dir
sources:
  s0:
    type: static_metrics
    interval_secs: 20
    metrics:
      - name: response_time
        kind: incremental
        value:
          counter:
            value: 1
        tags: {}

transforms:
  t0:
    type: incremental_to_absolute
    inputs:
      - s0
    expire_metrics_secs: 10
sinks:
  console:
    type: console
    inputs:
      - t0
    target: stdout
    encoding:
      codec: json
      json:
        pretty: true

Output:

{
  "name": "response_time",
  "namespace": "static",
  "timestamp": "2025-07-16T21:15:53.807364Z",
  "kind": "absolute",
  "counter": {
    "value": 1.0
  }
}
{
  "name": "response_time",
  "namespace": "static",
  "timestamp": "2025-07-16T21:16:13.808047Z",
  "kind": "absolute",
  "counter": {
    "value": 1.0
  }
}
{
  "name": "response_time",
  "namespace": "static",
  "timestamp": "2025-07-16T21:16:33.808625Z",
  "kind": "absolute",
  "counter": {
    "value": 1.0
  }
}
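
The unchanging 1.0 values above follow from the expiry logic: once a metric has been quiet for longer than expire_metrics_secs, its cached running total is dropped and the next incremental sample starts over. A minimal, self-contained sketch of that bookkeeping (the names and layout here are illustrative assumptions, not the transform's actual code):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Running totals keyed by metric name, each tagged with the last time it was
// seen. A total that has gone stale is reset before the new increment is added.
struct RunningTotals {
    ttl: Duration,
    totals: HashMap<String, (f64, Instant)>,
}

impl RunningTotals {
    fn record(&mut self, name: &str, increment: f64, now: Instant) -> f64 {
        let ttl = self.ttl;
        let entry = self.totals.entry(name.to_string()).or_insert((0.0, now));
        if now.duration_since(entry.1) > ttl {
            entry.0 = 0.0; // expired: restart the running total
        }
        entry.0 += increment;
        entry.1 = now;
        entry.0 // the absolute value to emit
    }
}
```

With interval_secs: 20 and expire_metrics_secs: 10, every sample arrives after its predecessor has expired, so each emitted absolute value restarts at 1.0, as shown in the output.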

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • cargo fmt --all
      • cargo clippy --workspace --all-targets -- -D warnings
      • cargo nextest run --workspace (alternatively, you can run cargo test --all)
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes to Vector dependencies (modifies Cargo.lock), please run cargo vdev build licenses to regenerate the license inventory and commit the changes (if any). More details here.

@GreyLilac09 GreyLilac09 requested review from a team as code owners July 14, 2025 21:21
@github-actions github-actions bot added the domain: transforms and domain: external docs labels Jul 14, 2025
Box::pin(stream! {
    let mut done = false;
    while !done {
        tokio::select! {
Contributor Author

maybe this shouldn't be here? I just copied this from the aggregate transform

Contributor

no reason to use select with only one future to await.
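
As a self-contained illustration of this point (generic names, not the transform's actual code): a loop driven by a single stream can await the next item directly, which behaves the same as a one-armed tokio::select! but with less machinery.

```rust
use futures::{Stream, StreamExt};

// Draining a single input stream: awaiting `next()` directly replaces a
// `tokio::select!` that only has one branch to poll.
async fn drain<S, T>(mut input: S) -> Vec<T>
where
    S: Stream<Item = T> + Unpin,
{
    let mut out = Vec::new();
    while let Some(item) = input.next().await {
        out.push(item);
    }
    out
}
```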

@thomasqueirozb
Contributor

Hey @GreyLilac09, thanks for the PR. Please update the test plan in your description to include a Vector config and expected output. This lets us run your code with a config and easily validate what the expected output should look like.

Here is an example config you can build on top of:

sources:
  s0:
    type: static_metrics
    interval_secs: 1
    metrics:
      - name: response_time
        kind: incremental
        value:
          counter:
            value: 1
        tags: {}

transforms:
  t0:
    type: remap
    inputs:
      - s0
    source: |-
      .tags.output = "some value"

sinks:
  console:
    type: console
    inputs:
      - t0
    target: stdout
    encoding:
      codec: json
      json:
        pretty: true

or you can create one from scratch. Thanks!

@thomasqueirozb thomasqueirozb added the meta: awaiting author label Jul 15, 2025
@github-actions github-actions bot removed the meta: awaiting author label Jul 15, 2025

@iadjivon iadjivon left a comment

All set from Docs!

@pront pront added the meta: awaiting author label Jul 15, 2025
@github-actions github-actions bot removed the meta: awaiting author label Jul 15, 2025
@github-actions github-actions bot added the domain: core label Jul 16, 2025
@@ -58,11 +58,14 @@ impl MetricData {

/// Consumes this metric, returning it as an absolute metric.
///
/// If the metric was already absolute, nothing is changed.
/// The interval_ms is removed. If the metric was already absolute, nothing is changed.
@GreyLilac09 (Contributor Author) commented Jul 16, 2025

I think without this change, the DD sink (which uses the presence of interval_ms to calculate rate) would be messed up.

into_absolute appears to be used only by the remote write sink, and the resulting value doesn't involve interval_ms in its serialization, so I think this change is fine.

Member

This makes sense to me. An absolute metric value represents a value at time t.
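
For illustration, a simplified stand-in for the data type being discussed (field and type names are assumptions, not vector-core's actual MetricData): converting to absolute drops interval_ms, since an absolute value describes a total at a point in time rather than a delta accumulated over an interval.

```rust
use std::num::NonZeroU32;

#[derive(Debug, PartialEq)]
enum MetricKind {
    Incremental,
    Absolute,
}

// Simplified stand-in for the metric data described in the diff above.
#[derive(Debug)]
struct MetricData {
    kind: MetricKind,
    interval_ms: Option<NonZeroU32>,
    value: f64,
}

impl MetricData {
    /// Consumes this metric, returning it as an absolute metric.
    /// The interval is dropped because it no longer applies to a
    /// point-in-time total.
    fn into_absolute(mut self) -> Self {
        self.kind = MetricKind::Absolute;
        self.interval_ms = None;
        self
    }
}
```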

@pront pront requested a review from Copilot July 16, 2025 19:50
@Copilot Copilot AI (Contributor) left a comment

Pull Request Overview

This PR introduces a new incremental_to_absolute transform that converts incremental metrics to absolute metrics while preserving the cumulative values. This is useful for avoiding duplicate metric caches at sink levels and creating historical records for scenarios with lossy connections or file-based backfilling.

Key changes:

  • Implements the core transform logic with TTL-based metric expiration
  • Adds comprehensive documentation and configuration examples
  • Includes unit tests covering incremental-to-absolute conversion and pass-through behavior for already-absolute metrics

Reviewed Changes

Copilot reviewed 7 out of 8 changed files in this pull request and generated 3 comments.

File | Description
src/transforms/incremental_to_absolute.rs | Core implementation of the transform with metric conversion logic and tests
website/cue/reference/components/transforms/incremental_to_absolute.cue | Documentation and configuration reference for the new transform
lib/vector-core/src/event/metric/data.rs | Updates the into_absolute() method to remove interval_ms when converting metrics
src/transforms/mod.rs | Registers the new transform module
website/cue/reference/components.cue | Adds the feature definition for the transform
Cargo.toml | Adds feature flags for the new transform
changelog.d/incremental_to_absolute_transform.feature.md | Changelog entry documenting the new feature
Comments suppressed due to low confidence (2)

website/cue/reference/components/transforms/incremental_to_absolute.cue:50

  • The example title 'Aggregate over 5 seconds' is misleading. This transform converts incremental to absolute metrics but doesn't aggregate over time windows. A more accurate title would be 'Convert incremental counter to absolute'.
			title: "Aggregate over 5 seconds"

        })
    }

    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
        if let Some(metric) = self.data.make_absolute(event.as_metric().clone()) {
Member

We should probably modify make_absolute to set interval_ms to None

Contributor Author

I think this is done in lib/vector-core/src/event/metric/data.rs?

Member

Do all paths end up hitting that conversion function?

Contributor Author

yes, make_absolute hits incremental_to_absolute which hits metrics.make_absolute()

@pront pront added the meta: awaiting author label Jul 16, 2025
@github-actions github-actions bot removed the meta: awaiting author label Jul 16, 2025
@GreyLilac09
Contributor Author

GreyLilac09 commented Jul 17, 2025

After thinking about this some more, I'm not so sure that just expire_metrics_secs is the right approach. The problem is that a lot of incremental counters are sparse (e.g. they could be incremented as a statsd counter every few hours or days), and the current approach would just miss all of them. I think a better approach would be to implement the MetricSet (see https://github.com/vectordotdev/vector/blob/master/src/sinks/util/buffer/metrics/normalize.rs) as an LRU cache rather than as an IndexMap, and give it a configurable max size.

  1. I initially was thinking about this approach, but shied away from it because it would involve a much more substantial change to the code than I felt like making at the time. However, I do think it's the right way to go about this.
  2. The problem with just an LRU cache is that someone might have a scenario where they need to handle extremely high bursts of data without dropping. In this scenario, expire_metrics_secs would be useful.
    a. In this scenario, the cache would just eventually grow to the max size and stay at that size until restart. If the LRU cache max size is, say, 256 MB, one month later Vector could still sit at 256 MB of allocated memory, even if it hasn't received any of those incremental counters for 27 days.
    b. They might not be able to predict the size of the burst ahead of time and need to be flexible, so a fixed max size would cause a lot of inaccurate data when a burst comes along and exceeds the allowed size.

Thus, I would propose an additional configuration, e.g.:

transforms:
  incremental_to_absolute:
    type: incremental_to_absolute
    cache_max_size: 268435488 # default
    expire_metrics_secs: 120 # default

where cache_max_size is in bytes

Alternatively, maybe it would just be better to group the cache configs like we do for buffer and batch? So the config would look like:

transforms:
  incremental_to_absolute:
    type: incremental_to_absolute
    cache:
      max_size: 268435488
      timeout_secs: 120

Curious to hear your thoughts. I'd also eventually like to add this config to the prom remote write sink (e.g. from this PR).

I can also add the LRU cache in a separate PR, but it would not be ideal to change the config later (e.g. going from expire_metrics_secs to cache.timeout_secs) if it can be avoided, so if we go with the second config we'd probably want to do it in this PR.
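
For illustration only, here is a minimal sketch of the size-bounded, TTL-aware cache being proposed (the lru crate, an entry-count cap instead of a byte budget, and all names below are assumptions, not this PR's code):

```rust
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};

use lru::LruCache;

// Running totals keyed by metric series, bounded by entry count and a per-entry
// timeout. A byte-budgeted version would account for each entry's allocated
// size instead of counting entries.
struct MetricCache {
    entries: LruCache<String, (f64, Instant)>,
    timeout: Duration,
}

impl MetricCache {
    fn new(max_entries: NonZeroUsize, timeout: Duration) -> Self {
        Self {
            entries: LruCache::new(max_entries),
            timeout,
        }
    }

    /// Folds an incremental sample into the running total for `series`,
    /// restarting the total if the series is unknown or has expired.
    fn add(&mut self, series: &str, increment: f64, now: Instant) -> f64 {
        let total = match self.entries.pop(series) {
            Some((total, seen)) if now.duration_since(seen) <= self.timeout => total + increment,
            _ => increment,
        };
        // `put` evicts the least-recently-used entry once the cap is reached.
        self.entries.put(series.to_string(), (total, now));
        total
    }
}
```

This keeps both knobs meaningful: max_size bounds memory during bursts, while timeout_secs lets quiet series age out instead of pinning memory until restart.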

@pront pront added the meta: awaiting author label Jul 17, 2025
@GreyLilac09 GreyLilac09 requested a review from pront July 17, 2025 23:28
@github-actions github-actions bot removed the meta: awaiting author label Jul 17, 2025
@pront
Member

pront commented Jul 18, 2025

The problem is that a lot of incremental counters are sparse (e.g. they could be incremented as a statsd counter every few hours or days), and the current approach would just miss all of them.

Can you explain with an example?

From a UX perspective the following is better:

transforms:
  incremental_to_absolute:
    type: incremental_to_absolute
    cache:
      max_size: 268435488
      timeout_secs: 120

@GreyLilac09
Contributor Author

For example, if we increment (+1) a count every 10 minutes and expire_metrics_secs is 5 minutes, that count would always show up as 1 (unchanging) in Prometheus, and the increase in value would never be recorded.

@GreyLilac09
Contributor Author

transforms:
  incremental_to_absolute:
    type: incremental_to_absolute
    cache:
      max_size: 268435488
      timeout_secs: 120

@pront that makes sense. If this is the case, we should also change the config of the prom remote write sink (#23286) to match. I think the plan would be to modify this PR to use the LRU cache with this config, and in a separate follow-up PR modify the prom remote write config to match?

@pront
Member

pront commented Jul 18, 2025

transforms:
  incremental_to_absolute:
    type: incremental_to_absolute
    cache:
      max_size: 268435488
      timeout_secs: 120

@pront that makes sense. If this is the case, we should also change the config of the prom remote write sink (#23286) to match. I think the plan would be to modify this PR to use the LRU cache with this config, and in a separate follow-up PR modify the prom remote write config to match?

Hi @GreyLilac09, this makes sense to me!

@pront pront added the meta: awaiting author label Jul 21, 2025