-
Notifications
You must be signed in to change notification settings - Fork 928
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #4595 from activepieces/feat/isolate-worker
feat: isolate worker form backend
- Loading branch information
Showing
14 changed files
with
328 additions
and
180 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
42 changes: 42 additions & 0 deletions
42
packages/server/api/src/app/flows/flow/flow-worker.controller.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import { FastifyPluginAsyncTypebox, Type } from '@fastify/type-provider-typebox' | ||
import { StatusCodes } from 'http-status-codes' | ||
import { entitiesMustBeOwnedByCurrentProject } from '../../authentication/authorization' | ||
import { flowVersionService } from '../flow-version/flow-version.service' | ||
import { flowService } from './flow.service' | ||
import { PopulatedFlow, PrincipalType } from '@activepieces/shared' | ||
|
||
export const flowWorkerController: FastifyPluginAsyncTypebox = async (fastify) => { | ||
fastify.addHook('preSerialization', entitiesMustBeOwnedByCurrentProject) | ||
|
||
fastify.get('/', GetLockedVersionRequest, async (request) => { | ||
const flowVersion = await flowVersionService.getOneOrThrow(request.query.versionId) | ||
// Check if the flow version is owned by the current project | ||
const flow = await flowService.getOneOrThrow({ | ||
id: flowVersion.flowId, | ||
projectId: request.principal.projectId, | ||
}) | ||
const lockedVersion = await flowVersionService.lockPieceVersions({ | ||
flowVersion, | ||
projectId: request.principal.projectId, | ||
}) | ||
return { | ||
...flow, | ||
version: lockedVersion, | ||
} | ||
}, | ||
) | ||
} | ||
|
||
const GetLockedVersionRequest = { | ||
config: { | ||
allowedPrincipals: [PrincipalType.WORKER], | ||
}, | ||
schema: { | ||
querystring: Type.Object({ | ||
versionId: Type.String(), | ||
}), | ||
response: { | ||
[StatusCodes.OK]: PopulatedFlow, | ||
}, | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
194 changes: 194 additions & 0 deletions
194
packages/server/api/test/integration/ce/flows/flow-consume.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
import { FastifyInstance } from 'fastify' | ||
import { setupApp } from '../../../../src/app/app' | ||
import { databaseConnection } from '../../../../src/app/database/database-connection' | ||
import { flowWorker } from '../../../../src/app/workers/flow-worker/flow-worker' | ||
import { | ||
createMockFlow, | ||
createMockFlowRun, | ||
createMockFlowVersion, | ||
createMockPlatform, | ||
createMockProject, | ||
createMockUser, | ||
} from '../../../helpers/mocks' | ||
import { fileCompressor } from '@activepieces/server-shared' | ||
import { | ||
ActionType, | ||
ExecutionType, | ||
FlowRunStatus, | ||
FlowStatus, | ||
FlowVersionState, | ||
PackageType, | ||
PieceType, | ||
RunEnvironment, | ||
TriggerType, | ||
} from '@activepieces/shared' | ||
|
||
let app: FastifyInstance | null = null | ||
|
||
beforeAll(async () => { | ||
await databaseConnection.initialize() | ||
app = await setupApp() | ||
await app.listen({ | ||
host: '0.0.0.0', | ||
port: 3000, | ||
}) | ||
}) | ||
|
||
afterAll(async () => { | ||
await databaseConnection.destroy() | ||
await app?.close() | ||
}) | ||
|
||
describe('flow execution', () => { | ||
it('should execute simple flow with code and data mapper', async () => { | ||
const mockUser = createMockUser() | ||
await databaseConnection.getRepository('user').save([mockUser]) | ||
|
||
const mockPlatform = createMockPlatform({ ownerId: mockUser.id }) | ||
await databaseConnection.getRepository('platform').save([mockPlatform]) | ||
|
||
const mockProject = createMockProject({ ownerId: mockUser.id, platformId: mockPlatform.id }) | ||
await databaseConnection.getRepository('project').save([mockProject]) | ||
|
||
const mockFlow = createMockFlow({ | ||
projectId: mockProject.id, | ||
status: FlowStatus.ENABLED, | ||
}) | ||
await databaseConnection.getRepository('flow').save([mockFlow]) | ||
|
||
const mockFlowVersion = createMockFlowVersion({ | ||
flowId: mockFlow.id, | ||
updatedBy: mockUser.id, | ||
state: FlowVersionState.LOCKED, | ||
trigger: { | ||
type: TriggerType.PIECE, | ||
settings: { | ||
pieceName: '@activepieces/piece-schedule', | ||
pieceVersion: '0.1.0', | ||
input: { | ||
run_on_weekends: false, | ||
}, | ||
triggerName: 'everyHourTrigger', | ||
'pieceType': PieceType.OFFICIAL, | ||
'packageType': PackageType.REGISTRY, | ||
inputUiInfo: {}, | ||
}, | ||
valid: true, | ||
name: 'webhook', | ||
displayName: 'Webhook', | ||
nextAction: { | ||
name: 'echo_step', | ||
displayName: 'Echo Step', | ||
type: ActionType.CODE, | ||
settings: { | ||
inputUiInfo: {}, | ||
input: { | ||
key: '{{ 1 + 2 }}', | ||
}, | ||
sourceCode: { | ||
packageJson: '{}', | ||
code: ` | ||
export const code = async (inputs) => { | ||
return inputs; | ||
}; | ||
`, | ||
}, | ||
}, | ||
nextAction: { | ||
name: 'datamapper', | ||
displayName: 'Datamapper', | ||
type: ActionType.PIECE, | ||
settings: { | ||
inputUiInfo: {}, | ||
pieceName: '@activepieces/piece-data-mapper', | ||
pieceVersion: '0.3.0', | ||
packageType: 'REGISTRY', | ||
pieceType: 'OFFICIAL', | ||
actionName: 'advanced_mapping', | ||
input: { | ||
mapping: { | ||
key: '{{ 1 + 2 }}', | ||
}, | ||
}, | ||
}, | ||
valid: true, | ||
}, | ||
valid: true, | ||
}, | ||
}, | ||
}) | ||
await databaseConnection | ||
.getRepository('flow_version') | ||
.save([mockFlowVersion]) | ||
|
||
const mockFlowRun = createMockFlowRun({ | ||
flowVersionId: mockFlowVersion.id, | ||
projectId: mockProject.id, | ||
flowId: mockFlow.id, | ||
status: FlowRunStatus.RUNNING, | ||
}) | ||
await databaseConnection.getRepository('flow_run').save([mockFlowRun]) | ||
|
||
await flowWorker.executeFlow({ | ||
flowVersionId: mockFlowVersion.id, | ||
projectId: mockProject.id, | ||
environment: RunEnvironment.PRODUCTION, | ||
runId: mockFlowRun.id, | ||
payload: {}, | ||
executionType: ExecutionType.BEGIN, | ||
}) | ||
|
||
const flowRun = await databaseConnection | ||
.getRepository('flow_run') | ||
.findOneByOrFail({ | ||
id: mockFlowRun.id, | ||
}) | ||
expect(flowRun.status).toEqual(FlowRunStatus.SUCCEEDED) | ||
|
||
const file = await databaseConnection | ||
.getRepository('file') | ||
.findOneByOrFail({ | ||
id: flowRun.logsFileId, | ||
}) | ||
const decompressedData = await fileCompressor.decompress({ | ||
data: file.data, | ||
compression: file.compression, | ||
}) | ||
expect( | ||
JSON.parse(decompressedData.toString('utf-8')).executionState, | ||
).toEqual({ | ||
steps: { | ||
webhook: { | ||
type: 'PIECE_TRIGGER', | ||
status: 'SUCCEEDED', | ||
input: {}, | ||
output: {}, | ||
}, | ||
echo_step: { | ||
type: 'CODE', | ||
status: 'SUCCEEDED', | ||
input: { | ||
key: 3, | ||
}, | ||
output: { | ||
key: 3, | ||
}, | ||
duration: expect.any(Number), | ||
}, | ||
datamapper: { | ||
type: 'PIECE', | ||
status: 'SUCCEEDED', | ||
input: { | ||
mapping: { | ||
key: 3, | ||
}, | ||
}, | ||
output: { | ||
key: 3, | ||
}, | ||
duration: expect.any(Number), | ||
}, | ||
}, | ||
}) | ||
}, 60000) | ||
}) |
Oops, something went wrong.