diff --git a/packages/destination-actions/src/destinations/klaviyo/__tests__/multistatus.test.ts b/packages/destination-actions/src/destinations/klaviyo/__tests__/multistatus.test.ts new file mode 100644 index 0000000000..84470e0429 --- /dev/null +++ b/packages/destination-actions/src/destinations/klaviyo/__tests__/multistatus.test.ts @@ -0,0 +1,244 @@ +import { SegmentEvent, createTestEvent, createTestIntegration } from '@segment/actions-core' +import nock from 'nock' +import { API_URL } from '../config' +import Braze from '../index' + +beforeEach(() => nock.cleanAll()) + +const testDestination = createTestIntegration(Braze) + +const settings = { + api_key: 'my-api-key' +} + +const timestamp = '2024-07-22T20:08:49.7931Z' + +describe('MultiStatus', () => { + describe('trackEvent', () => { + const mapping = { + profile: { + '@path': '$.properties' + }, + metric_name: { + '@path': '$.event' + }, + properties: { + '@path': '$.properties' + }, + time: { + '@path': '$.timestamp' + }, + unique_id: { + '@path': '$.messageId' + } + } + + it("should successfully handle those payload where phone_number is invalid and couldn't be converted to E164 format", async () => { + nock(API_URL).post('/event-bulk-create-jobs/').reply(202, {}) + + const events: SegmentEvent[] = [ + // Event with invalid phone_number + createTestEvent({ + type: 'track', + timestamp, + properties: { + country_code: 'IN', + phone_number: '701271', + email: 'valid@gmail.com' + } + }), + // Valid Event + createTestEvent({ + type: 'track', + timestamp, + properties: { + email: 'valid@gmail.com' + } + }) + ] + + const response = await testDestination.executeBatch('trackEvent', { + events, + settings, + mapping + }) + + // The First event fails as pre-request validation fails for having invalid phone_number and could not be converted to E164 format + expect(response[0]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'Phone number could not be converted to E.164 format.', + errorreporter: 'DESTINATION' + }) + + // The Second event doesn't fail as there is no error reported by Klaviyo API + expect(response[1]).toMatchObject({ + status: 200, + body: 'success' + }) + }) + + it('should successfully handle a batch of events with complete success response from Klaviyo API', async () => { + nock(API_URL).post('/event-bulk-create-jobs/').reply(202, {}) + + const events: SegmentEvent[] = [ + // Valid Event + createTestEvent({ + type: 'track', + timestamp, + properties: { + email: 'valid@gmail.com' + } + }), + // Event without any user identifier + createTestEvent({ + type: 'track', + timestamp + }) + ] + + const response = await testDestination.executeBatch('trackEvent', { + events, + settings, + mapping + }) + + // The first event doesn't fail as there is no error reported by Klaviyo API + expect(response[0]).toMatchObject({ + status: 200, + body: 'success' + }) + + // The second event fails as pre-request validation fails for not having any user identifier + expect(response[1]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'One of External ID, Anonymous ID, Phone Number or Email is required.', + errorreporter: 'DESTINATION' + }) + }) + + it('should successfully handle a batch of events with failure response from Klaviyo API', async () => { + // Mocking a 400 response from Klaviyo API + const mockResponse = { + errors: [ + { + id: '752f7ece-af20-44e0-aa3a-b13290d98e72', + status: 400, + code: 'invalid', + title: 'Invalid input.', + detail: 'Invalid email address', + source: { + pointer: '/data/attributes/events-bulk-create/data/0/attributes/email' + }, + links: {}, + meta: {} + } + ] + } + nock(API_URL).post('/event-bulk-create-jobs/').reply(400, mockResponse) + + const events: SegmentEvent[] = [ + // Invalid Event + createTestEvent({ + type: 'track', + timestamp, + properties: { + email: 'invalid_email' + } + }), + // Valid Event + createTestEvent({ + type: 'track', + timestamp, + properties: { + external_id: 'Xi1234' + } + }) + ] + + const response = await testDestination.executeBatch('trackEvent', { + events, + settings, + mapping + }) + + // The first doesn't fail as there is no error reported by Braze API + expect(response[0]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'Invalid email address', + sent: { + profile: { + email: 'invalid_email' + }, + metric_name: 'Test Event', + properties: { + email: 'invalid_email' + }, + time: timestamp + }, + body: '{"id":"752f7ece-af20-44e0-aa3a-b13290d98e72","status":400,"code":"invalid","title":"Invalid input.","detail":"Invalid email address","source":{"pointer":"/data/attributes/events-bulk-create/data/0/attributes/email"},"links":{},"meta":{}}' + }) + + // The second event fails as Braze API reports an error + expect(response[1]).toMatchObject({ + status: 429, + sent: { + profile: { + external_id: 'Xi1234' + }, + metric_name: 'Test Event', + properties: { + external_id: 'Xi1234' + }, + time: timestamp + }, + body: 'Retry' + }) + }) + + it('should successfully handle a batch when all payload is invalid', async () => { + const events: SegmentEvent[] = [ + // Event with invalid phone_number + createTestEvent({ + type: 'track', + timestamp, + properties: { + country_code: 'IN', + phone_number: '701271', + email: 'valid@gmail.com' + } + }), + // Event without any user identifier + createTestEvent({ + type: 'track', + timestamp, + properties: {} + }) + ] + + const response = await testDestination.executeBatch('trackEvent', { + events, + settings, + mapping + }) + + // The First event fails as pre-request validation fails for having invalid phone_number and could not be converted to E164 format + expect(response[0]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'Phone number could not be converted to E.164 format.', + errorreporter: 'DESTINATION' + }) + + // The second event fails as pre-request validation fails for not having any user identifier + expect(response[1]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'One of External ID, Anonymous ID, Phone Number or Email is required.', + errorreporter: 'DESTINATION' + }) + }) + }) +}) diff --git a/packages/destination-actions/src/destinations/klaviyo/config.ts b/packages/destination-actions/src/destinations/klaviyo/config.ts index e49dca683f..1999022b0c 100644 --- a/packages/destination-actions/src/destinations/klaviyo/config.ts +++ b/packages/destination-actions/src/destinations/klaviyo/config.ts @@ -1,5 +1,5 @@ export const API_URL = 'https://a.klaviyo.com/api' -export const REVISION_DATE = '2023-09-15' +export const REVISION_DATE = '2024-05-15' export const COUNTRY_CODES = [ { label: 'AD - Andorra', value: 'AD' }, { label: 'AE - United Arab Emirates', value: 'AE' }, diff --git a/packages/destination-actions/src/destinations/klaviyo/functions.ts b/packages/destination-actions/src/destinations/klaviyo/functions.ts index 2d9f2365b9..5ec92f62e5 100644 --- a/packages/destination-actions/src/destinations/klaviyo/functions.ts +++ b/packages/destination-actions/src/destinations/klaviyo/functions.ts @@ -3,8 +3,11 @@ import { RequestClient, DynamicFieldResponse, IntegrationError, - PayloadValidationError + PayloadValidationError, + MultiStatusResponse, + HTTPError } from '@segment/actions-core' +import { JSONLikeObject } from '@segment/actions-core' import { API_URL, REVISION_DATE } from './config' import { Settings } from './generated-types' import { @@ -20,10 +23,14 @@ import { UnsubscribeProfile, UnsubscribeEventData, GroupedProfiles, - AdditionalAttributes + AdditionalAttributes, + KlaviyoAPIErrorResponse } from './types' import { Payload } from './upsertProfile/generated-types' +import { Payload as TrackEventPayload } from './trackEvent/generated-types' +import dayjs from '../../lib/dayjs' import { PhoneNumberUtil, PhoneNumberFormat } from 'google-libphonenumber' +import { eventBulkCreateRegex } from './properties' const phoneUtil = PhoneNumberUtil.getInstance() @@ -451,3 +458,169 @@ export function processPhoneNumber(initialPhoneNumber?: string, country_code?: s return phone_number } + +export async function sendBatchedTrackEvent(request: RequestClient, payloads: TrackEventPayload[]) { + const multiStatusResponse = new MultiStatusResponse() + const { filteredPayloads, validPayloadIndicesBitmap } = validateAndPreparePayloads(payloads, multiStatusResponse) + // if there are no payloads with valid phone number/email/external_id, return multiStatusResponse + if (!filteredPayloads.length) { + return multiStatusResponse + } + const payloadToSend = { + data: { + type: 'event-bulk-create-job', + attributes: { + 'events-bulk-create': { + data: filteredPayloads + } + } + } + } + + try { + await request(`${API_URL}/event-bulk-create-jobs/`, { + method: 'POST', + json: payloadToSend + }) + } catch (err) { + if (err instanceof HTTPError) { + const errorResponse = await err?.response?.json() + handleKlaviyoAPIErrorResponse( + payloads as object as JSONLikeObject[], + errorResponse, + multiStatusResponse, + validPayloadIndicesBitmap, + eventBulkCreateRegex + ) + } else { + // Bubble up the error and let Actions Framework handle it + throw err + } + } + return multiStatusResponse +} + +function validateAndPreparePayloads(payloads: TrackEventPayload[], multiStatusResponse: MultiStatusResponse) { + const filteredPayloads: JSONLikeObject[] = [] + const validPayloadIndicesBitmap: number[] = [] + + payloads.forEach((payload, originalBatchIndex) => { + const { email, phone_number, external_id, anonymous_id, country_code } = payload.profile + if (!email && !phone_number && !external_id && !anonymous_id) { + multiStatusResponse.setErrorResponseAtIndex(originalBatchIndex, { + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'One of External ID, Anonymous ID, Phone Number or Email is required.' + }) + return + } + + if (phone_number) { + // Validate and convert the phone number if present + const validPhoneNumber = validateAndConvertPhoneNumber(phone_number, country_code as string) + // If the phone number is not valid, skip this payload + if (!validPhoneNumber) { + multiStatusResponse.setErrorResponseAtIndex(originalBatchIndex, { + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'Phone number could not be converted to E.164 format.' + }) + return // Skip this payload + } + + // Update the payload's phone number with the validated format + payload.profile.phone_number = validPhoneNumber + delete payload?.profile?.country_code + } + + const profileToAdd = constructBulkCreateEventPayload(payload) + filteredPayloads.push(profileToAdd as JSONLikeObject) + validPayloadIndicesBitmap.push(originalBatchIndex) + multiStatusResponse.setSuccessResponseAtIndex(originalBatchIndex, { + status: 200, + sent: profileToAdd as JSONLikeObject, + body: 'success' + }) + }) + + return { filteredPayloads, validPayloadIndicesBitmap } +} + +function constructBulkCreateEventPayload(payload: TrackEventPayload) { + return { + type: 'event-bulk-create', + attributes: { + profile: { + data: { + type: 'profile', + attributes: payload.profile + } + }, + events: { + data: [ + { + type: 'event', + attributes: { + metric: { + data: { + type: 'metric', + attributes: { + name: payload.metric_name + } + } + }, + properties: { ...payload.properties }, + time: payload?.time ? dayjs(payload.time).toISOString() : undefined, + value: payload.value, + unique_id: payload.unique_id + } + } + ] + } + } + } +} + +function handleKlaviyoAPIErrorResponse( + payloads: JSONLikeObject[], + response: KlaviyoAPIErrorResponse, + multiStatusResponse: MultiStatusResponse, + validPayloadIndicesBitmap: number[], + regex: RegExp +) { + if (response?.errors && Array.isArray(response.errors)) { + const invalidIndexSet = new Set() + response.errors.forEach((error: KlaviyoAPIError) => { + const indexInOriginalPayload = getIndexFromErrorPointer(error.source.pointer, validPayloadIndicesBitmap, regex) + if (indexInOriginalPayload !== -1 && !multiStatusResponse.isErrorResponseAtIndex(indexInOriginalPayload)) { + multiStatusResponse.setErrorResponseAtIndex(indexInOriginalPayload, { + status: error.status, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: error.detail, + sent: payloads[indexInOriginalPayload], + body: JSON.stringify(error) + }) + invalidIndexSet.add(indexInOriginalPayload) + } + }) + + for (const index of validPayloadIndicesBitmap) { + if (!invalidIndexSet.has(index)) { + multiStatusResponse.setSuccessResponseAtIndex(index, { + status: 429, + sent: payloads[index], + body: 'Retry' + }) + } + } + } +} + +function getIndexFromErrorPointer(pointer: string, validPayloadIndicesBitmap: number[], regex: RegExp) { + const match = regex.exec(pointer) + if (match && match[1]) { + const index = parseInt(match[1], 10) + return validPayloadIndicesBitmap[index] !== undefined ? validPayloadIndicesBitmap[index] : -1 + } + return -1 +} diff --git a/packages/destination-actions/src/destinations/klaviyo/properties.ts b/packages/destination-actions/src/destinations/klaviyo/properties.ts index f2f440a40b..6228b7e9f7 100644 --- a/packages/destination-actions/src/destinations/klaviyo/properties.ts +++ b/packages/destination-actions/src/destinations/klaviyo/properties.ts @@ -162,3 +162,4 @@ export const country_code: InputField = { ] } } +export const eventBulkCreateRegex = /\/data\/attributes\/events-bulk-create\/data\/(\d+)/ diff --git a/packages/destination-actions/src/destinations/klaviyo/trackEvent/generated-types.ts b/packages/destination-actions/src/destinations/klaviyo/trackEvent/generated-types.ts index 2422660431..c88449fd2a 100644 --- a/packages/destination-actions/src/destinations/klaviyo/trackEvent/generated-types.ts +++ b/packages/destination-actions/src/destinations/klaviyo/trackEvent/generated-types.ts @@ -50,4 +50,12 @@ export interface Payload { * */ unique_id?: string + /** + * When enabled, the action will use the klaviyo batch API. + */ + enable_batching?: boolean + /** + * Maximum number of events to include in each batch. Actual batch sizes may be lower. + */ + batch_size?: number } diff --git a/packages/destination-actions/src/destinations/klaviyo/trackEvent/index.ts b/packages/destination-actions/src/destinations/klaviyo/trackEvent/index.ts index 0c59233859..b38988727e 100644 --- a/packages/destination-actions/src/destinations/klaviyo/trackEvent/index.ts +++ b/packages/destination-actions/src/destinations/klaviyo/trackEvent/index.ts @@ -1,12 +1,11 @@ import type { ActionDefinition } from '@segment/actions-core' import type { Settings } from '../generated-types' import type { Payload } from './generated-types' - import { PayloadValidationError } from '@segment/actions-core' import { API_URL } from '../config' -import { processPhoneNumber } from '../functions' -import { country_code } from '../properties' -import dayjs from 'dayjs' +import { batch_size, enable_batching, country_code } from '../properties' +import { processPhoneNumber, sendBatchedTrackEvent } from '../functions' +import dayjs from '../../../lib/dayjs' const action: ActionDefinition = { title: 'Track Event', @@ -91,7 +90,9 @@ const action: ActionDefinition = { default: { '@path': '$.messageId' } - } + }, + enable_batching: { ...enable_batching }, + batch_size: { ...batch_size } }, perform: (request, { payload }) => { const { email, phone_number: initialPhoneNumber, external_id, anonymous_id, country_code } = payload.profile @@ -128,11 +129,13 @@ const action: ActionDefinition = { } } } - return request(`${API_URL}/events/`, { method: 'POST', json: eventData }) + }, + performBatch: (request, { payload }) => { + return sendBatchedTrackEvent(request, payload) } } diff --git a/packages/destination-actions/src/destinations/klaviyo/types.ts b/packages/destination-actions/src/destinations/klaviyo/types.ts index fc645ad0c5..d28bcf24f6 100644 --- a/packages/destination-actions/src/destinations/klaviyo/types.ts +++ b/packages/destination-actions/src/destinations/klaviyo/types.ts @@ -219,3 +219,17 @@ export interface AdditionalAttributes { title?: string image?: string } +export interface KlaviyoAPIError { + id: string + status: number + code: string + title: string + detail: string + source: { + pointer: string + parameter?: string + } +} +export interface KlaviyoAPIErrorResponse { + errors: KlaviyoAPIError[] +}