Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: piece branching #4599

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
dadcf4f
feat: make piece able to be branched from
islamaf Apr 30, 2024
81a9761
feat: make wait for approval action branchable
islamaf Apr 30, 2024
8c97cf8
fix: get branchable option from action, not piece
islamaf Apr 30, 2024
98e36c3
feat: use outputs array to define the batches to run
islamaf May 1, 2024
94fe22d
fix: drawing line component with button after piece
islamaf May 1, 2024
30356eb
feat: handle the outputs returned by the wait for approval action
islamaf May 1, 2024
cb59db9
fix: handling only approved or not conditions
islamaf May 1, 2024
eb41659
feat: approval controller and service
islamaf May 1, 2024
1424846
feat: add approval response in shared
islamaf May 1, 2024
7373ad8
feat: approval url generation
islamaf May 1, 2024
779ba07
feat: use approval url in approval piece
islamaf May 1, 2024
2b253de
fix: revert approval controller & service
islamaf May 2, 2024
620d18b
fix: remove generateApprovalUrl and use Map as the approval type
islamaf May 2, 2024
1cab75e
fix: run function return type
islamaf May 2, 2024
05bf68c
fix: run function return type
islamaf May 2, 2024
2e47e03
Merge branch 'feat/piece-branching' of https://github.com/activepiece…
islamaf May 2, 2024
bf11b2f
fix: remove unknown from RunFunctionReturnType
islamaf May 2, 2024
264f950
feat: make a return function for the piece branching response
islamaf May 2, 2024
bdbc66d
feat: piece branching docs
islamaf May 2, 2024
b25f628
fix: operation object as inline parameter in branchedPieceResponse
islamaf May 2, 2024
8d5cc10
feat: piece branching versioning
islamaf May 2, 2024
317c2fa
feat: adding versioning in docs
islamaf May 2, 2024
b315441
feat: add RunBranchablePieceWithVersion type
islamaf May 2, 2024
5c13741
fix: get outputs from piece action
islamaf May 8, 2024
b260819
fix: make default outputs array as empty array
islamaf May 8, 2024
94ede0f
feat: pass the outputs from the piece action to action settings on ne…
islamaf May 8, 2024
528c5ae
feat: fully working version of boolean piece branching
islamaf May 12, 2024
581fbf5
feat: get all children for branched piece
islamaf May 13, 2024
625e86a
fix: capitalize branch label
islamaf May 13, 2024
8cf826e
fix: use action children object instead of separate actions
islamaf May 13, 2024
7fa641f
fix: the outputs type to object map
islamaf May 17, 2024
8af8123
fix: dynamic branch children and remove hardcoded
islamaf May 17, 2024
aa742c8
fix: remove unnecessary
islamaf May 17, 2024
a706879
fix: remove outputs and make children optional
islamaf May 17, 2024
24d96fe
chore: bump package.json for approval piece
islamaf May 17, 2024
f6d11e5
feat: pass pieces to flow drawer construct
islamaf May 19, 2024
138bd25
feat: add action outputs in inputUiInfo
islamaf May 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 57 additions & 0 deletions docs/developers/piece-reference/piece-branching.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
---
title: 'Piece Branching'
icon: 'hammer'
description: 'Make a piece branchable'
---
Activepieces allows pieces to be directly branched from depending on the action by adding the expected outputs in the `outputs` array of objects when creating an action.

### Waiting for approval action

We will see the branching feature within the context of the approval process, using the waiting for approval action as an example.

**Example:**

```typescript
import { createAction } from '@activepieces/pieces-framework';
import { ExecutionType, PauseType, branchedPieceResponse } from '@activepieces/shared';

export const waitForApprovalLink = createAction({
name: 'wait_for_approval',
displayName: 'Wait for Approval',
description: 'Pauses the flow and wait for the approval from the user',
props: {},
errorHandlingOptions: {
continueOnFailure: {
hide: true,
},
retryOnFailure: {
hide: true,
},
},
// Here we define the names of the outputs expected to be branched from the piece
outputs: [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a name since you are using it as an id

We can change this to an object where the key is the name of the branch

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue would be mainly with adding other properties to the branch in case we would need to add this later, if we would go with an object, I suggest this way:

{
    branchName: {
       expectedOutput: true,
       ...otherProperties,
    },
} 

by this way, we would be able to extend the functionality further in the future. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map of objects work

{ name: 'approved' },
{ name: 'denied' }
],
async run(ctx) {
if (ctx.executionType === ExecutionType.BEGIN) {
ctx.run.pause({
pauseMetadata: {
type: PauseType.WEBHOOK,
response: {},
},
});

// Return an empty Map while execution is paused
return branchedPieceResponse()
} else {
// Return a Map populated with the expected returned data
return branchedPieceResponse({
version: 'v1', // Optional: adding a version for the expected response. Defaults to 'v1'.
islamaf marked this conversation as resolved.
Show resolved Hide resolved
approved: ctx.resumePayload.queryParams['action'] === 'approve',
denied: ctx.resumePayload.queryParams['action'] !== 'approve',
})
}
},
});
```
63 changes: 62 additions & 1 deletion packages/engine/src/lib/handler/piece-executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { URL } from 'url'
import { ActionContext, ConnectionsManager, PauseHook, PauseHookParams, PiecePropertyMap, StaticPropsValue, StopHook, StopHookParams, TagsManager } from '@activepieces/pieces-framework'
import { Action, ActionContext, ConnectionsManager, PauseHook, PauseHookParams, PiecePropertyMap, RunFunctionReturnType, StaticPropsValue, StopHook, StopHookParams, TagsManager } from '@activepieces/pieces-framework'
import { ActionType, assertNotNullOrUndefined, AUTHENTICATION_PROPERTY_NAME, ExecutionType, FlowRunStatus, GenericStepOutput, isNil, PauseType, PieceAction, StepOutputStatus } from '@activepieces/shared'
import { continueIfFailureHandler, handleExecutionError, runWithExponentialBackoff } from '../helper/error-handling'
import { pieceLoader } from '../helper/piece-loader'
Expand All @@ -9,9 +9,19 @@ import { createContextStore } from '../services/storage.service'
import { ActionHandler, BaseExecutor } from './base-executor'
import { EngineConstants } from './context/engine-constants'
import { ExecutionVerdict, FlowExecutorContext } from './context/flow-execution-context'
import { flowExecutor } from './flow-executor'

type HookResponse = { stopResponse: StopHookParams | undefined, pauseResponse: PauseHookParams | undefined, tags: string[], stopped: boolean, paused: boolean }

type RunBranchablePieceWithVersion = {
version: 'v1'
pieceOutput: RunFunctionReturnType
executionState: FlowExecutorContext
action: PieceAction
constants: EngineConstants
stepOutput: GenericStepOutput<ActionType.PIECE, unknown>
}

export const pieceExecutor: BaseExecutor<PieceAction> = {
async handle({
action,
Expand Down Expand Up @@ -128,6 +138,18 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
pauseMetadata: hookResponse.pauseResponse.pauseMetadata,
})
}
if (action.children && hasBranches(pieceAction)) {
const pieceOutput = output as RunFunctionReturnType

return await runBranchablePieceWithVersion({
executionState,
pieceOutput,
action,
constants,
stepOutput,
version: pieceOutput.version,
})
}

return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output)).increaseTask().setVerdict(ExecutionVerdict.RUNNING, undefined)
}
Expand All @@ -144,6 +166,45 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
}
}

function hasBranches(pieceAction: Action): boolean {
return !isNil(pieceAction.outputs) && pieceAction.outputs.length > 1
}

async function runBranchablePieceWithVersion({
pieceOutput,
executionState,
action,
constants,
stepOutput,
version = 'v1',
}: RunBranchablePieceWithVersion): Promise<FlowExecutorContext> {
const versions = {
v1: async (): Promise<FlowExecutorContext> => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is disaster, If approval piece is changed that mean we need to change the engine and make engine doesn't work at all.

We have documented how branching should work in the framework but this will not work at all as it require approved to be hardcoded and onSuccessAction

It can be rewritten like that

async function runBranchablePieceWithVersion({
    pieceOutput,
    executionState,
    action,
    constants,
    stepOutput,
    version = 'v1',
}: RunBranchablePieceWithVersion): Promise<FlowExecutorContext> {
    const versions = {
        v1: async (): Promise<FlowExecutorContext> => {
            let newExecutionContext = executionState
            for (const [branchKey, _branchValue] of pieceOutput.output.entries()) {
                const childAction = action.children[branchKey]
                if (isNil(childAction)) {
                    continue
                }
                newExecutionContext = await flowExecutor.execute({
                    action: childAction,
                    executionState: newExecutionContext,
                    constants,
                })
                break;
            }
            return newExecutionContext.upsertStep(action.name, stepOutput
                .setOutput(pieceOutput.output))
                .increaseTask()
                .setVerdict(ExecutionVerdict.RUNNING, undefined)
        },
    }

    return versions[version]()
}

let newExecutionContext = executionState
let outputValue = undefined
for (const [k, v] of pieceOutput.output.entries()) {
if (v === false || v === undefined || isNil(v)) {
continue
}

if (isNil(action.children)) {
continue
}

newExecutionContext = await flowExecutor.execute({
action: action.children.filter(child => child.name === k)[0].action,
executionState: newExecutionContext,
constants,
})
outputValue = v
}
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(outputValue)).increaseTask().setVerdict(ExecutionVerdict.RUNNING, undefined)
},
}

return versions[version]()
}

const createTagsManager = (hookResponse: HookResponse): TagsManager => {
return {
add: async (params: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
import { createAction } from '@activepieces/pieces-framework';
import { ExecutionType, PauseType } from '@activepieces/shared';

export const waitForApprovalLink = createAction({
name: 'wait_for_approval',
displayName: 'Wait for Approval',
description: 'Pauses the flow and wait for the approval from the user',
props: {},
errorHandlingOptions: {
continueOnFailure: {
hide: true,
},
retryOnFailure: {
hide: true,
},
},
async run(ctx) {
if (ctx.executionType === ExecutionType.BEGIN) {
ctx.run.pause({
pauseMetadata: {
type: PauseType.WEBHOOK,
response: {}
},
});

return {
approved: true,
};
} else {
return {
approved: ctx.resumePayload.queryParams['action'] === 'approve',
};
}
},
});
import { createAction } from '@activepieces/pieces-framework';
import { ExecutionType, PauseType, branchedPieceResponse } from '@activepieces/shared';

export const waitForApprovalLink = createAction({
name: 'wait_for_approval',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will also have to bump package.json

displayName: 'Wait for Approval',
description: 'Pauses the flow and wait for the approval from the user',
props: {},
errorHandlingOptions: {
continueOnFailure: {
hide: true,
},
retryOnFailure: {
hide: true,
},
},
outputs: [
{ name: 'approved' },
{ name: 'denied' }
],
async run(ctx) {
if (ctx.executionType === ExecutionType.BEGIN) {
ctx.run.pause({
pauseMetadata: {
type: PauseType.WEBHOOK,
response: {},
},
});

return branchedPieceResponse()
} else {
return branchedPieceResponse({
approved: ctx.resumePayload.queryParams['action'] === 'approve',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are depending on the value to decide where to branch this is wrong, what if both were true.

What if i want to include information like it were approved by who? I think you should use key as to execute branch or not this can be rewritten like that

async function runBranchablePieceWithVersion({
    pieceOutput,
    executionState,
    action,
    constants,
    stepOutput,
    version = 'v1',
}: RunBranchablePieceWithVersion): Promise<FlowExecutorContext> {
    const versions = {
        v1: async (): Promise<FlowExecutorContext> => {
            let newExecutionContext = executionState
            for (const [branchKey, _branchValue] of pieceOutput.output.entries()) {
                const childAction = action.children[branchKey]
                if (isNil(childAction)) {
                    continue
                }
                newExecutionContext = await flowExecutor.execute({
                    action: childAction,
                    executionState: newExecutionContext,
                    constants,
                })
                break;
            }
            return newExecutionContext.upsertStep(action.name, stepOutput
                .setOutput(pieceOutput.output))
                .increaseTask()
                .setVerdict(ExecutionVerdict.RUNNING, undefined)
        },
    }

    return versions[version]()
}

Also you can change generate approval link to

 async run(ctx) {
    return {
      approveLink: ctx.generateResumeUrl({
        queryParams: { action: 'approve' },
      }),
      denyLink: ctx.generateResumeUrl({
        queryParams: { action: 'deny' },
      }),
    };
    ```

denied: ctx.resumePayload.queryParams['action'] !== 'approve',
abuaboud marked this conversation as resolved.
Show resolved Hide resolved
})
}
},
});
164 changes: 86 additions & 78 deletions packages/pieces/community/framework/src/lib/action/action.ts
Original file line number Diff line number Diff line change
@@ -1,78 +1,86 @@
import { Static, Type } from '@sinclair/typebox';
import { ActionContext } from '../context';
import { ActionBase } from '../piece-metadata';
import { InputPropertyMap } from '../property';
import { PieceAuthProperty } from '../property/authentication';

export type ActionRunner<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> =
(ctx: ActionContext<PieceAuth, ActionProps>) => Promise<unknown | void>

export const ErrorHandlingOptionsParam = Type.Object({
retryOnFailure: Type.Object({
defaultValue: Type.Optional(Type.Boolean()),
hide: Type.Optional(Type.Boolean()),
}),
continueOnFailure: Type.Object({
defaultValue: Type.Optional(Type.Boolean()),
hide: Type.Optional(Type.Boolean()),
}),
})
export type ErrorHandlingOptionsParam = Static<typeof ErrorHandlingOptionsParam>

type CreateActionParams<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> = {
/**
* A dummy parameter used to infer {@code PieceAuth} type
*/
name: string
auth?: PieceAuth
displayName: string
description: string
props: ActionProps
run: ActionRunner<PieceAuth, ActionProps>
test?: ActionRunner<PieceAuth, ActionProps>
requireAuth?: boolean
errorHandlingOptions?: ErrorHandlingOptionsParam
}

export class IAction<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> implements ActionBase {
constructor(
public readonly name: string,
public readonly displayName: string,
public readonly description: string,
public readonly props: ActionProps,
public readonly run: ActionRunner<PieceAuth, ActionProps>,
public readonly test: ActionRunner<PieceAuth, ActionProps>,
public readonly requireAuth: boolean,
public readonly errorHandlingOptions: ErrorHandlingOptionsParam,
) { }
}

export type Action<
PieceAuth extends PieceAuthProperty = any,
ActionProps extends InputPropertyMap = any,
> = IAction<PieceAuth, ActionProps>

export const createAction = <
PieceAuth extends PieceAuthProperty = PieceAuthProperty,
ActionProps extends InputPropertyMap = any
>(
params: CreateActionParams<PieceAuth, ActionProps>,
) => {
return new IAction(
params.name,
params.displayName,
params.description,
params.props,
params.run,
params.test ?? params.run,
params.requireAuth ?? true,
params.errorHandlingOptions ?? {
continueOnFailure: {
defaultValue: false,
},
retryOnFailure: {
defaultValue: false,
}
},
)
}
import { Static, Type } from '@sinclair/typebox';
import { ActionContext } from '../context';
import { ActionBase, ActionOutput } from '../piece-metadata';
import { InputPropertyMap } from '../property';
import { PieceAuthProperty } from '../property/authentication';

export type RunFunctionReturnType = {
version: 'v1'
output: Map<string, boolean | undefined | null>
}

export type ActionRunner<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> =
(ctx: ActionContext<PieceAuth, ActionProps>) => Promise<RunFunctionReturnType | unknown | void>

export const ErrorHandlingOptionsParam = Type.Object({
retryOnFailure: Type.Object({
defaultValue: Type.Optional(Type.Boolean()),
hide: Type.Optional(Type.Boolean()),
}),
continueOnFailure: Type.Object({
defaultValue: Type.Optional(Type.Boolean()),
hide: Type.Optional(Type.Boolean()),
}),
})
export type ErrorHandlingOptionsParam = Static<typeof ErrorHandlingOptionsParam>

type CreateActionParams<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> = {
/**
* A dummy parameter used to infer {@code PieceAuth} type
*/
name: string
auth?: PieceAuth
displayName: string
description: string
props: ActionProps
run: ActionRunner<PieceAuth, ActionProps>
test?: ActionRunner<PieceAuth, ActionProps>
requireAuth?: boolean
outputs?: ActionOutput[]
errorHandlingOptions?: ErrorHandlingOptionsParam
}

export class IAction<PieceAuth extends PieceAuthProperty, ActionProps extends InputPropertyMap> implements ActionBase {
constructor(
public readonly name: string,
public readonly displayName: string,
public readonly description: string,
public readonly props: ActionProps,
public readonly run: ActionRunner<PieceAuth, ActionProps>,
public readonly test: ActionRunner<PieceAuth, ActionProps>,
public readonly requireAuth: boolean,
public readonly outputs: ActionOutput[],
public readonly errorHandlingOptions: ErrorHandlingOptionsParam,
) { }
}

export type Action<
PieceAuth extends PieceAuthProperty = any,
ActionProps extends InputPropertyMap = any,
> = IAction<PieceAuth, ActionProps>

export const createAction = <
PieceAuth extends PieceAuthProperty = PieceAuthProperty,
ActionProps extends InputPropertyMap = any
>(
params: CreateActionParams<PieceAuth, ActionProps>,
) => {
return new IAction(
params.name,
params.displayName,
params.description,
params.props,
params.run,
params.test ?? params.run,
params.requireAuth ?? true,
params.outputs ?? [{ name: 'main' }],
params.errorHandlingOptions ?? {
continueOnFailure: {
defaultValue: false,
},
retryOnFailure: {
defaultValue: false,
}
},
)
}