Skip to content

Commit

Permalink
fix(telemetry): fix polling gauges and timers (#1218)
Browse files Browse the repository at this point in the history
  • Loading branch information
massimo-pacher-tw committed Jan 28, 2024
1 parent 080d570 commit 957000f
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 49 deletions.
2 changes: 2 additions & 0 deletions igor-core/igor-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ dependencies {

// TODO(rz): Get rid of this dependency!
implementation "com.squareup.retrofit:retrofit"

testImplementation "com.netflix.spectator:spectator-reg-micrometer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import com.netflix.spinnaker.kork.discovery.RemoteStatusChangedEvent;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import net.logstash.logback.argument.StructuredArguments;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -44,16 +48,16 @@ public abstract class CommonPollingMonitor<I extends DeltaItem, T extends Pollin
protected final Id missedNotificationId;
private final DiscoveryStatusListener discoveryStatusListener;
private final AtomicLong lastPoll = new AtomicLong();
private final Id itemsCachedId;
private final Id itemsOverThresholdId;
private final Id pollCycleFailedId;
private final Id pollCycleTimingId;
private final Optional<LockService> lockService;
private ScheduledFuture<?> monitor;
private final CommonPollingMonitorInstrumentation instrumentation;
protected Logger log = LoggerFactory.getLogger(getClass());
protected TaskScheduler scheduler;
protected final DynamicConfigService dynamicConfigService;

private Map<String, AtomicInteger> itemsOverThresholdMap = new ConcurrentHashMap<>();
private Map<String, AtomicInteger> itemsCachedMap = new ConcurrentHashMap<>();

public CommonPollingMonitor(
IgorConfigurationProperties igorProperties,
Registry registry,
Expand All @@ -67,12 +71,9 @@ public CommonPollingMonitor(
this.discoveryStatusListener = discoveryStatusListener;
this.lockService = lockService;
this.scheduler = scheduler;
this.instrumentation = new CommonPollingMonitorInstrumentation(registry);

itemsCachedId = registry.createId("pollingMonitor.newItems");
itemsOverThresholdId = registry.createId("pollingMonitor.itemsOverThreshold");
pollCycleFailedId = registry.createId("pollingMonitor.failed");
missedNotificationId = registry.createId("pollingMonitor.missedEchoNotification");
pollCycleTimingId = registry.createId("pollingMonitor.pollTiming");
}

@Override
Expand All @@ -86,20 +87,19 @@ public void onApplicationEvent(RemoteStatusChangedEvent event) {
this.monitor =
scheduler.schedule(
() ->
registry
.timer(pollCycleTimingId.withTag("monitor", getClass().getSimpleName()))
.record(
() -> {
if (isInService()) {
poll(true);
lastPoll.set(System.currentTimeMillis());
} else {
log.info(
"not in service (lastPoll: {})",
(lastPoll.get() == 0) ? "n/a" : lastPoll.toString());
lastPoll.set(0);
}
}),
instrumentation.trackPollCycleTime(
this.getName(),
() -> {
if (isInService()) {
poll(true);
lastPoll.set(System.currentTimeMillis());
} else {
log.info(
"not in service (lastPoll: {})",
(lastPoll.get() == 0) ? "n/a" : lastPoll.toString());
lastPoll.set(0);
}
}),
new PeriodicTrigger(getPollInterval(), TimeUnit.SECONDS));
}

Expand Down Expand Up @@ -163,7 +163,17 @@ protected String getLockName(String name, String partition) {
}

protected void internalPollSingle(PollContext ctx) {
String monitorName = getClass().getSimpleName();
String monitorName =
!StringUtils.isBlank(this.getName()) ? this.getName() : getClass().getSimpleName();

itemsCachedMap.putIfAbsent(ctx.partitionName, new AtomicInteger(0));
itemsOverThresholdMap.putIfAbsent(ctx.partitionName, new AtomicInteger(0));

instrumentation.trackItemsCached(
itemsCachedMap.get(ctx.partitionName), monitorName, ctx.partitionName);

instrumentation.trackItemsOverThreshold(
itemsOverThresholdMap.get(ctx.partitionName), monitorName, ctx.partitionName);

try {
T delta = generateDelta(ctx);
Expand All @@ -175,58 +185,40 @@ protected void internalPollSingle(PollContext ctx) {
boolean sendEvents = !ctx.fastForward;
int deltaSize = delta.getItems().size();
if (deltaSize > upperThreshold) {
registry
.gauge(
itemsOverThresholdId.withTags(
"monitor", monitorName, "partition", ctx.partitionName))
.set(deltaSize);
itemsOverThresholdMap.get(ctx.partitionName).set(deltaSize);
if (ctx.fastForward) {
log.warn(
"Fast forwarding items ({}) in {} {}",
deltaSize,
itemsOverThresholdMap.get(ctx.partitionName).get(),
StructuredArguments.kv("monitor", monitorName),
StructuredArguments.kv("partition", ctx.partitionName));
sendEvents = false;
} else {
log.error(
"Number of items ({}) to cache exceeds upper threshold ({}) in {} {}",
deltaSize,
itemsOverThresholdMap.get(ctx.partitionName).get(),
upperThreshold,
StructuredArguments.kv("monitor", monitorName),
StructuredArguments.kv("partition", ctx.partitionName));
return;
}
} else {
registry
.gauge(
itemsOverThresholdId.withTags(
"monitor", monitorName, "partition", ctx.partitionName))
.set(0);
itemsOverThresholdMap.get(ctx.partitionName).set(0);
}

sendEvents = sendEvents && isSendEventsEnabled();

commitDelta(delta, sendEvents);
registry
.gauge(itemsCachedId.withTags("monitor", monitorName, "partition", ctx.partitionName))
.set(deltaSize);
itemsCachedMap.get(ctx.partitionName).set(deltaSize);
} catch (Exception e) {
log.error(
"Failed to update monitor items for {}:{}",
StructuredArguments.kv("monitor", monitorName),
StructuredArguments.kv("partition", ctx.partitionName),
e);
registry
.counter(
pollCycleFailedId.withTags("monitor", monitorName, "partition", ctx.partitionName))
.increment();
registry
.gauge(itemsCachedId.withTags("monitor", monitorName, "partition", ctx.partitionName))
.set(0);
registry
.gauge(
itemsOverThresholdId.withTags("monitor", monitorName, "partition", ctx.partitionName))
.set(0);
instrumentation.trackPollCycleFailed(monitorName, ctx.partitionName);
itemsCachedMap.get(ctx.partitionName).set(0);
itemsOverThresholdMap.get(ctx.partitionName).set(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2024 Wise, PLC.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.igor.polling;

import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.patterns.PolledMeter;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Spectator instrumentation shared by polling monitors.
 *
 * <p>Owns the meter ids for the {@code pollingMonitor.*} metrics (new items, items over threshold,
 * failed poll cycles and poll timing) and offers one method per metric so that callers do not
 * build ids or tags themselves.
 */
public class CommonPollingMonitorInstrumentation {

  private static final String MONITOR_TAG = "monitor";
  private static final String PARTITION_TAG = "partition";

  private final Registry registry;
  private final Id itemsCachedId;
  private final Id itemsOverThresholdId;
  private final Id pollCycleFailedId;
  private final Id pollCycleTimingId;

  /**
   * Creates the base (untagged) ids for every polling-monitor metric.
   *
   * @param registry the Spectator registry all meters are created in
   */
  public CommonPollingMonitorInstrumentation(Registry registry) {
    this.registry = registry;
    itemsCachedId = registry.createId("pollingMonitor.newItems");
    itemsOverThresholdId = registry.createId("pollingMonitor.itemsOverThreshold");
    pollCycleFailedId = registry.createId("pollingMonitor.failed");
    pollCycleTimingId = registry.createId("pollingMonitor.pollTiming");
  }

  /**
   * Exposes {@code numberOfItems} as the {@code pollingMonitor.newItems} gauge for the given
   * monitor/partition pair. Does nothing when a meter with that id is already registered.
   *
   * @param numberOfItems holder whose current value is reported; callers update it in place
   * @param monitor value of the {@code monitor} tag
   * @param partition value of the {@code partition} tag
   */
  public void trackItemsCached(AtomicInteger numberOfItems, String monitor, String partition) {
    registerPolledGaugeOnce(
        itemsCachedId.withTags(MONITOR_TAG, monitor, PARTITION_TAG, partition), numberOfItems);
  }

  /**
   * Exposes {@code numberOfItems} as the {@code pollingMonitor.itemsOverThreshold} gauge for the
   * given monitor/partition pair. Does nothing when a meter with that id is already registered.
   *
   * @param numberOfItems holder whose current value is reported; callers update it in place
   * @param monitor value of the {@code monitor} tag
   * @param partition value of the {@code partition} tag
   */
  public void trackItemsOverThreshold(
      AtomicInteger numberOfItems, String monitor, String partition) {
    registerPolledGaugeOnce(
        itemsOverThresholdId.withTags(MONITOR_TAG, monitor, PARTITION_TAG, partition),
        numberOfItems);
  }

  /**
   * Runs {@code lambda} and records its duration on the {@code pollingMonitor.pollTiming} timer.
   *
   * @param monitor value of the {@code monitor} tag
   * @param lambda the poll cycle to execute and time
   */
  public void trackPollCycleTime(String monitor, Runnable lambda) {
    registry.timer(pollCycleTimingId.withTags(MONITOR_TAG, monitor)).record(lambda);
  }

  /**
   * Increments the {@code pollingMonitor.failed} counter for the given monitor/partition pair.
   *
   * @param monitor value of the {@code monitor} tag
   * @param partition value of the {@code partition} tag
   */
  public void trackPollCycleFailed(String monitor, String partition) {
    registry
        .counter(pollCycleFailedId.withTags(MONITOR_TAG, monitor, PARTITION_TAG, partition))
        .increment();
  }

  /** @return the untagged id of the {@code pollingMonitor.newItems} gauge */
  public Id getItemsCachedId() {
    return itemsCachedId;
  }

  /** @return the untagged id of the {@code pollingMonitor.itemsOverThreshold} gauge */
  public Id getItemsOverThresholdId() {
    return itemsOverThresholdId;
  }

  /** @return the untagged id of the {@code pollingMonitor.failed} counter */
  public Id getPollCycleFailedId() {
    return pollCycleFailedId;
  }

  /** @return the untagged id of the {@code pollingMonitor.pollTiming} timer */
  public Id getPollCycleTimingId() {
    return pollCycleTimingId;
  }

  /**
   * Registers a polled gauge for {@code id} backed by {@code value}, unless the registry already
   * holds a meter with that id.
   *
   * <p>Spectator gauges are slightly different from Micrometer ones: the polling gauge has been
   * deprecated in favour of {@link PolledMeter}. The previous implementation resulted in NaN most
   * of the time, while we want to store observations on Prometheus. We don't need a
   * DistributionSummary, just a gauge which doesn't get garbage collected. For a unique
   * combination of (metricName, tags), PolledMeter will sum up the observed values, so to avoid
   * this we set the gauge ONCE and pass a strong reference holding the latest value to be
   * observed.
   */
  private void registerPolledGaugeOnce(Id id, AtomicInteger value) {
    // Only presence matters here, so the looked-up meter is not downcast to Gauge: a cast would
    // throw ClassCastException if the id were ever bound to a different meter type.
    if (registry.get(id) == null) {
      PolledMeter.using(registry).withId(id).monitorValue(value);
    }
  }
}
Loading

0 comments on commit 957000f

Please sign in to comment.