Skip to content

feat: stream vardata #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ jobs:

- name: Test
run: yarn test
env:
MANAGEMENT_API_KEY: ${{ secrets.MANAGEMENT_API_KEY }}

- name: Configure Git User
run: |
Expand All @@ -58,7 +60,8 @@ jobs:
if: ${{ github.event.inputs.dryRun == 'true'}}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: yarn lerna version --no-push --no-git-tag-version --loglevel silly --yes
run: |
yarn lerna version --no-push --no-git-tag-version --loglevel silly --yes

- name: Setup NPM Token
if: ${{ github.event.inputs.dryRun == 'false'}}
Expand All @@ -71,6 +74,7 @@ jobs:
if: ${{ github.event.inputs.dryRun == 'false'}}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# Adds all params to both version and publish just to be safe.
run: |
yarn lerna version --yes
yarn lerna publish from-git --yes --loglevel silly
yarn lerna version --conventionalPrerelease --preid=beta --no-private
yarn lerna publish from-git --loglevel silly --dist-tag=beta --pre-dist-tag=beta --conventionalPrerelease --preid=beta --no-private
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,5 @@ jobs:

- name: Test
run: yarn test
env:
MANAGEMENT_API_KEY: ${{ secrets.MANAGEMENT_API_KEY }}
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ dist/

# Example Experiment tag script
packages/experiment-tag/example/

# Environment variables
.env*
5 changes: 3 additions & 2 deletions lerna.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"useWorkspaces": true,
"command": {
"version": {
"allowBranch": "main",
"allowBranch": "stream-vardata",
"conventionalCommits": true,
"createRelease": "github",
"message": "chore(release): publish",
"preid": "beta"
"preid": "beta",
"conventionalPrerelease": true
}
}
}
1 change: 1 addition & 0 deletions packages/experiment-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"@amplitude/experiment-core": "^0.11.0",
"@amplitude/ua-parser-js": "^0.7.31",
"base64-js": "1.5.1",
"eventsource": "^2",
"unfetch": "4.1.0"
},
"devDependencies": {
Expand Down
14 changes: 14 additions & 0 deletions packages/experiment-browser/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ export interface ExperimentConfig {
*/
flagConfigPollingIntervalMillis?: number;

/**
* If true, the client will stream updates for remote evaluation from the server.
* fetch() will update variants and initiate a connection to the server.
*/
streamVariants?: boolean;

/**
* The URL to stream remote evaluation updates from. This is only used if
* `streamVariants` is `true`.
*/
streamVariantsServerUrl?: string;

/**
* Explicitly enable or disable calling {@link fetch()} on {@link start()}:
*
Expand Down Expand Up @@ -190,6 +202,8 @@ export const Defaults: ExperimentConfig = {
automaticExposureTracking: true,
pollOnStart: true,
flagConfigPollingIntervalMillis: 300000,
streamVariants: false,
streamVariantsServerUrl: 'https://stream.lab.amplitude.com',
fetchOnStart: true,
automaticFetchOnAmplitudeIdentityChange: false,
userProvider: null,
Expand Down
160 changes: 64 additions & 96 deletions packages/experiment-browser/src/experimentClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import {
EvaluationApi,
EvaluationEngine,
EvaluationFlag,
EvaluationVariant,
FetchError,
FlagApi,
Poller,
SdkEvaluationApi,
SdkFlagApi,
SdkStreamEvaluationApi,
TimeoutError,
topologicalSort,
} from '@amplitude/experiment-core';
Expand All @@ -29,6 +31,7 @@ import {
import { LocalStorage } from './storage/local-storage';
import { SessionStorage } from './storage/session-storage';
import { FetchHttpClient, WrapperClient } from './transport/http';
import { defaultSseProvider } from './transport/stream';
import { exposureEvent } from './types/analytics';
import { Client, FetchOptions } from './types/client';
import { Exposure, ExposureTrackingProvider } from './types/exposure';
Expand All @@ -51,18 +54,22 @@ import {
} from './util/convert';
import { SessionAnalyticsProvider } from './util/sessionAnalyticsProvider';
import { SessionExposureTrackingProvider } from './util/sessionExposureTrackingProvider';
import {
VariantsFetchUpdater,
VariantsRetryAndFallbackWrapperUpdater,
VariantsStreamUpdater,
VariantUpdater,
} from './util/updaters';

// Configs which have been removed from the public API.
// May be added back in the future.
const fetchBackoffTimeout = 10000;
const fetchBackoffAttempts = 8;
const fetchBackoffMinMillis = 500;
const fetchBackoffMaxMillis = 10000;
const fetchBackoffScalar = 1.5;
const minFlagPollerIntervalMillis = 60000;
const streamConnectionTimeoutMillis = 3000;
const streamRetryIntervalMillis = 10 * 60 * 1000;

const euServerUrl = 'https://api.lab.eu.amplitude.com';
const euFlagsServerUrl = 'https://flag.lab.eu.amplitude.com';
const euStreamVariantsServerUrl = 'https://stream.lab.eu.amplitude.com';

/**
* The default {@link Client} used to fetch variations from Experiment's
Expand All @@ -76,7 +83,7 @@ export class ExperimentClient implements Client {
private readonly variants: LoadStoreCache<Variant>;
private readonly flags: LoadStoreCache<EvaluationFlag>;
private readonly flagApi: FlagApi;
private readonly evaluationApi: EvaluationApi;
private readonly variantUpdater: VariantUpdater;
private readonly engine: EvaluationEngine = new EvaluationEngine();
private user: ExperimentUser | undefined;
private userProvider: ExperimentUserProvider | undefined;
Expand Down Expand Up @@ -117,6 +124,11 @@ export class ExperimentClient implements Client {
(config?.serverZone?.toLowerCase() === 'eu'
? euFlagsServerUrl
: Defaults.flagsServerUrl),
streamVariantsServerUrl:
config?.streamVariantsServerUrl ||
(config?.serverZone?.toLowerCase() === 'eu'
? euStreamVariantsServerUrl
: Defaults.streamVariantsServerUrl),
// Force minimum flag config polling interval.
flagConfigPollingIntervalMillis:
config.flagConfigPollingIntervalMillis < minFlagPollerIntervalMillis
Expand Down Expand Up @@ -161,11 +173,27 @@ export class ExperimentClient implements Client {
this.config.flagsServerUrl,
httpClient,
);
this.evaluationApi = new SdkEvaluationApi(
const evaluationApi = new SdkEvaluationApi(
this.apiKey,
this.config.serverUrl,
httpClient,
);
this.variantUpdater = new VariantsFetchUpdater(evaluationApi);
if (config.streamVariants) {
const streamEvaluationApi = new SdkStreamEvaluationApi(
this.apiKey,
this.config.streamVariantsServerUrl,
defaultSseProvider,
streamConnectionTimeoutMillis,
evaluationApi,
);
const streamUpdater = new VariantsStreamUpdater(streamEvaluationApi);
this.variantUpdater = new VariantsRetryAndFallbackWrapperUpdater(
streamUpdater,
this.variantUpdater,
streamRetryIntervalMillis,
);
}
// Storage & Caching
let storage: Storage;
const storageInstanceName = internalInstanceName
Expand Down Expand Up @@ -233,6 +261,7 @@ export class ExperimentClient implements Client {
* Stop the local flag configuration poller.
*/
public stop() {
this.variantUpdater.stop(); // Stop the variant updater, no waiting needed.
if (!this.isRunning) {
return;
}
Expand Down Expand Up @@ -267,26 +296,38 @@ export class ExperimentClient implements Client {
user: ExperimentUser = this.user,
options?: FetchOptions,
): Promise<ExperimentClient> {
this.setUser(user || {});
try {
await this.fetchInternal(
user,
this.config.fetchTimeoutMillis,
this.config.retryFetchOnFailure,
options,
);
} catch (e) {
if (this.config.debug) {
if (e instanceof TimeoutError) {
console.debug(e);
} else {
console.error(e);
}
}
// Don't even try to fetch variants if API key is not set
if (!this.apiKey) {
throw Error('Experiment API key is empty');
}
this.setUser(user || {});
user = await this.addContextOrWait(user);
user = this.cleanUserPropsForFetch(user);
await this.variantUpdater.start(
async (results: Record<string, EvaluationVariant>) => {
// On receiving variants update.
this.debug('[Experiment] Received variants update');
await this.processVariants(results, options);
},
(err) => {
console.error(err);
},
{ user, options, config: this.config },
);
return this;
}

private async processVariants(
flagKeyToVariant: Record<string, EvaluationVariant>,
options?: FetchOptions,
): Promise<void> {
const variants: Variants = {};
Object.entries(flagKeyToVariant).forEach(([key, variant]) => {
variants[key] = convertEvaluationVariantToVariant(variant);
});
await this.storeVariants(variants, options);
}

/**
* Returns the variant for the provided key.
*
Expand Down Expand Up @@ -665,63 +706,12 @@ export class ExperimentClient implements Client {
return defaultSourceVariant;
}

private async fetchInternal(
user: ExperimentUser,
timeoutMillis: number,
retry: boolean,
options?: FetchOptions,
): Promise<Variants> {
// Don't even try to fetch variants if API key is not set
if (!this.apiKey) {
throw Error('Experiment API key is empty');
}

this.debug(`[Experiment] Fetch all: retry=${retry}`);

// Proactively cancel retries if active in order to avoid unnecessary API
// requests. A new failure will restart the retries.
if (retry) {
this.stopRetries();
}

try {
const variants = await this.doFetch(user, timeoutMillis, options);
await this.storeVariants(variants, options);
return variants;
} catch (e) {
if (retry && this.shouldRetryFetch(e)) {
void this.startRetries(user, options);
}
throw e;
}
}

private cleanUserPropsForFetch(user: ExperimentUser): ExperimentUser {
const cleanedUser = { ...user };
delete cleanedUser.cookie;
return cleanedUser;
}

private async doFetch(
user: ExperimentUser,
timeoutMillis: number,
options?: FetchOptions,
): Promise<Variants> {
user = await this.addContextOrWait(user);
user = this.cleanUserPropsForFetch(user);
this.debug('[Experiment] Fetch variants for user: ', user);
const results = await this.evaluationApi.getVariants(user, {
timeoutMillis: timeoutMillis,
...options,
});
const variants: Variants = {};
for (const key of Object.keys(results)) {
variants[key] = convertEvaluationVariantToVariant(results[key]);
}
this.debug('[Experiment] Received variants: ', variants);
return variants;
}

private async doFlags(): Promise<void> {
try {
let user: ExperimentUser;
Expand Down Expand Up @@ -780,28 +770,6 @@ export class ExperimentClient implements Client {
this.debug('[Experiment] Stored variants: ', variants);
}

private async startRetries(
user: ExperimentUser,
options: FetchOptions,
): Promise<void> {
this.debug('[Experiment] Retry fetch');
this.retriesBackoff = new Backoff(
fetchBackoffAttempts,
fetchBackoffMinMillis,
fetchBackoffMaxMillis,
fetchBackoffScalar,
);
void this.retriesBackoff.start(async () => {
await this.fetchInternal(user, fetchBackoffTimeout, false, options);
});
}

private stopRetries(): void {
if (this.retriesBackoff) {
this.retriesBackoff.cancel();
}
}

private addContext(user: ExperimentUser): ExperimentUser {
const providedUser = this.userProvider?.getUser();
const integrationUser = this.integrationManager.getUser();
Expand Down
15 changes: 15 additions & 0 deletions packages/experiment-browser/src/transport/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* @packageDocumentation
* @internal
*/

import { SSE, SSEProviderParams } from '@amplitude/experiment-core';
import EventSource from 'eventsource';

export const defaultSseProvider = (
url: string,
params: SSEProviderParams,
): SSE => {
const es = new EventSource(url, params);
return es;
};
Loading