diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index bf03c02e..01e5aaef 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -48,6 +48,8 @@ jobs: - name: Test run: yarn test + env: + MANAGEMENT_API_KEY: ${{ secrets.MANAGEMENT_API_KEY }} - name: Configure Git User run: | @@ -58,7 +60,8 @@ jobs: if: ${{ github.event.inputs.dryRun == 'true'}} env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: yarn lerna version --no-push --no-git-tag-version --loglevel silly --yes + run: | + yarn lerna version --no-push --no-git-tag-version --loglevel silly --yes - name: Setup NPM Token if: ${{ github.event.inputs.dryRun == 'false'}} @@ -71,6 +74,7 @@ jobs: if: ${{ github.event.inputs.dryRun == 'false'}} env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + # Adds all params to both version and publish just to be safe. run: | - yarn lerna version --yes - yarn lerna publish from-git --yes --loglevel silly + yarn lerna version --conventionalPrerelease --preid=beta --no-private + yarn lerna publish from-git --loglevel silly --dist-tag=beta --pre-dist-tag=beta --conventionalPrerelease --preid=beta --no-private diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index dc7e6701..8d763a85 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -47,3 +47,5 @@ jobs: - name: Test run: yarn test + env: + MANAGEMENT_API_KEY: ${{ secrets.MANAGEMENT_API_KEY }} diff --git a/.gitignore b/.gitignore index 5757ec87..85f90cec 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,6 @@ dist/ # Example Experiment tag script packages/experiment-tag/example/ + +# Environment variables +.env* \ No newline at end of file diff --git a/lerna.json b/lerna.json index fbc25b49..0a517788 100644 --- a/lerna.json +++ b/lerna.json @@ -7,11 +7,12 @@ "useWorkspaces": true, "command": { "version": { - "allowBranch": "main", + "allowBranch": "stream-vardata", "conventionalCommits": true, "createRelease": "github", "message": "chore(release): publish", - "preid": "beta" + "preid": "beta", + "conventionalPrerelease": true } } } diff --git a/packages/experiment-browser/package.json b/packages/experiment-browser/package.json index f29d002d..bd8526f7 100644 --- a/packages/experiment-browser/package.json +++ b/packages/experiment-browser/package.json @@ -38,6 +38,7 @@ "@amplitude/experiment-core": "^0.11.0", "@amplitude/ua-parser-js": "^0.7.31", "base64-js": "1.5.1", + "eventsource": "^2", "unfetch": "4.1.0" }, "devDependencies": { diff --git a/packages/experiment-browser/src/config.ts b/packages/experiment-browser/src/config.ts index b181b737..d669f367 100644 --- a/packages/experiment-browser/src/config.ts +++ b/packages/experiment-browser/src/config.ts @@ -94,6 +94,18 @@ export interface ExperimentConfig { */ flagConfigPollingIntervalMillis?: number; + /** + * If true, the client will stream updates for remote evaluation from the server. + * fetch() will update variants and initiate a connection to the server. + */ + streamVariants?: boolean; + + /** + * The URL to stream remote evaluation updates from. This is only used if + * `streamVariants` is `true`. + */ + streamVariantsServerUrl?: string; + /** * Explicitly enable or disable calling {@link fetch()} on {@link start()}: * @@ -190,6 +202,8 @@ export const Defaults: ExperimentConfig = { automaticExposureTracking: true, pollOnStart: true, flagConfigPollingIntervalMillis: 300000, + streamVariants: false, + streamVariantsServerUrl: 'https://stream.lab.amplitude.com', fetchOnStart: true, automaticFetchOnAmplitudeIdentityChange: false, userProvider: null, diff --git a/packages/experiment-browser/src/experimentClient.ts b/packages/experiment-browser/src/experimentClient.ts index eab54be9..9544754c 100644 --- a/packages/experiment-browser/src/experimentClient.ts +++ b/packages/experiment-browser/src/experimentClient.ts @@ -7,11 +7,13 @@ import { EvaluationApi, EvaluationEngine, EvaluationFlag, + EvaluationVariant, FetchError, FlagApi, Poller, SdkEvaluationApi, SdkFlagApi, + SdkStreamEvaluationApi, TimeoutError, topologicalSort, } from '@amplitude/experiment-core'; @@ -29,6 +31,7 @@ import { import { LocalStorage } from './storage/local-storage'; import { SessionStorage } from './storage/session-storage'; import { FetchHttpClient, WrapperClient } from './transport/http'; +import { defaultSseProvider } from './transport/stream'; import { exposureEvent } from './types/analytics'; import { Client, FetchOptions } from './types/client'; import { Exposure, ExposureTrackingProvider } from './types/exposure'; @@ -51,18 +54,22 @@ import { } from './util/convert'; import { SessionAnalyticsProvider } from './util/sessionAnalyticsProvider'; import { SessionExposureTrackingProvider } from './util/sessionExposureTrackingProvider'; +import { + VariantsFetchUpdater, + VariantsRetryAndFallbackWrapperUpdater, + VariantsStreamUpdater, + VariantUpdater, +} from './util/updaters'; // Configs which have been removed from the public API. // May be added back in the future. -const fetchBackoffTimeout = 10000; -const fetchBackoffAttempts = 8; -const fetchBackoffMinMillis = 500; -const fetchBackoffMaxMillis = 10000; -const fetchBackoffScalar = 1.5; const minFlagPollerIntervalMillis = 60000; +const streamConnectionTimeoutMillis = 3000; +const streamRetryIntervalMillis = 10 * 60 * 1000; const euServerUrl = 'https://api.lab.eu.amplitude.com'; const euFlagsServerUrl = 'https://flag.lab.eu.amplitude.com'; +const euStreamVariantsServerUrl = 'https://stream.lab.eu.amplitude.com'; /** * The default {@link Client} used to fetch variations from Experiment's @@ -76,7 +83,7 @@ export class ExperimentClient implements Client { private readonly variants: LoadStoreCache; private readonly flags: LoadStoreCache; private readonly flagApi: FlagApi; - private readonly evaluationApi: EvaluationApi; + private readonly variantUpdater: VariantUpdater; private readonly engine: EvaluationEngine = new EvaluationEngine(); private user: ExperimentUser | undefined; private userProvider: ExperimentUserProvider | undefined; @@ -117,6 +124,11 @@ export class ExperimentClient implements Client { (config?.serverZone?.toLowerCase() === 'eu' ? euFlagsServerUrl : Defaults.flagsServerUrl), + streamVariantsServerUrl: + config?.streamVariantsServerUrl || + (config?.serverZone?.toLowerCase() === 'eu' + ? euStreamVariantsServerUrl + : Defaults.streamVariantsServerUrl), // Force minimum flag config polling interval. flagConfigPollingIntervalMillis: config.flagConfigPollingIntervalMillis < minFlagPollerIntervalMillis @@ -161,11 +173,27 @@ export class ExperimentClient implements Client { this.config.flagsServerUrl, httpClient, ); - this.evaluationApi = new SdkEvaluationApi( + const evaluationApi = new SdkEvaluationApi( this.apiKey, this.config.serverUrl, httpClient, ); + this.variantUpdater = new VariantsFetchUpdater(evaluationApi); + if (config.streamVariants) { + const streamEvaluationApi = new SdkStreamEvaluationApi( + this.apiKey, + this.config.streamVariantsServerUrl, + defaultSseProvider, + streamConnectionTimeoutMillis, + evaluationApi, + ); + const streamUpdater = new VariantsStreamUpdater(streamEvaluationApi); + this.variantUpdater = new VariantsRetryAndFallbackWrapperUpdater( + streamUpdater, + this.variantUpdater, + streamRetryIntervalMillis, + ); + } // Storage & Caching let storage: Storage; const storageInstanceName = internalInstanceName @@ -233,6 +261,7 @@ export class ExperimentClient implements Client { * Stop the local flag configuration poller. */ public stop() { + this.variantUpdater.stop(); // Stop the variant updater, no waiting needed. if (!this.isRunning) { return; } @@ -267,26 +296,38 @@ export class ExperimentClient implements Client { user: ExperimentUser = this.user, options?: FetchOptions, ): Promise { - this.setUser(user || {}); - try { - await this.fetchInternal( - user, - this.config.fetchTimeoutMillis, - this.config.retryFetchOnFailure, - options, - ); - } catch (e) { - if (this.config.debug) { - if (e instanceof TimeoutError) { - console.debug(e); - } else { - console.error(e); - } - } + // Don't even try to fetch variants if API key is not set + if (!this.apiKey) { + throw Error('Experiment API key is empty'); } + this.setUser(user || {}); + user = await this.addContextOrWait(user); + user = this.cleanUserPropsForFetch(user); + await this.variantUpdater.start( + async (results: Record) => { + // On receiving variants update. + this.debug('[Experiment] Received variants update'); + await this.processVariants(results, options); + }, + (err) => { + console.error(err); + }, + { user, options, config: this.config }, + ); return this; } + private async processVariants( + flagKeyToVariant: Record, + options?: FetchOptions, + ): Promise { + const variants: Variants = {}; + Object.entries(flagKeyToVariant).forEach(([key, variant]) => { + variants[key] = convertEvaluationVariantToVariant(variant); + }); + await this.storeVariants(variants, options); + } + /** * Returns the variant for the provided key. * @@ -665,63 +706,12 @@ export class ExperimentClient implements Client { return defaultSourceVariant; } - private async fetchInternal( - user: ExperimentUser, - timeoutMillis: number, - retry: boolean, - options?: FetchOptions, - ): Promise { - // Don't even try to fetch variants if API key is not set - if (!this.apiKey) { - throw Error('Experiment API key is empty'); - } - - this.debug(`[Experiment] Fetch all: retry=${retry}`); - - // Proactively cancel retries if active in order to avoid unnecessary API - // requests. A new failure will restart the retries. - if (retry) { - this.stopRetries(); - } - - try { - const variants = await this.doFetch(user, timeoutMillis, options); - await this.storeVariants(variants, options); - return variants; - } catch (e) { - if (retry && this.shouldRetryFetch(e)) { - void this.startRetries(user, options); - } - throw e; - } - } - private cleanUserPropsForFetch(user: ExperimentUser): ExperimentUser { const cleanedUser = { ...user }; delete cleanedUser.cookie; return cleanedUser; } - private async doFetch( - user: ExperimentUser, - timeoutMillis: number, - options?: FetchOptions, - ): Promise { - user = await this.addContextOrWait(user); - user = this.cleanUserPropsForFetch(user); - this.debug('[Experiment] Fetch variants for user: ', user); - const results = await this.evaluationApi.getVariants(user, { - timeoutMillis: timeoutMillis, - ...options, - }); - const variants: Variants = {}; - for (const key of Object.keys(results)) { - variants[key] = convertEvaluationVariantToVariant(results[key]); - } - this.debug('[Experiment] Received variants: ', variants); - return variants; - } - private async doFlags(): Promise { try { let user: ExperimentUser; @@ -780,28 +770,6 @@ export class ExperimentClient implements Client { this.debug('[Experiment] Stored variants: ', variants); } - private async startRetries( - user: ExperimentUser, - options: FetchOptions, - ): Promise { - this.debug('[Experiment] Retry fetch'); - this.retriesBackoff = new Backoff( - fetchBackoffAttempts, - fetchBackoffMinMillis, - fetchBackoffMaxMillis, - fetchBackoffScalar, - ); - void this.retriesBackoff.start(async () => { - await this.fetchInternal(user, fetchBackoffTimeout, false, options); - }); - } - - private stopRetries(): void { - if (this.retriesBackoff) { - this.retriesBackoff.cancel(); - } - } - private addContext(user: ExperimentUser): ExperimentUser { const providedUser = this.userProvider?.getUser(); const integrationUser = this.integrationManager.getUser(); diff --git a/packages/experiment-browser/src/transport/stream.ts b/packages/experiment-browser/src/transport/stream.ts new file mode 100644 index 00000000..f2efcebc --- /dev/null +++ b/packages/experiment-browser/src/transport/stream.ts @@ -0,0 +1,15 @@ +/** + * @packageDocumentation + * @internal + */ + +import { SSE, SSEProviderParams } from '@amplitude/experiment-core'; +import EventSource from 'eventsource'; + +export const defaultSseProvider = ( + url: string, + params: SSEProviderParams, +): SSE => { + const es = new EventSource(url, params); + return es; +}; diff --git a/packages/experiment-browser/src/util/updaters.ts b/packages/experiment-browser/src/util/updaters.ts new file mode 100644 index 00000000..609bb29f --- /dev/null +++ b/packages/experiment-browser/src/util/updaters.ts @@ -0,0 +1,360 @@ +import { + EvaluationApi, + EvaluationVariant, + FetchError, + GetVariantsOptions, + StreamEvaluationApi, + TimeoutError, +} from '@amplitude/experiment-core'; + +import { ExperimentConfig } from '../config'; +import { FetchOptions } from '../types/client'; +import { ExperimentUser } from '../types/user'; + +import { Backoff } from './backoff'; + +export interface Updater { + start( + onUpdate: (data: unknown) => void, + onError: (err: Error) => void, + params?: Record, + ): Promise; + stop(): Promise; +} + +type VariantUpdateCallback = (data: Record) => void; +type VariantErrorCallback = (err: Error) => void; +type VariantUpdaterParams = { + user: ExperimentUser; + config: ExperimentConfig; + options: GetVariantsOptions; +}; +export interface VariantUpdater extends Updater { + start( + onUpdate: VariantUpdateCallback, + onError: VariantErrorCallback, + params: VariantUpdaterParams, + ): Promise; + stop(): Promise; +} + +function isErrorRetriable(e: Error | ErrorEvent): boolean { + if (e instanceof FetchError) { + const ferr = e as FetchError; + return ( + ferr.statusCode < 400 || ferr.statusCode >= 500 || ferr.statusCode === 429 + ); + } + + return true; +} + +/** + * This updater streams the variants from the server and calls the onUpdate callback with the results. + * This updater does not retry the stream if it fails, it will call the onError callback with the error. + * If it encounters a non-retriable error, all future starts are errored. + */ +export class VariantsStreamUpdater implements VariantUpdater { + private evaluationApi: StreamEvaluationApi; + private hasNonretriableError = false; + + constructor(evaluationApi: StreamEvaluationApi) { + this.evaluationApi = evaluationApi; + } + + async start( + onUpdate: VariantUpdateCallback, + onError: VariantErrorCallback, + params: VariantUpdaterParams, + ): Promise { + if (this.hasNonretriableError) { + throw new Error('Stream updater has non-retriable error, not starting'); + } + await this.stop(); + try { + await this.evaluationApi.streamVariants( + params.user, + params.options, + onUpdate, + async (error) => { + await this.handleError(error); + onError(error); + }, + ); + } catch (error) { + console.error( + '[Experiment] Stream updater failed to start: ' + + error + + error.statusCode, + ); + await this.handleError(error); + throw error; + } + } + + async handleError(error: Error): Promise { + await this.stop(); + if (!isErrorRetriable(error)) { + this.hasNonretriableError = true; + console.error( + '[Experiment] Stream updater has non-retriable error: ' + error, + ); + } + } + + async stop(): Promise { + await this.evaluationApi.close(); + } +} + +const fetchBackoffTimeout = 10000; +const fetchBackoffAttempts = 8; +const fetchBackoffMinMillis = 500; +const fetchBackoffMaxMillis = 10000; +const fetchBackoffScalar = 1.5; + +/** + * This updater fetches the variants from the server and calls the onUpdate callback with the results. + * This updater does not continuously poll the server, it only fetches the variants once. + * It will retry the fetch if it fails, with exponential backoff. + * The retry will stop if the fetch succeeds or if the max number of retries is reached. + * The retry will also stop if a non-retriable error is encountered. + * The retry will also stop if the user calls the stop method. + */ +export class VariantsFetchUpdater implements Updater { + private evaluationApi: EvaluationApi; + retriesBackoff: Backoff; + + constructor(evaluationApi: EvaluationApi) { + this.evaluationApi = evaluationApi; + } + + /** + * Perform a fetch to the server and call the onUpdate callback with the results. + * If the fetch fails, it will retry the fetch with exponential backoff. + * Always return success, even if the fetch fails. + * @param onUpdate This callback is called with the results of the fetch. + * @param onError This callback is ignored. + * @param params The params object contains the user, config, and options. + * @returns A promise that resolves when the first fetch finishes. Always resolves, even if the fetch fails. + */ + async start( + onUpdate: VariantUpdateCallback, + onError: VariantErrorCallback, + params: VariantUpdaterParams, + ): Promise { + const { user, config, options } = params; + + try { + if (config.retryFetchOnFailure) { + this.stopRetries(); + } + + try { + await this.fetchInternal( + user, + options, + config.fetchTimeoutMillis, + onUpdate, + ); + } catch (e) { + if (config.retryFetchOnFailure && isErrorRetriable(e)) { + void this.startRetries(user, options, onUpdate); + } + throw e; + } + } catch (e) { + if (config.debug) { + if (e instanceof TimeoutError) { + console.debug(e); + } else { + console.error(e); + } + } + } + } + + private async fetchInternal( + user: ExperimentUser, + options: FetchOptions, + timeoutMillis: number, + onUpdate: (data: Record) => void, + ): Promise { + const results = await this.evaluationApi.getVariants(user, { + timeoutMillis: timeoutMillis, + ...options, + }); + onUpdate(results); + } + + private async startRetries( + user: ExperimentUser, + options: FetchOptions, + onUpdate: (data: Record) => void, + ): Promise { + this.retriesBackoff = new Backoff( + fetchBackoffAttempts, + fetchBackoffMinMillis, + fetchBackoffMaxMillis, + fetchBackoffScalar, + ); + void this.retriesBackoff.start(async () => { + await this.fetchInternal(user, options, fetchBackoffTimeout, onUpdate); + }); + } + + private stopRetries(): void { + if (this.retriesBackoff) { + this.retriesBackoff.cancel(); + } + } + async stop(): Promise { + this.stopRetries(); + } +} + +/** + * This class retries the main updater and, if it fails, falls back to the fallback updater. + * The main updater will keep retrying every set interval and, if succeeded, the fallback updater will be stopped. + * If it has falled back to fallback updater, if the fallback updated failed to start, it will retry starting the fallback updater. + */ +export class RetryAndFallbackWrapperUpdater implements Updater { + private readonly mainUpdater: Updater; + private readonly fallbackUpdater: Updater; + private readonly retryIntervalMillisMin: number; + private readonly retryIntervalMillisRange: number; + private mainRetryTimer: NodeJS.Timeout | null = null; // To retry main updater after initial start. + private fallbackRetryTimer: NodeJS.Timeout | null = null; // To make sure fallback start is retried if failed to start when main updater failed. + + constructor( + mainUpdater: Updater, + fallbackUpdater: Updater, + retryIntervalMillis: number, + ) { + this.mainUpdater = mainUpdater; + this.fallbackUpdater = fallbackUpdater; + this.retryIntervalMillisMin = retryIntervalMillis * 0.8; + this.retryIntervalMillisRange = + retryIntervalMillis * 1.2 - this.retryIntervalMillisMin; + } + + /** + * If main start succeeded, return. + * If main start failed, start fallback updater. + * If fallback start failed, throw exception. + */ + async start( + onUpdate: (data: unknown) => void, + onError: (err: Error) => void, + params: Record, + ): Promise { + await this.stop(); + + try { + await this.mainUpdater.start( + onUpdate, + async (err) => { + this.fallbackUpdater + .start(onUpdate, onError, params) + .catch((error) => { + this.startFallbackRetryTimer(onUpdate, onError, params); + }); + this.startMainRetryTimer(onUpdate, onError, params); + }, + params, + ); + } catch (error) { + await this.fallbackUpdater.start(onUpdate, onError, params); + this.startMainRetryTimer(onUpdate, onError, params); + } + } + + startMainRetryTimer(onUpdate, onError, params) { + if (this.mainRetryTimer) { + clearTimeout(this.mainRetryTimer); + this.mainRetryTimer = null; + } + + const retryTimer = setTimeout(async () => { + try { + await this.mainUpdater.start( + onUpdate, + (err) => { + this.fallbackUpdater + .start(onUpdate, onError, params) + .catch((error) => { + this.startFallbackRetryTimer(onUpdate, onError, params); + }); + this.startMainRetryTimer(onUpdate, onError, params); + }, + params, + ); + } catch { + this.startMainRetryTimer(onUpdate, onError, params); + return; + } + if (this.fallbackRetryTimer) { + clearTimeout(this.fallbackRetryTimer); + this.fallbackRetryTimer = null; + } + this.fallbackUpdater.stop(); + }, Math.ceil(this.retryIntervalMillisMin + Math.random() * this.retryIntervalMillisRange)); + this.mainRetryTimer = retryTimer; + } + + startFallbackRetryTimer(onUpdate, onError, params) { + if (this.fallbackRetryTimer) { + clearTimeout(this.fallbackRetryTimer); + this.fallbackRetryTimer = null; + } + const retryTimer = setTimeout(async () => { + try { + await this.fallbackUpdater.start(onUpdate, onError, params); + } catch { + this.startFallbackRetryTimer(onUpdate, onError, params); + } + }, Math.ceil(this.retryIntervalMillisMin + Math.random() * this.retryIntervalMillisRange)); + this.fallbackRetryTimer = retryTimer; + } + + async stop(): Promise { + /* + * No locks needed for await and asyncs. + * If stop is called, the intervals are cancelled. Callbacks are not called. + * If the callback has already started, the updater.start in callback is scheduled before the updater.stop in this stop() func. + * So either no updater.start() is performed, or the updater.start() is scheduled before the updater.stop(). + */ + // Cancelling timers must be done before stopping the updaters. + if (this.mainRetryTimer) { + clearTimeout(this.mainRetryTimer); + this.mainRetryTimer = null; + } + if (this.fallbackRetryTimer) { + clearTimeout(this.fallbackRetryTimer); + this.fallbackRetryTimer = null; + } + await this.mainUpdater.stop(); + await this.fallbackUpdater.stop(); + } +} + +export class VariantsRetryAndFallbackWrapperUpdater + extends RetryAndFallbackWrapperUpdater + implements VariantUpdater +{ + constructor( + mainUpdater: VariantUpdater, + fallbackUpdater: VariantUpdater, + retryIntervalMillis: number, + ) { + super(mainUpdater, fallbackUpdater, retryIntervalMillis); + } + + async start( + onUpdate: VariantUpdateCallback, + onError: VariantErrorCallback, + params: VariantUpdaterParams, + ): Promise { + return super.start(onUpdate, onError, params); + } +} diff --git a/packages/experiment-browser/test/client.test.ts b/packages/experiment-browser/test/client.test.ts index c4f10617..e65265f8 100644 --- a/packages/experiment-browser/test/client.test.ts +++ b/packages/experiment-browser/test/client.test.ts @@ -1,5 +1,10 @@ import { AnalyticsConnector } from '@amplitude/analytics-connector'; -import { FetchError, safeGlobal } from '@amplitude/experiment-core'; +import { + EvaluationVariant, + FetchError, + safeGlobal, + SdkEvaluationApi, +} from '@amplitude/experiment-core'; import { Defaults } from 'src/config'; import { ExperimentEvent, IntegrationPlugin } from 'src/types/plugin'; @@ -18,6 +23,7 @@ import { } from '../src'; import { HttpClient, SimpleResponse } from '../src/types/transport'; import { randomString } from '../src/util/randomstring'; +import { VariantsFetchUpdater } from '../src/util/updaters'; import { mockClientStorage } from './util/mock'; @@ -1129,20 +1135,22 @@ describe('fetch retry with different response codes', () => { mockClientStorage(client); jest - .spyOn(ExperimentClient.prototype as any, 'doFetch') + .spyOn(SdkEvaluationApi.prototype as any, 'getVariants') .mockImplementation( async (user?: ExperimentUser, options?: FetchOptions) => { - return new Promise((resolve, reject) => { - if (responseCode === 0) { - reject(new Error(errorMessage)); - } else { - reject(new FetchError(responseCode, errorMessage)); - } - }); + return new Promise>( + (resolve, reject) => { + if (responseCode === 0) { + reject(new Error(errorMessage)); + } else { + reject(new FetchError(responseCode, errorMessage)); + } + }, + ); }, ); const retryMock = jest.spyOn( - ExperimentClient.prototype as any, + VariantsFetchUpdater.prototype as any, 'startRetries', ); try { @@ -1367,3 +1375,16 @@ describe('flag config polling interval config', () => { expect(client['config'].flagConfigPollingIntervalMillis).toEqual(900000); }); }); + +describe('client stream variants', () => { + test('stream variants, success', async () => { + const client = new ExperimentClient(API_KEY, { + streamVariants: true, + }); + mockClientStorage(client); + await client.fetch(testUser); + const variant = client.variant(serverKey); + expect(variant).toEqual(serverVariant); + client.stop(); + }); +}); diff --git a/packages/experiment-browser/test/stream.test.ts b/packages/experiment-browser/test/stream.test.ts new file mode 100644 index 00000000..4f8bfab8 --- /dev/null +++ b/packages/experiment-browser/test/stream.test.ts @@ -0,0 +1,145 @@ +/* eslint-disable no-empty */ +import path from 'path'; + +import { + EvaluationVariant, + GetVariantsOptions, + SdkEvaluationApi, + SdkStreamEvaluationApi, + StreamEvaluationApi, +} from '@amplitude/experiment-core'; +import * as dotenv from 'dotenv'; +import EventSource from 'eventsource'; + +import { Defaults } from '../src/config'; +import { FetchHttpClient, WrapperClient } from '../src/transport/http'; + +import { sleep } from './util/misc'; + +dotenv.config({ + path: path.join( + __dirname, + '../', + process.env['ENVIRONMENT'] ? '.env.' + process.env['ENVIRONMENT'] : '.env', + ), +}); + +if (!process.env['MANAGEMENT_API_KEY']) { + throw new Error( + 'No env vars found. If running on local, have you created .env file correct environment variables? Checkout README.md', + ); +} + +const SERVER_URL = process.env['SERVER_URL'] || Defaults.serverUrl; +const STREAM_SERVER_URL = + process.env['STREAM_SERVER_URL'] || Defaults.streamVariantsServerUrl; +const MANAGEMENT_API_SERVER_URL = + process.env['MANAGEMENT_API_SERVER_URL'] || + 'https://experiment.amplitude.com'; +const DEPLOYMENT_KEY = + process.env['DEPLOYMENT_KEY'] || 'client-DvWljIjiiuqLbyjqdvBaLFfEBrAvGuA3'; +const MANAGEMENT_API_KEY = process.env['MANAGEMENT_API_KEY']; +const FLAG_KEY = 'sdk-ci-stream-vardata-test'; + +const USER = {}; +const OPTIONS: GetVariantsOptions = {}; + +// Test stream is successfully connected and data is valid. +// The main purpose is to test and ensure the SDK stream interface works with stream server. +// This test may be flaky if multiple edits to the flag happens simultaneously, +// i.e. multiple invocation of this test is run at the same time. +// If two edits are made in a very very very short period (few seconds), the first edit may not be streamed. +jest.retryTimes(2); +test('SDK stream is compatible with stream server (flaky possible, see comments)', async () => { + const api: StreamEvaluationApi = new SdkStreamEvaluationApi( + DEPLOYMENT_KEY, + STREAM_SERVER_URL, + (url, params) => { + return new EventSource(url, params); + }, + 5000, // A bit more generous timeout than the default. + ); + + const streamVariants: Record[] = []; + let streamError = undefined; + const connectedPromise = new Promise((resolve, reject) => { + api + .streamVariants( + USER, + OPTIONS, + (variants: Record) => { + streamVariants.push(variants); + resolve(); + }, + (err) => { + streamError = err; + reject(err); + }, + ) + .catch((err) => { + reject(err); + }); + }); + await connectedPromise; + + // Get variant from the fetch api to compare. + const httpClient = FetchHttpClient; + const fetchApi = new SdkEvaluationApi( + DEPLOYMENT_KEY, + SERVER_URL, + new WrapperClient(httpClient), + ); + const fetchVariants = await fetchApi.getVariants(USER, OPTIONS); + + // At least one vardata streamed should be the same as the one fetched. + // There can be other updates after stream establishment and before fetch. + await sleep(5000); // Assume there's an update right before fetch but after stream, wait for stream to receive that data. + expect( + // Find the one that match using payload of our test flag. + streamVariants.filter( + (f) => f[FLAG_KEY]['payload'] === fetchVariants[FLAG_KEY]['payload'], + )[0], + ).toStrictEqual(fetchVariants); + + // Test that stream is kept alive. + await sleep(40000); + expect(streamError).toBeUndefined(); + + // Get flag id using management-api. + const getFlagIdRequest = await httpClient.request( + `${MANAGEMENT_API_SERVER_URL}/api/1/flags?key=${FLAG_KEY}`, + 'GET', + { + Authorization: 'Bearer ' + MANAGEMENT_API_KEY, + 'Content-Type': 'application/json', + Accept: '*/*', + }, + '', + ); + expect(getFlagIdRequest.status).toBe(200); + const flagId = JSON.parse(getFlagIdRequest.body)['flags'][0]['id']; + + // Call management api to edit deployment. Then wait for stream to update. + const randNumber = Math.random(); + const modifyFlagReq = await httpClient.request( + `${MANAGEMENT_API_SERVER_URL}/api/1/flags/${flagId}/variants/on`, + 'PATCH', + { + Authorization: 'Bearer ' + MANAGEMENT_API_KEY, + 'Content-Type': 'application/json', + Accept: '*/*', + }, + `{"payload": ${randNumber}}`, + 10000, + ); + expect(modifyFlagReq.status).toBe(200); + await sleep(5000); // 5s is generous enough for update to stream. + + // Check that at least one of the updates happened during this time have the random number we generated. + // This means that the stream is working and we are getting updates. + expect( + streamVariants.filter((f) => f[FLAG_KEY]['payload'] === randNumber).length, + ).toBeGreaterThanOrEqual(1); + + api.close(); +}, 60000); diff --git a/packages/experiment-browser/test/util/misc.ts b/packages/experiment-browser/test/util/misc.ts index 0076dd2b..25166c31 100644 --- a/packages/experiment-browser/test/util/misc.ts +++ b/packages/experiment-browser/test/util/misc.ts @@ -6,3 +6,9 @@ export const clearAllCookies = () => { document.cookie = `${cookieName}=; expires=Thu, 01 Jan 1970 00:00:00 GMT; path=/`; } }; + +export const sleep = async (millis: number): Promise => { + return new Promise((resolve) => { + setTimeout(resolve, millis); + }); +}; diff --git a/packages/experiment-browser/test/util/updaters.test.ts b/packages/experiment-browser/test/util/updaters.test.ts new file mode 100644 index 00000000..c53f5aca --- /dev/null +++ b/packages/experiment-browser/test/util/updaters.test.ts @@ -0,0 +1,441 @@ +import { + SdkEvaluationApi, + SdkStreamEvaluationApi, +} from '@amplitude/experiment-core'; +import { FetchError } from '@amplitude/experiment-core'; + +import { + RetryAndFallbackWrapperUpdater, + Updater, + VariantsFetchUpdater, + VariantsStreamUpdater, +} from '../..//src/util/updaters'; + +import { sleep } from './misc'; + +describe('VariantsStreamUpdater tests', () => { + test('connect success and receive data', async () => { + const mockStreamApi = { + streamVariants: jest.fn(), + close: jest.fn(), + }; + const updater = new VariantsStreamUpdater(mockStreamApi); + const onUpdate = jest.fn(); + await updater.start(onUpdate, jest.fn(), { + user: {}, + config: {}, + options: {}, + }); + + expect(onUpdate).toHaveBeenCalledTimes(0); + await mockStreamApi.streamVariants.mock.lastCall[2]({ + 'test-flag': {}, + }); + expect(onUpdate).toHaveBeenCalledTimes(1); + await mockStreamApi.streamVariants.mock.lastCall[2]({ + 'test-flag': {}, + }); + expect(onUpdate).toHaveBeenCalledTimes(2); + }); + + test('connect error throws', async () => { + const mockStreamApi = { + streamVariants: jest + .fn() + .mockRejectedValue(new FetchError(413, 'Payload too large')), + close: jest.fn(), + } as unknown as SdkStreamEvaluationApi; + const updater = new VariantsStreamUpdater(mockStreamApi); + const onUpdate = jest.fn(); + await expect( + updater.start(onUpdate, jest.fn(), { + user: {}, + config: {}, + options: {}, + }), + ).rejects.toThrow('Payload too large'); + expect(onUpdate).toHaveBeenCalledTimes(0); + expect(updater['hasNonretriableError']).toBe(true); + }); + + test('connect success then stream error', async () => { + let registeredOnUpdate; + let registeredOnError; + const mockStreamApi = { + streamVariants: jest.fn( + async ( + _: unknown, + __: unknown, + onUpdate: (data) => void, + onError: (error) => void, + ) => { + registeredOnUpdate = onUpdate; + registeredOnError = onError; + }, + ), + close: jest.fn(), + }; + const updater = new VariantsStreamUpdater(mockStreamApi); + const onUpdate = jest.fn(); + const onError = jest.fn(); + await updater.start(onUpdate, onError, { + user: {}, + config: {}, + options: {}, + }); + + mockStreamApi.streamVariants.mock.lastCall[2]({ 'test-flag': {} }); + expect(onUpdate).toHaveBeenCalledTimes(1); + expect(mockStreamApi.close).toHaveBeenCalledTimes(1); // Close called once on start. + await mockStreamApi.streamVariants.mock.lastCall[3]( + new Error('Stream error'), + ); + expect(onError).toHaveBeenCalledTimes(1); + expect(mockStreamApi.close).toHaveBeenCalledTimes(2); // Close called again on error. + expect(updater['hasNonretriableError']).toBe(false); + }); +}); + +describe('VariantsFetchUpdater tests', () => { + test('fetches variant', async () => { + const mockEvalApi = { + getVariants: jest.fn(async () => { + return { + 'test-flag': {}, + }; + }), + }; + const updater = new VariantsFetchUpdater(mockEvalApi); + const onUpdate = jest.fn(); + const onError = jest.fn(); + await updater.start(onUpdate, onError, { + user: {}, + config: {}, + options: {}, + }); + expect(mockEvalApi.getVariants).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledWith({ + 'test-flag': {}, + }); + await updater.start(onUpdate, onError, { + user: {}, + config: {}, + options: {}, + }); + expect(mockEvalApi.getVariants).toHaveBeenCalledTimes(2); + expect(onUpdate).toHaveBeenCalledTimes(2); + expect(onUpdate).toHaveBeenCalledWith({ + 'test-flag': {}, + }); + expect(onError).toHaveBeenCalledTimes(0); + }); + + test('first fetch failed retriable would retry but does not throw', async () => { + const mockEvalApi = { + getVariants: jest + .fn() + .mockRejectedValueOnce(new FetchError(500, 'Internal Server Error')) + .mockResolvedValueOnce({ + 'test-flag': {}, + }), + }; + const updater = new VariantsFetchUpdater(mockEvalApi); + const onUpdate = jest.fn(); + const onError = jest.fn(); + await updater.start(onUpdate, onError, { + user: {}, + config: { retryFetchOnFailure: true }, + options: {}, + }); + expect(mockEvalApi.getVariants).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledTimes(0); + await sleep(750); + expect(mockEvalApi.getVariants).toHaveBeenCalledTimes(2); + expect(onUpdate).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledWith({ + 'test-flag': {}, + }); + expect(onError).toHaveBeenCalledTimes(0); + }); + + test('first fetch failed nonretriable would not retry and does not throw', async () => { + const mockEvalApi = { + getVariants: jest + .fn() + .mockRejectedValue(new FetchError(413, 'Payload too large')), + }; + const updater = new VariantsFetchUpdater(mockEvalApi); + const onUpdate = jest.fn(); + const onError = jest.fn(); + await updater.start(onUpdate, onError, { + user: {}, + config: { retryFetchOnFailure: true }, + options: {}, + }); + expect(mockEvalApi.getVariants).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledTimes(0); + await sleep(2000); + expect(mockEvalApi.getVariants).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledTimes(0); + expect(onError).toHaveBeenCalledTimes(0); + }); + + test('all fetches fails does nothing', async () => { + const mockEvalApi = { + getVariants: jest + .fn() + .mockRejectedValue(new FetchError(500, 'Internal Server Error')), + }; + const updater = new VariantsFetchUpdater(mockEvalApi); + const onUpdate = jest.fn(); + const onError = jest.fn(); + await updater.start(onUpdate, onError, { + user: {}, + config: { retryFetchOnFailure: true }, + options: {}, + }); + expect(mockEvalApi.getVariants).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledTimes(0); + await sleep(20000); + expect(mockEvalApi.getVariants).toHaveBeenCalledTimes(8); + expect(onUpdate).toHaveBeenCalledTimes(0); + expect(onError).toHaveBeenCalledTimes(0); + }, 30000); +}); + +describe('RetryAndFallbackWrapperUpdater tests', () => { + test('main start success, no fallback start, wrapper start success', async () => { + const mainUpdater = { + start: jest.fn(), + stop: jest.fn(), + }; + const fallbackUpdater = { + start: jest.fn(), + stop: jest.fn(), + }; + const wrapperUpdater = new RetryAndFallbackWrapperUpdater( + mainUpdater as Updater, + fallbackUpdater as Updater, + 2000, + ); + const onUpdate = jest.fn(); + await wrapperUpdater.start(onUpdate, jest.fn(), {}); + expect(mainUpdater.start).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(0); + // Verify data flows. + mainUpdater.start.mock.lastCall[0]('asdf'); + expect(onUpdate).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledWith('asdf'); + // Reset stop function mocks to reset called times. + mainUpdater.stop = jest.fn(); + fallbackUpdater.stop = jest.fn(); + await wrapperUpdater.stop(); + expect(mainUpdater.stop).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.stop).toHaveBeenCalledTimes(1); + }); + + test('main start failed, fallback start success, wrapper start success', async () => { + const mainUpdater = { + start: jest.fn().mockRejectedValue(new Error('Main updater error')), + stop: jest.fn(), + }; + const fallbackUpdater = { + start: jest.fn(), + stop: jest.fn(), + }; + const wrapperUpdater = new RetryAndFallbackWrapperUpdater( + mainUpdater as Updater, + fallbackUpdater as Updater, + 2000, + ); + const onUpdate = jest.fn(); + await wrapperUpdater.start(onUpdate, jest.fn(), {}); + expect(mainUpdater.start).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(1); + // Verify data flows. + fallbackUpdater.start.mock.lastCall[0]('asdf'); + expect(onUpdate).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledWith('asdf'); + // Reset stop function mocks to reset called times. + mainUpdater.stop = jest.fn(); + fallbackUpdater.stop = jest.fn(); + await wrapperUpdater.stop(); + expect(mainUpdater.stop).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.stop).toHaveBeenCalledTimes(1); + }); + + test('main start failed, fallback start failed, wrapper start fail', async () => { + const mainUpdater = { + start: jest.fn().mockRejectedValue(new Error('Main updater error')), + stop: jest.fn(), + }; + const fallbackUpdater = { + start: jest.fn().mockRejectedValue(new Error('Fallback updater error')), + stop: jest.fn(), + }; + const wrapperUpdater = new RetryAndFallbackWrapperUpdater( + mainUpdater, + fallbackUpdater, + 2000, + ); + await expect( + wrapperUpdater.start(jest.fn(), jest.fn(), {}), + ).rejects.toThrow('Fallback updater error'); + }); + + test('main start success, then failed, fallback starts, main retry success, fallback stopped', async () => { + const mainUpdater = { + start: jest.fn(), + stop: jest.fn(), + }; + const fallbackUpdater = { + start: jest.fn().mockResolvedValue(undefined), + stop: jest.fn(), + }; + const wrapperUpdater = new RetryAndFallbackWrapperUpdater( + mainUpdater, + fallbackUpdater, + 4000, + ); + const onUpdate = jest.fn(); + await wrapperUpdater.start(onUpdate, jest.fn(), {}); + expect(mainUpdater.start).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(0); + // Verify data flows. + mainUpdater.start.mock.lastCall[0]('asdf'); + expect(onUpdate).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledWith('asdf'); + // Signal main updater to fail. + mainUpdater.start.mock.lastCall[1](new Error('Main updater error')); + expect(mainUpdater.start).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(1); + // Verify data flows. + fallbackUpdater.start.mock.lastCall[0]('fallback data'); + expect(onUpdate).toHaveBeenCalledTimes(2); + expect(onUpdate).toHaveBeenCalledWith('fallback data'); + // Wait for retry. + await sleep(5000); + expect(mainUpdater.start).toHaveBeenCalledTimes(2); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(1); + // Verify data flows. + mainUpdater.start.mock.lastCall[0]('main data'); + expect(onUpdate).toHaveBeenCalledTimes(3); + expect(onUpdate).toHaveBeenCalledWith('main data'); + // Reset stop function mocks to reset called times. + mainUpdater.stop = jest.fn(); + fallbackUpdater.stop = jest.fn(); + await wrapperUpdater.stop(); + expect(mainUpdater.stop).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.stop).toHaveBeenCalledTimes(1); + }); + + test('main start success, then failed, fallback start failed, fallback retry success, main retry success, fallback stopped', async () => { + const mainUpdater = { + start: jest + .fn() + .mockResolvedValueOnce(undefined) + .mockRejectedValueOnce(undefined) + .mockResolvedValueOnce(undefined), + stop: jest.fn(), + }; + const fallbackUpdater = { + start: jest + .fn() + .mockRejectedValueOnce(new Error('Fallback start error')) + .mockResolvedValueOnce(undefined), + stop: jest.fn(), + }; + const wrapperUpdater = new RetryAndFallbackWrapperUpdater( + mainUpdater, + fallbackUpdater, + 4000, + ); + const onUpdate = jest.fn(); + await wrapperUpdater.start(onUpdate, jest.fn(), {}); + expect(mainUpdater.start).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(0); + // Verify data flows. + mainUpdater.start.mock.lastCall[0]('asdf'); + expect(onUpdate).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledWith('asdf'); + // Signal main updater to fail. + mainUpdater.start.mock.lastCall[1](new Error('Main updater error')); + expect(mainUpdater.start).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(1); + // Wait for main and fallback retry. + await sleep(5000); + expect(mainUpdater.start).toHaveBeenCalledTimes(2); // Main still failed to start. + expect(fallbackUpdater.start).toHaveBeenCalledTimes(2); // Fallback would start success. + // Verify data flows through fallback. + fallbackUpdater.start.mock.lastCall[0]('fallback data'); + expect(onUpdate).toHaveBeenCalledTimes(2); + expect(onUpdate).toHaveBeenCalledWith('fallback data'); + // Wait for main retry. + fallbackUpdater.stop = jest.fn(); // Reset fallback stop counter. + await sleep(5000); + expect(mainUpdater.start).toHaveBeenCalledTimes(3); // Main success. + expect(fallbackUpdater.start).toHaveBeenCalledTimes(2); // No more fallback retry. + expect(fallbackUpdater.stop).toHaveBeenCalledTimes(1); // Verify fallback stopped. + // Verify data flows. + mainUpdater.start.mock.lastCall[0]('main data'); + expect(onUpdate).toHaveBeenCalledTimes(3); + expect(onUpdate).toHaveBeenCalledWith('main data'); + // Reset stop function mocks to reset called times. + mainUpdater.stop = jest.fn(); + fallbackUpdater.stop = jest.fn(); + await wrapperUpdater.stop(); + expect(mainUpdater.stop).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.stop).toHaveBeenCalledTimes(1); + }, 15000); + + test('main start success, then failed, fallback start failed, main retry success, fallback stopped retrying', async () => { + const mainUpdater = { + start: jest.fn(), + stop: jest.fn(), + }; + const fallbackUpdater = { + start: jest.fn(async () => { + await sleep(2500); + throw new Error('Fallback start error'); + }), + stop: jest.fn(), + }; + const wrapperUpdater = new RetryAndFallbackWrapperUpdater( + mainUpdater, + fallbackUpdater, + 4000, + ); + const onUpdate = jest.fn(); + await wrapperUpdater.start(onUpdate, jest.fn(), {}); + expect(mainUpdater.start).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(0); + // Verify data flows. + mainUpdater.start.mock.lastCall[0]('asdf'); + expect(onUpdate).toHaveBeenCalledTimes(1); + expect(onUpdate).toHaveBeenCalledWith('asdf'); + // Signal main updater to fail. + mainUpdater.start.mock.lastCall[1](new Error('Main updater error')); + expect(mainUpdater.start).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(1); + // Ensure fallback updater failed and enters retry. + expect(fallbackUpdater.start).toHaveBeenCalledTimes(1); + // Wait for main retry. + await sleep(5000); + expect(mainUpdater.start).toHaveBeenCalledTimes(2); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(1); + // Verify data flows. + mainUpdater.start.mock.lastCall[0]('main data'); + expect(onUpdate).toHaveBeenCalledTimes(2); + expect(onUpdate).toHaveBeenCalledWith('main data'); + // Make sure fallback stopped retrying. + await sleep(5000); + expect(fallbackUpdater.start).toHaveBeenCalledTimes(1); + // Reset stop function mocks to reset called times. + mainUpdater.stop = jest.fn(); + fallbackUpdater.stop = jest.fn(); + await wrapperUpdater.stop(); + expect(mainUpdater.stop).toHaveBeenCalledTimes(1); + expect(fallbackUpdater.stop).toHaveBeenCalledTimes(1); + }, 15000); +}); diff --git a/packages/experiment-core/src/api/evaluation-stream-api.ts b/packages/experiment-core/src/api/evaluation-stream-api.ts new file mode 100644 index 00000000..92100b78 --- /dev/null +++ b/packages/experiment-core/src/api/evaluation-stream-api.ts @@ -0,0 +1,160 @@ +import { Base64 } from 'js-base64'; + +import { EvaluationVariant } from '../evaluation/flag'; +import { + DEFAULT_EVENT_TYPE, + SSEProvider, + SSEStream, +} from '../transport/stream'; + +import { EvaluationApi, GetVariantsOptions } from './evaluation-api'; + +const STREAM_CONNECTION_TIMEOUT_MILLIS = 3000; + +export interface StreamEvaluationApi { + streamVariants( + user: Record, + options?: GetVariantsOptions, + onUpdate?: (variants: Record) => void, + onError?: (error: Error) => void, + ): Promise; + close(): Promise; +} + +export class SdkStreamEvaluationApi implements StreamEvaluationApi { + private readonly deploymentKey: string; + private readonly serverUrl: string; + private readonly sseProvider: SSEProvider; + private readonly fetchEvalApi?: EvaluationApi; + private readonly connectionTimeoutMillis: number; + + private stream?: SSEStream; + + constructor( + deploymentKey: string, + serverUrl: string, + sseProvider: SSEProvider, + connectionTimeoutMillis = STREAM_CONNECTION_TIMEOUT_MILLIS, + fetchEvalApi?: EvaluationApi, + ) { + this.deploymentKey = deploymentKey; + this.serverUrl = serverUrl; + this.sseProvider = sseProvider; + this.connectionTimeoutMillis = connectionTimeoutMillis; + this.fetchEvalApi = fetchEvalApi; + } + + async streamVariants( + user: Record, + options?: GetVariantsOptions, + onUpdate?: (variants: Record) => void, + onError?: (error: Error) => void, + ): Promise { + if (this.stream) { + await this.close(); + } + + const userJsonBase64 = Base64.encodeURL(JSON.stringify(user)); + const headers: Record = { + Authorization: `Api-Key ${this.deploymentKey}`, + 'X-Amp-Exp-User': userJsonBase64, + }; + if (options?.flagKeys) { + headers['X-Amp-Exp-Flag-Keys'] = Base64.encodeURL( + JSON.stringify(options.flagKeys), + ); + } + if (options?.trackingOption) { + headers['X-Amp-Exp-Track'] = options.trackingOption; + } + + const url = new URL(`${this.serverUrl}/sdk/stream/v1/vardata`); + if (options?.evaluationMode) { + url.searchParams.append('eval_mode', options?.evaluationMode); + } + if (options?.deliveryMethod) { + url.searchParams.append('delivery_method', options?.deliveryMethod); + } + + return new Promise((resolve, reject) => { + this.stream = new SSEStream(this.sseProvider, url.toString(), headers); + let isConnecting = true; + + const connectionTimeout = setTimeout(() => { + if (isConnecting) { + this.close(); + reject(new Error('Connection timed out.')); + } + }, this.connectionTimeoutMillis); + + const onErrorSseCb = (error: Error) => { + if (isConnecting) { + isConnecting = false; + this.close(); + clearTimeout(connectionTimeout); + reject(error); + } else { + this.close(); + onError?.(error); + } + }; + const onDataUpdateSseCb = (data: string) => { + let variants; + try { + variants = JSON.parse(data); + } catch (error) { + onErrorSseCb(new Error('Error parsing variant data: ' + error)); + return; + } + if (isConnecting) { + isConnecting = false; + clearTimeout(connectionTimeout); + resolve(); + } + onUpdate?.(variants); + }; + // The following is to support receiving a signal and fetching the variants. + // This would be helpful if the variant data is too large to be pushed. + // const onSignalUpdateCb = async () => { + // // Signaled that there's a push. + // if (isConnecting) { + // isConnecting = false; + // clearTimeout(connectionTimeout); + // resolve(); + // } + // if (!this.fetchEvalApi) { + // onErrorSseCb( + // new Error( + // 'No fetchEvalApi provided for variant data that is too large to push.', + // ), + // ); + // return; + // } + + // let variants; + // try { + // variants = await this.fetchEvalApi.getVariants(user, options); + // } catch (error) { + // onErrorSseCb( + // new Error('Error fetching variants on signal: ' + error), + // ); + // } + // onUpdate?.(variants || {}); + // }; + + this.stream.connect( + { + push_data: onDataUpdateSseCb, + // push_signal: onSignalUpdateCb, + [DEFAULT_EVENT_TYPE]: onDataUpdateSseCb, + }, + onErrorSseCb, + ); + }); + } + + async close(): Promise { + this.stream?.close(); + this.stream = undefined; + } +} diff --git a/packages/experiment-core/src/index.ts b/packages/experiment-core/src/index.ts index d5f1bb83..675e2e06 100644 --- a/packages/experiment-core/src/index.ts +++ b/packages/experiment-core/src/index.ts @@ -17,6 +17,16 @@ export { } from './api/evaluation-api'; export { FlagApi, SdkFlagApi, GetFlagsOptions } from './api/flag-api'; export { HttpClient, HttpRequest, HttpResponse } from './transport/http'; +export { + StreamEvaluationApi, + SdkStreamEvaluationApi, +} from './api/evaluation-stream-api'; +export { + SSE, + SSEStream, + SSEProvider, + SSEProviderParams, +} from './transport/stream'; export { Poller } from './util/poller'; export { safeGlobal, diff --git a/packages/experiment-core/src/transport/stream.ts b/packages/experiment-core/src/transport/stream.ts new file mode 100644 index 00000000..a58d0117 --- /dev/null +++ b/packages/experiment-core/src/transport/stream.ts @@ -0,0 +1,175 @@ +import { FetchError } from '../evaluation/error'; + +const KEEP_ALIVE_DATA = ' '; +const KEEP_ALIVE_INTERVAL_MILLIS = 33000; // 30 seconds with a 3 seconds buffer +const RECONNECTION_INTERVAL_MILLIS = 30 * 60 * 1000; +export const DEFAULT_EVENT_TYPE = 'message'; + +type ErrorEvent = { + status?: number; + message: string; +}; + +export interface SSE { + addEventListener( + event: string, + callback: (event: MessageEvent | ErrorEvent) => void, + ): void; + close(): void; +} +export type SSEProviderParams = { + headers: Record; +}; +export type SSEProvider = (url: string, params: SSEProviderParams) => SSE; + +export class SSEStream { + streamProvider: SSEProvider; + url: string; + headers: Record; + + es?: SSE; + onEventTypeUpdate: Record void> = {}; + onError?: (error: Error) => void; + keepAliveInterval: number; + keepAliveTimer?: NodeJS.Timeout; + reconnectionIntervalMillisMin: number; + reconnectionIntervalMillisRange: number; + reconnectionTimeout?: NodeJS.Timeout; + + constructor( + streamProvider: SSEProvider, + url: string, + headers: Record, + keepAliveInterval = KEEP_ALIVE_INTERVAL_MILLIS, + reconnectionIntervalMillis = RECONNECTION_INTERVAL_MILLIS, + ) { + this.streamProvider = streamProvider; + this.url = url; + this.headers = headers; + this.keepAliveInterval = keepAliveInterval; + // Make the jitter range to be 0.9 to 1.1 of the reconnection interval. + this.reconnectionIntervalMillisMin = Math.floor( + reconnectionIntervalMillis * 0.9, + ); + this.reconnectionIntervalMillisRange = + Math.ceil(reconnectionIntervalMillis * 1.1) - + this.reconnectionIntervalMillisMin; + } + + /** + * Connect to the stream and listen for updates. + * Autonmatically handles keep alive packets and reconnection after a long period of time. + * Whenever there's an error, the stream is closed, then onError callback is called. + * @param onUpdate If onUpdate is provided, it will be called with the data received from the stream with DEFAULT_EVENT_TYPE. + * @param onEventTypeUpdate A mapping from event type to callbacks. Routes the event data for different event types received from the stream to different callbacks. + * @param onError If onError is provided, it will be called with the error received from the stream. + */ + connect( + onUpdate: (data: string) => void, + onError?: (error: Error) => void, + ): void; + connect( + onEventTypeUpdate: Record void>, + onError?: (error: Error) => void, + ): void; + connect( + onUpdateCb: + | ((data: string) => void) + | Record void>, + onError?: (error: Error) => void, + ): void { + if (typeof onUpdateCb === 'function') { + // onUpdate: (data: string) => void, + this.onEventTypeUpdate = { [DEFAULT_EVENT_TYPE]: onUpdateCb }; + } else { + // onEventTypeUpdate: Record void>, + this.onEventTypeUpdate = onUpdateCb as Record< + string, + (data: string) => void + >; + if (!(DEFAULT_EVENT_TYPE in this.onEventTypeUpdate)) { + // Ensure there's always a default to receive keep alive data. + // eslint-disable-next-line @typescript-eslint/no-empty-function + this.onEventTypeUpdate[DEFAULT_EVENT_TYPE] = () => {}; + } + } + this.onError = onError; + if (this.es) { + this.close(); + } + + this.es = this.streamProvider(this.url, { headers: this.headers }); + for (const eventType in this.onEventTypeUpdate) { + this.es.addEventListener(eventType, (event) => { + this.resetKeepAlive(); + const msgEvent = event as MessageEvent; + if (msgEvent.data === KEEP_ALIVE_DATA) { + // This is a keep-alive message, ignore it + return; + } + try { + this.onEventTypeUpdate[eventType](msgEvent.data); + } catch { + // Don't care about errors in the callback. + } + }); + } + this.es.addEventListener('error', (err) => { + this.close(); + const error = err as ErrorEvent; + const newError = error.status + ? new FetchError(error.status, error.message) + : new Error(`Error in stream: ${JSON.stringify(error)}`); + try { + this.onError?.(newError); + } catch { + // Don't care about errors in the callback. + } + }); + this.resetKeepAlive(); + this.setReconnectionTimeout(); + } + + resetKeepAlive(): void { + if (this.keepAliveTimer) { + clearTimeout(this.keepAliveTimer); + this.keepAliveTimer = undefined; + } + if (this.es) { + this.keepAliveTimer = setTimeout(() => { + this.close(); + this.onError?.(Error('Keep-alive timeout')); + }, this.keepAliveInterval * 1.1); + } + } + + setReconnectionTimeout(): void { + if (this.reconnectionTimeout) { + clearTimeout(this.reconnectionTimeout); + this.reconnectionTimeout = undefined; + } + if (this.es) { + if ( + this.reconnectionIntervalMillisMin > 0 && + this.reconnectionIntervalMillisRange > 0 + ) { + this.reconnectionTimeout = setTimeout(() => { + this.connect(this.onEventTypeUpdate, this.onError); + }, Math.ceil(Math.random() * this.reconnectionIntervalMillisRange + this.reconnectionIntervalMillisMin)); + } + } + } + + close(): void { + if (this.keepAliveTimer) { + clearTimeout(this.keepAliveTimer); + this.keepAliveTimer = undefined; + } + if (this.reconnectionTimeout) { + clearTimeout(this.reconnectionTimeout); + this.reconnectionTimeout = undefined; + } + this.es?.close(); + this.es = undefined; + } +} diff --git a/packages/experiment-core/test/api/evaluation-stream-api.test.ts b/packages/experiment-core/test/api/evaluation-stream-api.test.ts new file mode 100644 index 00000000..fff24f77 --- /dev/null +++ b/packages/experiment-core/test/api/evaluation-stream-api.test.ts @@ -0,0 +1,163 @@ +import { FetchError, SdkStreamEvaluationApi } from '../../src'; +import { sleep } from '../utils'; + +const VARDATA = { + 'peter-test-stream': { + key: 'treatment', + metadata: { + experimentKey: 'exp-1', + }, + value: 'treatment', + }, +}; +describe('EvaluationStreamApi tests', () => { + let registeredListeners: Record void>; + let mockStreamProvider: jest.Mock; + let api: SdkStreamEvaluationApi; + let mockOnDataUpdate: jest.Mock; + let mockOnError: jest.Mock; + + beforeEach(() => { + jest.clearAllMocks(); + registeredListeners = {}; + mockStreamProvider = jest.fn((url, param) => { + expect(url).toBe('https://url/sdk/stream/v1/vardata'); + expect(param.headers['Authorization']).toBe('Api-Key apikey'); + return { + addEventListener: jest.fn( + (eventType: string, cb: (data: unknown) => void) => { + registeredListeners[eventType] = cb; + }, + ), + close: jest.fn(), + }; + }); + + api = new SdkStreamEvaluationApi( + 'apikey', + 'https://url', + mockStreamProvider, + 2000, + ); + mockOnDataUpdate = jest.fn(); + mockOnError = jest.fn(); + }); + + test('connect and receive data and parses', async () => { + const connectPromise = api.streamVariants( + {}, + undefined, + mockOnDataUpdate, + mockOnError, + ); + await sleep(1000); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + registeredListeners['message']({ + data: JSON.stringify(VARDATA), + }); + await connectPromise; + expect(mockOnDataUpdate).toHaveBeenCalledTimes(1); + expect(mockOnDataUpdate).toHaveBeenCalledWith(VARDATA); + expect(mockOnError).not.toHaveBeenCalled(); + }); + test('connect http error', async () => { + const connectPromise = api.streamVariants( + {}, + undefined, + mockOnDataUpdate, + mockOnError, + ); + await sleep(1000); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + registeredListeners['error']({ + status: 500, + message: 'Internal Server Error', + }); + await expect(connectPromise).rejects.toThrow( + new FetchError(500, 'Internal Server Error'), + ); + expect(mockOnDataUpdate).not.toHaveBeenCalled(); + expect(mockOnError).toHaveBeenCalledTimes(0); + }); + test('connect parsing error', async () => { + const connectPromise = api.streamVariants( + {}, + undefined, + mockOnDataUpdate, + mockOnError, + ); + await sleep(1000); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + registeredListeners['message']({ + data: 'not json', + }); + await expect(connectPromise).rejects.toThrow(); + expect(mockOnDataUpdate).not.toHaveBeenCalled(); + expect(mockOnError).not.toHaveBeenCalled(); + }); + test('connect timeout', async () => { + const connectPromise = api.streamVariants( + {}, + undefined, + mockOnDataUpdate, + mockOnError, + ); + await Promise.race([ + expect(connectPromise).rejects.toThrow(), + sleep(2100).then(() => { + throw new Error('Timeout'); + }), + ]); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + expect(mockOnDataUpdate).not.toHaveBeenCalled(); + expect(mockOnError).toHaveBeenCalledTimes(0); + }); + test('stream error stops api', async () => { + const connectPromise = api.streamVariants( + {}, + undefined, + mockOnDataUpdate, + mockOnError, + ); + await sleep(1000); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + registeredListeners['message']({ + data: JSON.stringify(VARDATA), + }); + await connectPromise; + expect(mockOnDataUpdate).toHaveBeenCalledTimes(1); + expect(mockOnDataUpdate).toHaveBeenCalledWith(VARDATA); + expect(mockOnError).not.toHaveBeenCalled(); + + registeredListeners['error']({ + message: 'Socket closed', + }); + expect(mockOnError).toHaveBeenCalledTimes(1); + expect(mockOnError).toHaveBeenCalledWith( + new Error('Error in stream: {"message":"Socket closed"}'), + ); + }); + test('stream error stops api', async () => { + const connectPromise = api.streamVariants( + {}, + undefined, + mockOnDataUpdate, + mockOnError, + ); + await sleep(1000); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + registeredListeners['message']({ + data: JSON.stringify(VARDATA), + }); + await connectPromise; + expect(mockOnDataUpdate).toHaveBeenCalledTimes(1); + expect(mockOnDataUpdate).toHaveBeenCalledWith(VARDATA); + expect(mockOnError).not.toHaveBeenCalled(); + + registeredListeners['message']({ + data: 'not json', + }); + expect(mockOnDataUpdate).toHaveBeenCalledTimes(1); + expect(mockOnError).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/experiment-core/test/transport/stream.test.ts b/packages/experiment-core/test/transport/stream.test.ts new file mode 100644 index 00000000..4a074289 --- /dev/null +++ b/packages/experiment-core/test/transport/stream.test.ts @@ -0,0 +1,149 @@ +import { SSEStream } from '../../src/transport/stream'; +import { sleep } from '../utils'; + +describe('SSEStream', () => { + let addEventListener: jest.Mock; + let close: jest.Mock; + let mockStreamProvider: jest.Mock; + + beforeEach(() => { + jest.clearAllMocks(); + addEventListener = jest.fn(); + close = jest.fn(); + mockStreamProvider = jest.fn(() => ({ + addEventListener, + close, + })); + }); + + test('should connect and receive data, keeps alive, reconnects', async () => { + const stream = new SSEStream( + mockStreamProvider, + 'http://localhost:7999', + { + header1: 'value1', + }, + 4000, + 7000, + ); + const mockOnDataUpdate = jest.fn(); + const mockOnError = jest.fn(); + + // Test new connection makes a call to the stream provider. + stream.connect(mockOnDataUpdate, mockOnError); + expect(mockStreamProvider).toHaveBeenCalledWith('http://localhost:7999', { + headers: { + header1: 'value1', + }, + }); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + + // Test that the event listener is set up correctly. + addEventListener.mock.calls[0][1]({ + type: 'message', + data: 'apple', + }); + expect(addEventListener.mock.calls[0][0]).toBe('message'); + expect(mockOnDataUpdate).toHaveBeenCalledWith('apple'); + expect(addEventListener.mock.calls[1][0]).toBe('error'); + expect(mockOnError).not.toHaveBeenCalled(); + + // Test keepalive avoids new connection. + await sleep(4000); + addEventListener.mock.calls[0][1]({ + type: 'message', + data: ' ', + }); + await sleep(1000); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + + // Test keepalive data is not passed to the callback. + expect(mockOnDataUpdate).not.toHaveBeenCalledWith(' '); + expect(mockOnError).not.toHaveBeenCalled(); + + // Test reconnection reconnects and receives data correctly. + await sleep(Math.ceil(4000)); + expect(mockStreamProvider).toHaveBeenCalledTimes(2); + addEventListener.mock.calls[0][1]({ + type: 'message', + data: 'banana', + }); + expect(addEventListener.mock.calls[2][0]).toBe('message'); + expect(mockOnDataUpdate).toHaveBeenCalledWith('banana'); + expect(addEventListener.mock.calls[3][0]).toBe('error'); + expect(mockOnError).not.toHaveBeenCalled(); + }, 12000); + + test('able to subscribe for multiple event types', async () => { + const stream = new SSEStream(mockStreamProvider, 'url', {}); + const mockOnData1Update = jest.fn(); + const mockOnData2Update = jest.fn(); + const mockOnError = jest.fn(); + + // Test new connection makes a call to the stream provider. + stream.connect( + { channel1: mockOnData1Update, channel2: mockOnData2Update }, + mockOnError, + ); + expect(mockStreamProvider).toHaveBeenCalledWith('url', { headers: {} }); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + + // Test that the event listener is set up correctly. + const registeredListeners = addEventListener.mock.calls.reduce( + (acc, curr) => { + acc[curr[0]] = curr[1]; + return acc; + }, + {}, + ); + expect(registeredListeners['channel1']).toBeDefined(); + expect(registeredListeners['channel2']).toBeDefined(); + expect(registeredListeners['message']).toBeDefined(); // default is always added. + expect(registeredListeners['error']).toBeDefined(); + + // Send data for each channel. + registeredListeners['channel1']({ + type: 'channel1', + data: 'apple', + }); + expect(mockOnData1Update).toBeCalledTimes(1); + expect(mockOnData2Update).not.toHaveBeenCalled(); + expect(mockOnError).not.toHaveBeenCalled(); + expect(mockOnData1Update).toHaveBeenCalledWith('apple'); + registeredListeners['channel2']({ + type: 'channel2', + data: 'banana', + }); + expect(mockOnData1Update).toBeCalledTimes(1); + expect(mockOnData2Update).toBeCalledTimes(1); + expect(mockOnError).not.toHaveBeenCalled(); + expect(mockOnData2Update).toHaveBeenCalledWith('banana'); + }); + + test('keepalive failure causes error', async () => { + const stream = new SSEStream(mockStreamProvider, 'url', {}, 3000); + const mockOnDataUpdate = jest.fn(); + const mockOnError = jest.fn(); + stream.connect(mockOnDataUpdate, mockOnError); + await sleep(4000); + expect(mockStreamProvider).toHaveBeenCalledTimes(1); + expect(mockOnDataUpdate).not.toHaveBeenCalled(); + expect(mockOnError).toHaveBeenCalledTimes(1); + expect(mockOnError).toHaveBeenCalledWith(new Error('Keep-alive timeout')); + expect(close).toHaveBeenCalled(); + }); + + test('error event causes error callback', async () => { + const stream = new SSEStream(mockStreamProvider, 'url', {}); + const mockOnDataUpdate = jest.fn(); + const mockOnError = jest.fn(); + stream.connect(mockOnDataUpdate, mockOnError); + addEventListener.mock.calls[1][1]({ + message: 'error', + }); + expect(mockOnDataUpdate).not.toHaveBeenCalled(); + expect(mockOnError).toHaveBeenCalledWith( + new Error('Error in stream: {"message":"error"}'), + ); + }); +}); diff --git a/packages/experiment-core/test/utils.ts b/packages/experiment-core/test/utils.ts new file mode 100644 index 00000000..3ac327ce --- /dev/null +++ b/packages/experiment-core/test/utils.ts @@ -0,0 +1,4 @@ +export const sleep = async (millis: number) => + new Promise((r) => { + setTimeout(r, millis); + }); diff --git a/packages/experiment-tag/src/experiment.ts b/packages/experiment-tag/src/experiment.ts index 88501892..32048b4b 100644 --- a/packages/experiment-tag/src/experiment.ts +++ b/packages/experiment-tag/src/experiment.ts @@ -31,7 +31,7 @@ import { urlWithoutParamsAndAnchor, UUID, concatenateQueryParamsOf, -} from './util'; +} from './utils'; import { WebExperimentClient } from './web-experiment'; export const PAGE_NOT_TARGETED_SEGMENT_NAME = 'Page not targeted'; diff --git a/packages/experiment-tag/src/util.ts b/packages/experiment-tag/src/utils.ts similarity index 100% rename from packages/experiment-tag/src/util.ts rename to packages/experiment-tag/src/utils.ts diff --git a/packages/experiment-tag/test/experiment.test.ts b/packages/experiment-tag/test/experiment.test.ts index 495cd40b..14152fa9 100644 --- a/packages/experiment-tag/test/experiment.test.ts +++ b/packages/experiment-tag/test/experiment.test.ts @@ -7,7 +7,7 @@ import { PAGE_NOT_TARGETED_SEGMENT_NAME, DefaultWebExperimentClient, } from 'src/experiment'; -import * as util from 'src/util'; +import * as utils from 'src/utils'; import { stringify } from 'ts-jest'; import { @@ -65,7 +65,7 @@ describe('initializeExperiment', () => { jest.spyOn(ExperimentClient.prototype, 'setUser'); jest.spyOn(ExperimentClient.prototype, 'all'); const mockExposure = jest.spyOn(ExperimentClient.prototype, 'exposure'); - jest.spyOn(util, 'UUID').mockReturnValue('mock'); + jest.spyOn(utils, 'UUID').mockReturnValue('mock'); let mockGlobal; let antiFlickerSpy; @@ -769,7 +769,7 @@ describe('helper methods', () => { // @ts-ignore mockGetGlobalScope.mockReturnValue(mockGlobal); apiKey++; - jest.spyOn(util, 'UUID').mockReturnValue('mock'); + jest.spyOn(utils, 'UUID').mockReturnValue('mock'); jest.clearAllMocks(); }); diff --git a/packages/experiment-tag/test/util.test.ts b/packages/experiment-tag/test/utils.test.ts similarity index 99% rename from packages/experiment-tag/test/util.test.ts rename to packages/experiment-tag/test/utils.test.ts index d1271d98..8882926c 100644 --- a/packages/experiment-tag/test/util.test.ts +++ b/packages/experiment-tag/test/utils.test.ts @@ -4,7 +4,7 @@ import { getUrlParams, matchesUrl, urlWithoutParamsAndAnchor, -} from 'src/util'; +} from 'src/utils'; // Mock the getGlobalScope function const spyGetGlobalScope = jest.spyOn(coreUtil, 'getGlobalScope'); diff --git a/yarn.lock b/yarn.lock index 368d5ae4..888eeac8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -48,12 +48,13 @@ "@babel/highlight" "^7.22.5" "@babel/code-frame@^7.10.4": - version "7.22.13" - resolved "https://registry.yarnpkg.com/@babel/code-frame/-/code-frame-7.22.13.tgz#e3c1c099402598483b7a8c46a721d1038803755e" - integrity sha512-XktuhWlJ5g+3TJXc5upd9Ks1HutSArik6jf2eAjYFyIOf4ej3RN+184cZbzDvbPnuTJIUhPKKJE3cIsYTiAT3w== + version "7.26.2" + resolved "https://registry.yarnpkg.com/@babel/code-frame/-/code-frame-7.26.2.tgz#4b5fab97d33338eff916235055f0ebc21e573a85" + integrity sha512-RJlIHRueQgwWitWgF8OdFYGZX328Ax5BCemNGlqHfplnRT9ESi8JkFlvaVYbS+UubVY6dpv87Fs2u5M29iNFVQ== dependencies: - "@babel/highlight" "^7.22.13" - chalk "^2.4.2" + "@babel/helper-validator-identifier" "^7.25.9" + js-tokens "^4.0.0" + picocolors "^1.0.0" "@babel/compat-data@^7.22.5", "@babel/compat-data@^7.22.6", "@babel/compat-data@^7.22.9": version "7.22.9" @@ -257,6 +258,11 @@ resolved "https://registry.yarnpkg.com/@babel/helper-validator-identifier/-/helper-validator-identifier-7.22.5.tgz#9544ef6a33999343c8740fa51350f30eeaaaf193" integrity sha512-aJXu+6lErq8ltp+JhkJUfk1MTGyuA4v7f3pA+BJ5HLfNC6nAQ0Cpi9uOquUj8Hehg0aUiHzWQbOVJGao6ztBAQ== +"@babel/helper-validator-identifier@^7.25.9": + version "7.25.9" + resolved "https://registry.yarnpkg.com/@babel/helper-validator-identifier/-/helper-validator-identifier-7.25.9.tgz#24b64e2c3ec7cd3b3c547729b8d16871f22cbdc7" + integrity sha512-Ed61U6XJc3CVRfkERJWDz4dJwKe7iLmmJsbOGu9wSloNSFttHV0I8g6UAgb7qnK5ly5bGLPd4oXZlxCdANBOWQ== + "@babel/helper-validator-option@^7.22.5": version "7.22.5" resolved "https://registry.yarnpkg.com/@babel/helper-validator-option/-/helper-validator-option-7.22.5.tgz#de52000a15a177413c8234fa3a8af4ee8102d0ac" @@ -289,15 +295,6 @@ chalk "^2.0.0" js-tokens "^4.0.0" -"@babel/highlight@^7.22.13": - version "7.22.13" - resolved "https://registry.yarnpkg.com/@babel/highlight/-/highlight-7.22.13.tgz#9cda839e5d3be9ca9e8c26b6dd69e7548f0cbf16" - integrity sha512-C/BaXcnnvBCmHTpz/VGZ8jgtE2aYlW4hxDhseJAWZb7gqGM/qtCK6iZUb0TyKFf7BOUsBH7Q7fkRsDRhg1XklQ== - dependencies: - "@babel/helper-validator-identifier" "^7.22.5" - chalk "^2.4.2" - js-tokens "^4.0.0" - "@babel/parser@^7.1.0", "@babel/parser@^7.14.7", "@babel/parser@^7.20.7", "@babel/parser@^7.22.5", "@babel/parser@^7.22.7": version "7.22.7" resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.22.7.tgz#df8cf085ce92ddbdbf668a7f186ce848c9036cae" @@ -992,17 +989,17 @@ integrity sha512-x/rqGMdzj+fWZvCOYForTghzbtqPDZ5gPwaoNGHdgDfF2QA/XZbCBp4Moo5scrkAMPhB7z26XM/AaHuIJdgauA== "@babel/runtime-corejs3@^7.10.2": - version "7.22.11" - resolved "https://registry.yarnpkg.com/@babel/runtime-corejs3/-/runtime-corejs3-7.22.11.tgz#bf65b846cb4a03e1594dba9850c4632a992ddc04" - integrity sha512-NhfzUbdWbiE6fCFypbWCPu6AR8xre31EOPF7wwAIJEvGQ2avov04eymayWinCuyXmV1b0+jzoXP/HYzzUYdvwg== + version "7.27.0" + resolved "https://registry.yarnpkg.com/@babel/runtime-corejs3/-/runtime-corejs3-7.27.0.tgz#c766df350ec7a2caf3ed64e3659b100954589413" + integrity sha512-UWjX6t+v+0ckwZ50Y5ShZLnlk95pP5MyW/pon9tiYzl3+18pkTHTFNTKr7rQbfRXPkowt2QAn30o1b6oswszew== dependencies: core-js-pure "^3.30.2" regenerator-runtime "^0.14.0" "@babel/runtime@^7.10.2", "@babel/runtime@^7.10.3": - version "7.22.11" - resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.22.11.tgz#7a9ba3bbe406ad6f9e8dd4da2ece453eb23a77a4" - integrity sha512-ee7jVNlWN09+KftVOu9n7S8gQzD/Z6hN/I8VBRXW4P1+Xe7kJGXMwu8vds4aGIMHZnNbdpSWCfZZtinytpcAvA== + version "7.27.0" + resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.27.0.tgz#fbee7cf97c709518ecc1f590984481d5460d4762" + integrity sha512-VtPOkrdPHZsKc/clNqyi9WUA8TINkZ4cGk63UUE3u4pmB2k+ZMQRDuIOagv8UVd6j7k0T3+RRIb7beKTebNbcw== dependencies: regenerator-runtime "^0.14.0" @@ -2351,9 +2348,9 @@ integrity sha512-iO9ZQHkZxHn4mSakYV0vFHAVDyEOIJQrV2uZ06HxEPcx+mt8swXoZHIbaaJ2crJYFfErySgktuTZ3BeLz+XmFA== "@types/yargs@^15.0.0": - version "15.0.15" - resolved "https://registry.yarnpkg.com/@types/yargs/-/yargs-15.0.15.tgz#e609a2b1ef9e05d90489c2f5f45bbfb2be092158" - integrity sha512-IziEYMU9XoVj8hWg7k+UJrXALkGFjWJhn5QFEv9q4p+v40oZhSuC135M38st8XPjICL7Ey4TV64ferBGUoJhBg== + version "15.0.19" + resolved "https://registry.yarnpkg.com/@types/yargs/-/yargs-15.0.19.tgz#328fb89e46109ecbdb70c295d96ff2f46dfd01b9" + integrity sha512-2XUaGVmyQjgyAZldf0D0c14vvo/yv0MhQBSTJcejMMaitsn3nxCB6TmH4G0ZQf+uxROOa9mpanoSm8h6SG/1ZA== dependencies: "@types/yargs-parser" "*" @@ -3106,7 +3103,7 @@ chalk@4.1.0: ansi-styles "^4.1.0" supports-color "^7.1.0" -chalk@^2.0.0, chalk@^2.4.2: +chalk@^2.0.0: version "2.4.2" resolved "https://registry.yarnpkg.com/chalk/-/chalk-2.4.2.tgz#cd42541677a54333cf541a49108c1432b44c9424" integrity sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ== @@ -3434,9 +3431,9 @@ core-js-compat@^3.31.0: browserslist "^4.21.9" core-js-pure@^3.30.2: - version "3.32.1" - resolved "https://registry.yarnpkg.com/core-js-pure/-/core-js-pure-3.32.1.tgz#5775b88f9062885f67b6d7edce59984e89d276f3" - integrity sha512-f52QZwkFVDPf7UEQZGHKx6NYxsxmVGJe5DIvbzOdRMJlmT6yv0KDjR8rmy3ngr/t5wU54c7Sp/qIJH0ppbhVpQ== + version "3.41.0" + resolved "https://registry.yarnpkg.com/core-js-pure/-/core-js-pure-3.41.0.tgz#349fecad168d60807a31e83c99d73d786fe80811" + integrity sha512-71Gzp96T9YPk63aUvE5Q5qP+DryB4ZloUZPSOebGM88VNw8VNfvdA7z6kGA8iGOTEzAomsRidp4jXSmUIJsL+Q== core-util-is@~1.0.0: version "1.0.3" @@ -4092,6 +4089,11 @@ events@^3.3.0: resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400" integrity sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q== +eventsource@^2: + version "2.0.2" + resolved "https://registry.yarnpkg.com/eventsource/-/eventsource-2.0.2.tgz#76dfcc02930fb2ff339520b6d290da573a9e8508" + integrity sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA== + execa@5.0.0: version "5.0.0" resolved "https://registry.yarnpkg.com/execa/-/execa-5.0.0.tgz#4029b0007998a841fbd1032e5f4de86a3c1e3376" @@ -7543,9 +7545,9 @@ regenerator-runtime@^0.13.11: integrity sha512-kY1AZVr2Ra+t+piVaJ4gxaFaReZVH40AKNo7UCX6W+dEwBo/2oZJzqfuN1qLq1oL45o56cPaTXELwrTh8Fpggg== regenerator-runtime@^0.14.0: - version "0.14.0" - resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.14.0.tgz#5e19d68eb12d486f797e15a3c6a918f7cec5eb45" - integrity sha512-srw17NI0TUWHuGa5CFGGmhfNIeja30WMBfbslPNhf6JrqQlLN5gcrvig1oqPxiVaXb0oW0XRKtH6Nngs5lKCIA== + version "0.14.1" + resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.14.1.tgz#356ade10263f685dda125100cd862c1db895327f" + integrity sha512-dYnhHh0nJoMfnkZs6GmmhFknAGRrLznOu5nc9ML+EJxGvrx6H7teuevqVqCuPcPK//3eDrrjQhehXVx9cnkGdw== regenerator-transform@^0.15.1: version "0.15.1" @@ -8040,16 +8042,7 @@ string-length@^4.0.1: char-regex "^1.0.2" strip-ansi "^6.0.0" -"string-width-cjs@npm:string-width@^4.2.0": - version "4.2.3" - resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" - integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== - dependencies: - emoji-regex "^8.0.0" - is-fullwidth-code-point "^3.0.0" - strip-ansi "^6.0.1" - -"string-width@^1.0.2 || 2 || 3 || 4", string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.3: +"string-width-cjs@npm:string-width@^4.2.0", "string-width@^1.0.2 || 2 || 3 || 4", string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.3: version "4.2.3" resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== @@ -8108,14 +8101,7 @@ string_decoder@~1.1.1: dependencies: safe-buffer "~5.1.0" -"strip-ansi-cjs@npm:strip-ansi@^6.0.1": - version "6.0.1" - resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" - integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== - dependencies: - ansi-regex "^5.0.1" - -strip-ansi@^6.0.0, strip-ansi@^6.0.1: +"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== @@ -8856,16 +8842,7 @@ wordwrap@^1.0.0: resolved "https://registry.yarnpkg.com/wordwrap/-/wordwrap-1.0.0.tgz#27584810891456a4171c8d0226441ade90cbcaeb" integrity sha512-gvVzJFlPycKc5dZN4yPkP8w7Dc37BtP1yczEneOb4uq34pXZcvrtRTmWV8W+Ume+XCxKgbjM+nevkyFPMybd4Q== -"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0": - version "7.0.0" - resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" - integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== - dependencies: - ansi-styles "^4.0.0" - string-width "^4.1.0" - strip-ansi "^6.0.0" - -wrap-ansi@^7.0.0: +"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0", wrap-ansi@^7.0.0: version "7.0.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==