chore: resolve max statement in watch-processor #1770

Open · wants to merge 4 commits into base: main
36 changes: 13 additions & 23 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -48,7 +48,7 @@
"follow-redirects": "1.15.9",
"http-status-codes": "^2.3.0",
"json-pointer": "^0.6.2",
"kubernetes-fluent-client": "3.3.8",
"kubernetes-fluent-client": "^3.4.0",
"pino": "9.6.0",
"pino-pretty": "13.0.0",
"prom-client": "15.1.3",
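Note: the caret range ^3.4.0 accepts any kubernetes-fluent-client 3.x release at or above 3.4.0, where the previous 3.3.8 pinned an exact version.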
94 changes: 91 additions & 3 deletions src/lib/processors/watch-processor.test.ts
@@ -2,12 +2,13 @@
// SPDX-FileCopyrightText: 2023-Present The Pepr Authors
import { afterAll, beforeEach, describe, expect, it, jest } from "@jest/globals";
import { GenericClass, K8s, KubernetesObject, kind } from "kubernetes-fluent-client";
- import { K8sInit, WatchPhase } from "kubernetes-fluent-client/dist/fluent/types";
+ import { K8sInit, WatchPhase, WatcherType } from "kubernetes-fluent-client/dist/fluent/types";
import { WatchCfg, WatchEvent, Watcher } from "kubernetes-fluent-client/dist/fluent/watch";
import { Capability } from "../core/capability";
- import { setupWatch, logEvent, queueKey, getOrCreateQueue } from "./watch-processor";
+ import { setupWatch, logEvent, queueKey, getOrCreateQueue, registerWatchEventHandlers } from "./watch-processor";
import Log from "../telemetry/logger";
- import { metricsCollector } from "../telemetry/metrics";
+ import { metricsCollector, MetricsCollectorInstance } from "../telemetry/metrics";
+ import { EventEmitter } from "stream";

type onCallback = (eventName: string | symbol, listener: (msg: string) => void) => void;

@@ -154,7 +155,7 @@
setupWatch(capabilities);
});

it("should setup watches with correct phases for different events", async () => {

[Check warning on line 158 in src/lib/processors/watch-processor.test.ts (GitHub Actions / format): Async arrow function has too many statements (39). Maximum allowed is 20]
const watchCallbackCreate = jest.fn();
const watchCallbackUpdate = jest.fn();
const watchCallbackDelete = jest.fn();
@@ -415,3 +416,90 @@
expect(firstQueue).toBe(secondQueue);
});
});

describe("registerWatchEventHandlers", () => {
let watcher: WatcherType<GenericClass>;
let logEvent: jest.Mock;
let metricsCollector: MetricsCollectorInstance;

beforeEach(() => {
const eventEmitter = new EventEmitter();

watcher = {
events: eventEmitter,
} as unknown as WatcherType<GenericClass>;

jest.spyOn(eventEmitter, "on");
logEvent = jest.fn();

metricsCollector = {
incCacheMiss: jest.fn(),
initCacheMissWindow: jest.fn(),
incRetryCount: jest.fn(),
} as unknown as MetricsCollectorInstance;

registerWatchEventHandlers(watcher, logEvent, metricsCollector);
});
it("log event on CONNECT", () => {
watcher.events.emit(WatchEvent.CONNECT, "url");
expect(logEvent).toHaveBeenCalledWith(WatchEvent.CONNECT, "url");
});
it("log event on DATA_ERROR", () => {
watcher.events.emit(WatchEvent.DATA_ERROR, new Error("data_error"));
expect(logEvent).toHaveBeenCalledWith(WatchEvent.DATA_ERROR, "data_error");
});

it("log event on RECONNECT", () => {
watcher.events.emit(WatchEvent.RECONNECT, 1);
expect(logEvent).toHaveBeenCalledWith(WatchEvent.RECONNECT, "Reconnecting after 1 attempt");
});

it("log event on RECONNECT_PENDING", () => {
watcher.events.emit(WatchEvent.RECONNECT_PENDING);
expect(logEvent).toHaveBeenCalledWith(WatchEvent.RECONNECT_PENDING);
});

it("log event on ABORT", () => {
watcher.events.emit(WatchEvent.ABORT, new Error("abort"));
expect(logEvent).toHaveBeenCalledWith(WatchEvent.ABORT, "abort");
});
it("log event on OLD_RESOURCE_VERSION", () => {
watcher.events.emit(WatchEvent.OLD_RESOURCE_VERSION, "old_resource_version");
expect(logEvent).toHaveBeenCalledWith(WatchEvent.OLD_RESOURCE_VERSION, "old_resource_version");
});
it("log event on NETWORK_ERROR", () => {
watcher.events.emit(WatchEvent.NETWORK_ERROR, new Error("network_error"));
expect(logEvent).toHaveBeenCalledWith(WatchEvent.NETWORK_ERROR, "network_error");
});

it("log event on LIST_ERROR", () => {
watcher.events.emit(WatchEvent.LIST_ERROR, new Error("network_error"));
expect(logEvent).toHaveBeenCalledWith(WatchEvent.LIST_ERROR, "network_error");
});

it("log event on LIST", () => {
watcher.events.emit(WatchEvent.LIST, { apiVersion: "v1", items: [] });
expect(logEvent).toHaveBeenCalledWith(
WatchEvent.LIST,
JSON.stringify({ apiVersion: "v1", items: [] }, undefined, 2),
);
});

it("log event on CACHE_MISS", () => {
watcher.events.emit(WatchEvent.CACHE_MISS, "2025-02-05T04:14:39.535Z");
expect(metricsCollector.incCacheMiss).toHaveBeenCalledWith("2025-02-05T04:14:39.535Z");
});
it("log event on INIT_CACHE_MISS", () => {
watcher.events.emit(WatchEvent.INIT_CACHE_MISS, "2025-02-05T04:14:39.535Z");
expect(metricsCollector.initCacheMissWindow).toHaveBeenCalledWith("2025-02-05T04:14:39.535Z");
});
it("log event on INC_RESYNC_FAILURE_COUNT", () => {
watcher.events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, 1);
expect(metricsCollector.incRetryCount).toHaveBeenCalledWith(1);
});

it("log event on DATA", () => {
watcher.events.emit(WatchEvent.DATA);
expect(logEvent).not.toHaveBeenCalled();
});
});
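The tests above drive the handlers through a plain EventEmitter standing in for watcher.events, which makes emission synchronous and cluster-free. A minimal standalone sketch of that stubbing pattern (the event name and URL below are illustrative):

import { EventEmitter } from "stream";

// A bare EventEmitter doubles for watcher.events: emit() calls listeners
// synchronously, so an assertion can run on the very next line.
const events = new EventEmitter();
events.on("connect", (url: string) => console.log(`connect received: ${url}`));
events.emit("connect", "https://kubernetes.default.svc"); // illustrative URL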
91 changes: 58 additions & 33 deletions src/lib/processors/watch-processor.ts
@@ -4,11 +4,12 @@ import Log from "../telemetry/logger";
import { Binding } from "../types";
import { Capability } from "../core/capability";
import { Event } from "../enums";
- import { K8s, KubernetesObject, WatchCfg, WatchEvent } from "kubernetes-fluent-client";
+ import { K8s, KubernetesObject, WatchCfg, WatchEvent, GenericClass } from "kubernetes-fluent-client";
import { Queue } from "../core/queue";
- import { WatchPhase } from "kubernetes-fluent-client/dist/fluent/types";
+ import { WatchPhase, WatcherType } from "kubernetes-fluent-client/dist/fluent/types";
+ import { KubernetesListObject } from "kubernetes-fluent-client/dist/types";
import { filterNoMatchReason } from "../filter/filter";
- import { metricsCollector } from "../telemetry/metrics";
+ import { metricsCollector, MetricsCollectorInstance } from "../telemetry/metrics";
import { removeFinalizer } from "../finalizer";

// stores Queue instances
@@ -157,36 +158,8 @@ async function runBinding(
}
}, watchCfg);

- // If failure continues, log and exit
- watcher.events.on(WatchEvent.GIVE_UP, err => {
-   Log.error(err, "Watch failed after 5 attempts, giving up");
-   process.exit(1);
- });
-
- watcher.events.on(WatchEvent.CONNECT, url => logEvent(WatchEvent.CONNECT, url));
-
- watcher.events.on(WatchEvent.DATA_ERROR, err => logEvent(WatchEvent.DATA_ERROR, err.message));
- watcher.events.on(WatchEvent.RECONNECT, retryCount =>
-   logEvent(WatchEvent.RECONNECT, `Reconnecting after ${retryCount} attempt${retryCount === 1 ? "" : "s"}`),
- );
- watcher.events.on(WatchEvent.RECONNECT_PENDING, () => logEvent(WatchEvent.RECONNECT_PENDING));
- watcher.events.on(WatchEvent.GIVE_UP, err => logEvent(WatchEvent.GIVE_UP, err.message));
- watcher.events.on(WatchEvent.ABORT, err => logEvent(WatchEvent.ABORT, err.message));
- watcher.events.on(WatchEvent.OLD_RESOURCE_VERSION, err => logEvent(WatchEvent.OLD_RESOURCE_VERSION, err));
- watcher.events.on(WatchEvent.NETWORK_ERROR, err => logEvent(WatchEvent.NETWORK_ERROR, err.message));
- watcher.events.on(WatchEvent.LIST_ERROR, err => logEvent(WatchEvent.LIST_ERROR, err.message));
- watcher.events.on(WatchEvent.LIST, list => logEvent(WatchEvent.LIST, JSON.stringify(list, undefined, 2)));
- watcher.events.on(WatchEvent.CACHE_MISS, windowName => {
-   metricsCollector.incCacheMiss(windowName);
- });
-
- watcher.events.on(WatchEvent.INIT_CACHE_MISS, windowName => {
-   metricsCollector.initCacheMissWindow(windowName);
- });
-
- watcher.events.on(WatchEvent.INC_RESYNC_FAILURE_COUNT, retryCount => {
-   metricsCollector.incRetryCount(retryCount);
- });
+ // Register event handlers
+ registerWatchEventHandlers(watcher, logEvent, metricsCollector);

// Start the watch
try {
@@ -205,3 +178,55 @@ export function logEvent(event: WatchEvent, message: string = "", obj?: Kubernet
Log.debug(logMessage);
}
}

export type WatchEventArgs<K extends WatchEvent, T extends GenericClass> = {
[WatchEvent.LIST]: KubernetesListObject<InstanceType<T>>;
[WatchEvent.RECONNECT]: number;
[WatchEvent.CACHE_MISS]: string;
[WatchEvent.INIT_CACHE_MISS]: string;
[WatchEvent.GIVE_UP]: Error;
[WatchEvent.ABORT]: Error;
[WatchEvent.OLD_RESOURCE_VERSION]: string;
[WatchEvent.NETWORK_ERROR]: Error;
[WatchEvent.LIST_ERROR]: Error;
[WatchEvent.DATA_ERROR]: Error;
[WatchEvent.CONNECT]: string;
[WatchEvent.RECONNECT_PENDING]: undefined;
[WatchEvent.DATA]: undefined;
[WatchEvent.INC_RESYNC_FAILURE_COUNT]: number;
}[K];

export type LogEventFunction = (event: WatchEvent, message?: string) => void;
export function registerWatchEventHandlers(
watcher: WatcherType<GenericClass>,
logEvent: LogEventFunction,
metricsCollector: MetricsCollectorInstance,
): void {
const eventHandlers: {
[K in WatchEvent]?: (arg: WatchEventArgs<K, GenericClass>) => void;
} = {
[WatchEvent.DATA]: () => null,
[WatchEvent.GIVE_UP]: err => {
// If failure continues, log and exit
logEvent(WatchEvent.GIVE_UP, err.message);
process.exit(1);
},
[WatchEvent.CONNECT]: url => logEvent(WatchEvent.CONNECT, url),
[WatchEvent.DATA_ERROR]: err => logEvent(WatchEvent.DATA_ERROR, err.message),
[WatchEvent.RECONNECT]: retryCount =>
logEvent(WatchEvent.RECONNECT, `Reconnecting after ${retryCount} attempt${retryCount === 1 ? "" : "s"}`),
[WatchEvent.RECONNECT_PENDING]: () => logEvent(WatchEvent.RECONNECT_PENDING),
[WatchEvent.ABORT]: err => logEvent(WatchEvent.ABORT, err.message),
[WatchEvent.OLD_RESOURCE_VERSION]: errMessage => logEvent(WatchEvent.OLD_RESOURCE_VERSION, errMessage),
[WatchEvent.NETWORK_ERROR]: err => logEvent(WatchEvent.NETWORK_ERROR, err.message),
[WatchEvent.LIST_ERROR]: err => logEvent(WatchEvent.LIST_ERROR, err.message),
[WatchEvent.LIST]: list => logEvent(WatchEvent.LIST, JSON.stringify(list, undefined, 2)),
[WatchEvent.CACHE_MISS]: windowName => metricsCollector.incCacheMiss(windowName),
[WatchEvent.INIT_CACHE_MISS]: windowName => metricsCollector.initCacheMissWindow(windowName),
[WatchEvent.INC_RESYNC_FAILURE_COUNT]: retryCount => metricsCollector.incRetryCount(retryCount),
};

Object.entries(eventHandlers).forEach(([event, handler]) => {
watcher.events.on(event, handler);
});
}
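For reference, a minimal sketch of exercising the new helper in isolation. The watcher and metrics stubs below are assumptions shaped like the test doubles above, not the real Watcher or MetricsCollector:

import { EventEmitter } from "stream";
import { GenericClass, WatchEvent } from "kubernetes-fluent-client";
import { WatcherType } from "kubernetes-fluent-client/dist/fluent/types";
import { MetricsCollectorInstance } from "../telemetry/metrics";
import { registerWatchEventHandlers } from "./watch-processor";

// Stub watcher: the helper only touches the events emitter.
const watcher = { events: new EventEmitter() } as unknown as WatcherType<GenericClass>;

// Stub metrics collector covering the three methods the handlers call.
const metrics = {
  incCacheMiss: (window: string) => console.log(`cache miss: ${window}`),
  initCacheMissWindow: (window: string) => console.log(`init window: ${window}`),
  incRetryCount: (count: number) => console.log(`retries: ${count}`),
} as unknown as MetricsCollectorInstance;

registerWatchEventHandlers(watcher, (event, msg) => console.log(event, msg ?? ""), metrics);

// Each emit routes through the typed handler map above.
watcher.events.emit(WatchEvent.RECONNECT, 2); // logs "Reconnecting after 2 attempts"
watcher.events.emit(WatchEvent.CACHE_MISS, "2025-02-05T04:14:39.535Z");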
2 changes: 1 addition & 1 deletion src/lib/telemetry/metrics.test.ts
@@ -93,7 +93,7 @@ test("incCacheMiss increments cache miss gauge", async () => {
test("incRetryCount increments retry count gauge", async () => {
const collector: MetricsCollector = new MetricsCollector("testPrefix");

collector.incRetryCount("1");
collector.incRetryCount(1);

const metrics: string = await collector.getMetrics();
expect(metrics).toMatch(/testPrefix_resync_failure_count{count="1"} 1/);
6 changes: 3 additions & 3 deletions src/lib/telemetry/metrics.ts
@@ -9,7 +9,7 @@ import Log from "./logger";

const loggingPrefix = "MetricsCollector";

- type MetricsCollectorInstance = InstanceType<typeof MetricsCollector>;
+ export type MetricsCollectorInstance = InstanceType<typeof MetricsCollector>;
interface MetricNames {
errors: string;
alerts: string;
@@ -148,8 +148,8 @@ export class MetricsCollector {
* Increments the retry count gauge.
* @param count - The count to increment by.
*/
- incRetryCount = (count: string): void => {
-   this.incGauge(this.#metricNames.resyncFailureCount, { count });
+ incRetryCount = (count: number): void => {
+   this.incGauge(this.#metricNames.resyncFailureCount, { count: count.toString() });
};

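Stringifying the count matches the Prometheus exposition format, where label values are always strings (as the test's testPrefix_resync_failure_count{count="1"} assertion shows). A minimal prom-client sketch of the equivalent gauge update; the metric name and help text are illustrative, and incGauge is assumed to wrap Gauge.inc:

import { Gauge } from "prom-client";

const resyncFailureCount = new Gauge({
  name: "testPrefix_resync_failure_count", // illustrative, mirrors the test prefix
  help: "Number of watch resync failures, labeled by retry count",
  labelNames: ["count"],
});

// Equivalent of incRetryCount(1): increment the series labeled count="1" by 1.
resyncFailureCount.inc({ count: (1).toString() }, 1);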