-
Notifications
You must be signed in to change notification settings - Fork 923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: piece branching #4599
feat: piece branching #4599
Changes from 26 commits
dadcf4f
81a9761
8c97cf8
98e36c3
94fe22d
30356eb
cb59db9
eb41659
1424846
7373ad8
779ba07
2b253de
620d18b
1cab75e
05bf68c
2e47e03
bf11b2f
264f950
bdbc66d
b25f628
8d5cc10
317c2fa
b315441
5c13741
b260819
94ede0f
528c5ae
581fbf5
625e86a
8cf826e
7fa641f
8af8123
aa742c8
a706879
24d96fe
f6d11e5
138bd25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
--- | ||
title: 'Piece Branching' | ||
icon: 'hammer' | ||
description: 'Make a piece branchable' | ||
--- | ||
Activepieces allows pieces to be directly branched from depending on the action by adding the expected outputs in the `outputs` array of objects when creating an action. | ||
|
||
### Waiting for approval action | ||
|
||
We will see the branching feature within the context of the approval process, using the waiting for approval action as an example. | ||
|
||
**Example:** | ||
|
||
```typescript | ||
import { createAction } from '@activepieces/pieces-framework'; | ||
import { ExecutionType, PauseType, branchedPieceResponse } from '@activepieces/shared'; | ||
|
||
export const waitForApprovalLink = createAction({ | ||
name: 'wait_for_approval', | ||
displayName: 'Wait for Approval', | ||
description: 'Pauses the flow and wait for the approval from the user', | ||
props: {}, | ||
errorHandlingOptions: { | ||
continueOnFailure: { | ||
hide: true, | ||
}, | ||
retryOnFailure: { | ||
hide: true, | ||
}, | ||
}, | ||
// Here we define the names of the outputs expected to be branched from the piece | ||
outputs: [ | ||
{ name: 'approved' }, | ||
{ name: 'denied' } | ||
], | ||
async run(ctx) { | ||
if (ctx.executionType === ExecutionType.BEGIN) { | ||
ctx.run.pause({ | ||
pauseMetadata: { | ||
type: PauseType.WEBHOOK, | ||
response: {}, | ||
}, | ||
}); | ||
|
||
// Return an empty Map while execution is paused | ||
return branchedPieceResponse() | ||
} else { | ||
// Return a Map populated with the expected returned data | ||
return branchedPieceResponse({ | ||
version: 'v1', // Optional: adding a version for the expected response. Defaults to 'v1'. | ||
islamaf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
approved: ctx.resumePayload.queryParams['action'] === 'approve', | ||
denied: ctx.resumePayload.queryParams['action'] !== 'approve', | ||
}) | ||
} | ||
}, | ||
}); | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
import { URL } from 'url' | ||
import { ActionContext, ConnectionsManager, PauseHook, PauseHookParams, PiecePropertyMap, StaticPropsValue, StopHook, StopHookParams, TagsManager } from '@activepieces/pieces-framework' | ||
import { Action, ActionContext, ConnectionsManager, PauseHook, PauseHookParams, PiecePropertyMap, RunFunctionReturnType, StaticPropsValue, StopHook, StopHookParams, TagsManager } from '@activepieces/pieces-framework' | ||
import { ActionType, assertNotNullOrUndefined, AUTHENTICATION_PROPERTY_NAME, ExecutionType, FlowRunStatus, GenericStepOutput, isNil, PauseType, PieceAction, StepOutputStatus } from '@activepieces/shared' | ||
import { continueIfFailureHandler, handleExecutionError, runWithExponentialBackoff } from '../helper/error-handling' | ||
import { pieceLoader } from '../helper/piece-loader' | ||
|
@@ -9,9 +9,19 @@ import { createContextStore } from '../services/storage.service' | |
import { ActionHandler, BaseExecutor } from './base-executor' | ||
import { EngineConstants } from './context/engine-constants' | ||
import { ExecutionVerdict, FlowExecutorContext } from './context/flow-execution-context' | ||
import { flowExecutor } from './flow-executor' | ||
|
||
type HookResponse = { stopResponse: StopHookParams | undefined, pauseResponse: PauseHookParams | undefined, tags: string[], stopped: boolean, paused: boolean } | ||
|
||
type RunBranchablePieceWithVersion = { | ||
version: 'v1' | ||
pieceOutput: RunFunctionReturnType | ||
executionState: FlowExecutorContext | ||
action: PieceAction | ||
constants: EngineConstants | ||
stepOutput: GenericStepOutput<ActionType.PIECE, unknown> | ||
} | ||
|
||
export const pieceExecutor: BaseExecutor<PieceAction> = { | ||
async handle({ | ||
action, | ||
|
@@ -128,6 +138,18 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat | |
pauseMetadata: hookResponse.pauseResponse.pauseMetadata, | ||
}) | ||
} | ||
if (action.children && hasBranches(pieceAction)) { | ||
const pieceOutput = output as RunFunctionReturnType | ||
|
||
return await runBranchablePieceWithVersion({ | ||
executionState, | ||
pieceOutput, | ||
action, | ||
constants, | ||
stepOutput, | ||
version: pieceOutput.version, | ||
}) | ||
} | ||
|
||
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output)).increaseTask().setVerdict(ExecutionVerdict.RUNNING, undefined) | ||
} | ||
|
@@ -144,6 +166,45 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat | |
} | ||
} | ||
|
||
function hasBranches(pieceAction: Action): boolean { | ||
return !isNil(pieceAction.outputs) && pieceAction.outputs.length > 1 | ||
} | ||
|
||
async function runBranchablePieceWithVersion({ | ||
pieceOutput, | ||
executionState, | ||
action, | ||
constants, | ||
stepOutput, | ||
version = 'v1', | ||
}: RunBranchablePieceWithVersion): Promise<FlowExecutorContext> { | ||
const versions = { | ||
v1: async (): Promise<FlowExecutorContext> => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is disaster, If approval piece is changed that mean we need to change the engine and make engine doesn't work at all. We have documented how branching should work in the framework but this will not work at all as it require approved to be hardcoded and onSuccessAction It can be rewritten like that async function runBranchablePieceWithVersion({
pieceOutput,
executionState,
action,
constants,
stepOutput,
version = 'v1',
}: RunBranchablePieceWithVersion): Promise<FlowExecutorContext> {
const versions = {
v1: async (): Promise<FlowExecutorContext> => {
let newExecutionContext = executionState
for (const [branchKey, _branchValue] of pieceOutput.output.entries()) {
const childAction = action.children[branchKey]
if (isNil(childAction)) {
continue
}
newExecutionContext = await flowExecutor.execute({
action: childAction,
executionState: newExecutionContext,
constants,
})
break;
}
return newExecutionContext.upsertStep(action.name, stepOutput
.setOutput(pieceOutput.output))
.increaseTask()
.setVerdict(ExecutionVerdict.RUNNING, undefined)
},
}
return versions[version]()
} |
||
let newExecutionContext = executionState | ||
let outputValue = undefined | ||
for (const [k, v] of pieceOutput.output.entries()) { | ||
if (v === false || v === undefined || isNil(v)) { | ||
continue | ||
} | ||
|
||
if (isNil(action.children)) { | ||
continue | ||
} | ||
|
||
newExecutionContext = await flowExecutor.execute({ | ||
action: action.children.filter(child => child.name === k)[0].action, | ||
executionState: newExecutionContext, | ||
constants, | ||
}) | ||
outputValue = v | ||
} | ||
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(outputValue)).increaseTask().setVerdict(ExecutionVerdict.RUNNING, undefined) | ||
}, | ||
} | ||
|
||
return versions[version]() | ||
} | ||
|
||
const createTagsManager = (hookResponse: HookResponse): TagsManager => { | ||
return { | ||
add: async (params: { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,38 @@ | ||
import { createAction } from '@activepieces/pieces-framework'; | ||
import { ExecutionType, PauseType } from '@activepieces/shared'; | ||
|
||
export const waitForApprovalLink = createAction({ | ||
name: 'wait_for_approval', | ||
displayName: 'Wait for Approval', | ||
description: 'Pauses the flow and wait for the approval from the user', | ||
props: {}, | ||
errorHandlingOptions: { | ||
continueOnFailure: { | ||
hide: true, | ||
}, | ||
retryOnFailure: { | ||
hide: true, | ||
}, | ||
}, | ||
async run(ctx) { | ||
if (ctx.executionType === ExecutionType.BEGIN) { | ||
ctx.run.pause({ | ||
pauseMetadata: { | ||
type: PauseType.WEBHOOK, | ||
response: {} | ||
}, | ||
}); | ||
|
||
return { | ||
approved: true, | ||
}; | ||
} else { | ||
return { | ||
approved: ctx.resumePayload.queryParams['action'] === 'approve', | ||
}; | ||
} | ||
}, | ||
}); | ||
import { createAction } from '@activepieces/pieces-framework'; | ||
import { ExecutionType, PauseType, branchedPieceResponse } from '@activepieces/shared'; | ||
|
||
export const waitForApprovalLink = createAction({ | ||
name: 'wait_for_approval', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You will also have to bump |
||
displayName: 'Wait for Approval', | ||
description: 'Pauses the flow and wait for the approval from the user', | ||
props: {}, | ||
errorHandlingOptions: { | ||
continueOnFailure: { | ||
hide: true, | ||
}, | ||
retryOnFailure: { | ||
hide: true, | ||
}, | ||
}, | ||
outputs: [ | ||
{ name: 'approved' }, | ||
{ name: 'denied' } | ||
], | ||
async run(ctx) { | ||
if (ctx.executionType === ExecutionType.BEGIN) { | ||
ctx.run.pause({ | ||
pauseMetadata: { | ||
type: PauseType.WEBHOOK, | ||
response: {}, | ||
}, | ||
}); | ||
|
||
return branchedPieceResponse() | ||
} else { | ||
return branchedPieceResponse({ | ||
approved: ctx.resumePayload.queryParams['action'] === 'approve', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are depending on the value to decide where to branch this is wrong, what if both were true. What if i want to include information like it were approved by who? I think you should use key as to execute branch or not this can be rewritten like that async function runBranchablePieceWithVersion({
pieceOutput,
executionState,
action,
constants,
stepOutput,
version = 'v1',
}: RunBranchablePieceWithVersion): Promise<FlowExecutorContext> {
const versions = {
v1: async (): Promise<FlowExecutorContext> => {
let newExecutionContext = executionState
for (const [branchKey, _branchValue] of pieceOutput.output.entries()) {
const childAction = action.children[branchKey]
if (isNil(childAction)) {
continue
}
newExecutionContext = await flowExecutor.execute({
action: childAction,
executionState: newExecutionContext,
constants,
})
break;
}
return newExecutionContext.upsertStep(action.name, stepOutput
.setOutput(pieceOutput.output))
.increaseTask()
.setVerdict(ExecutionVerdict.RUNNING, undefined)
},
}
return versions[version]()
} Also you can change generate approval link to async run(ctx) {
return {
approveLink: ctx.generateResumeUrl({
queryParams: { action: 'approve' },
}),
denyLink: ctx.generateResumeUrl({
queryParams: { action: 'deny' },
}),
};
``` |
||
denied: ctx.resumePayload.queryParams['action'] !== 'approve', | ||
abuaboud marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}) | ||
} | ||
}, | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,78 +1,86 @@ | ||
import { Static, Type } from '@sinclair/typebox'; | ||
import { ActionContext } from '../context'; | ||
import { ActionBase } from '../piece-metadata'; | ||
import { InputPropertyMap } from '../property'; | ||
import { PieceAuthProperty } from '../property/authentication'; | ||
|
||
export type ActionRunner<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> = | ||
(ctx: ActionContext<PieceAuth, ActionProps>) => Promise<unknown | void> | ||
|
||
export const ErrorHandlingOptionsParam = Type.Object({ | ||
retryOnFailure: Type.Object({ | ||
defaultValue: Type.Optional(Type.Boolean()), | ||
hide: Type.Optional(Type.Boolean()), | ||
}), | ||
continueOnFailure: Type.Object({ | ||
defaultValue: Type.Optional(Type.Boolean()), | ||
hide: Type.Optional(Type.Boolean()), | ||
}), | ||
}) | ||
export type ErrorHandlingOptionsParam = Static<typeof ErrorHandlingOptionsParam> | ||
|
||
type CreateActionParams<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> = { | ||
/** | ||
* A dummy parameter used to infer {@code PieceAuth} type | ||
*/ | ||
name: string | ||
auth?: PieceAuth | ||
displayName: string | ||
description: string | ||
props: ActionProps | ||
run: ActionRunner<PieceAuth, ActionProps> | ||
test?: ActionRunner<PieceAuth, ActionProps> | ||
requireAuth?: boolean | ||
errorHandlingOptions?: ErrorHandlingOptionsParam | ||
} | ||
|
||
export class IAction<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> implements ActionBase { | ||
constructor( | ||
public readonly name: string, | ||
public readonly displayName: string, | ||
public readonly description: string, | ||
public readonly props: ActionProps, | ||
public readonly run: ActionRunner<PieceAuth, ActionProps>, | ||
public readonly test: ActionRunner<PieceAuth, ActionProps>, | ||
public readonly requireAuth: boolean, | ||
public readonly errorHandlingOptions: ErrorHandlingOptionsParam, | ||
) { } | ||
} | ||
|
||
export type Action< | ||
PieceAuth extends PieceAuthProperty = any, | ||
ActionProps extends InputPropertyMap = any, | ||
> = IAction<PieceAuth, ActionProps> | ||
|
||
export const createAction = < | ||
PieceAuth extends PieceAuthProperty = PieceAuthProperty, | ||
ActionProps extends InputPropertyMap = any | ||
>( | ||
params: CreateActionParams<PieceAuth, ActionProps>, | ||
) => { | ||
return new IAction( | ||
params.name, | ||
params.displayName, | ||
params.description, | ||
params.props, | ||
params.run, | ||
params.test ?? params.run, | ||
params.requireAuth ?? true, | ||
params.errorHandlingOptions ?? { | ||
continueOnFailure: { | ||
defaultValue: false, | ||
}, | ||
retryOnFailure: { | ||
defaultValue: false, | ||
} | ||
}, | ||
) | ||
} | ||
import { Static, Type } from '@sinclair/typebox'; | ||
import { ActionContext } from '../context'; | ||
import { ActionBase, ActionOutput } from '../piece-metadata'; | ||
import { InputPropertyMap } from '../property'; | ||
import { PieceAuthProperty } from '../property/authentication'; | ||
|
||
export type RunFunctionReturnType = { | ||
version: 'v1' | ||
output: Map<string, boolean | undefined | null> | ||
} | ||
|
||
export type ActionRunner<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> = | ||
(ctx: ActionContext<PieceAuth, ActionProps>) => Promise<RunFunctionReturnType | unknown | void> | ||
|
||
export const ErrorHandlingOptionsParam = Type.Object({ | ||
retryOnFailure: Type.Object({ | ||
defaultValue: Type.Optional(Type.Boolean()), | ||
hide: Type.Optional(Type.Boolean()), | ||
}), | ||
continueOnFailure: Type.Object({ | ||
defaultValue: Type.Optional(Type.Boolean()), | ||
hide: Type.Optional(Type.Boolean()), | ||
}), | ||
}) | ||
export type ErrorHandlingOptionsParam = Static<typeof ErrorHandlingOptionsParam> | ||
|
||
type CreateActionParams<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> = { | ||
/** | ||
* A dummy parameter used to infer {@code PieceAuth} type | ||
*/ | ||
name: string | ||
auth?: PieceAuth | ||
displayName: string | ||
description: string | ||
props: ActionProps | ||
run: ActionRunner<PieceAuth, ActionProps> | ||
test?: ActionRunner<PieceAuth, ActionProps> | ||
requireAuth?: boolean | ||
outputs?: ActionOutput[] | ||
errorHandlingOptions?: ErrorHandlingOptionsParam | ||
} | ||
|
||
export class IAction<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> implements ActionBase { | ||
constructor( | ||
public readonly name: string, | ||
public readonly displayName: string, | ||
public readonly description: string, | ||
public readonly props: ActionProps, | ||
public readonly run: ActionRunner<PieceAuth, ActionProps>, | ||
public readonly test: ActionRunner<PieceAuth, ActionProps>, | ||
public readonly requireAuth: boolean, | ||
public readonly outputs: ActionOutput[], | ||
public readonly errorHandlingOptions: ErrorHandlingOptionsParam, | ||
) { } | ||
} | ||
|
||
export type Action< | ||
PieceAuth extends PieceAuthProperty = any, | ||
ActionProps extends InputPropertyMap = any, | ||
> = IAction<PieceAuth, ActionProps> | ||
|
||
export const createAction = < | ||
PieceAuth extends PieceAuthProperty = PieceAuthProperty, | ||
ActionProps extends InputPropertyMap = any | ||
>( | ||
params: CreateActionParams<PieceAuth, ActionProps>, | ||
) => { | ||
return new IAction( | ||
params.name, | ||
params.displayName, | ||
params.description, | ||
params.props, | ||
params.run, | ||
params.test ?? params.run, | ||
params.requireAuth ?? true, | ||
params.outputs ?? [], | ||
params.errorHandlingOptions ?? { | ||
continueOnFailure: { | ||
defaultValue: false, | ||
}, | ||
retryOnFailure: { | ||
defaultValue: false, | ||
} | ||
}, | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using a name since you are using it as an id
We can change this to an object where the key is the name of the branch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the issue would be mainly with adding other properties to the branch in case we would need to add this later, if we would go with an object, I suggest this way:
by this way, we would be able to extend the functionality further in the future. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
map of objects work