Skip to content

Conversation

Copy link

Copilot AI commented Aug 8, 2025

This PR adds support for the ForkJoin operator in conductor-sharp, providing a simpler alternative to the existing DynamicForkJoin for cases where you just want to run tasks in parallel for performance reasons.

What's Added

ForkJoin Task Implementation

  • ForkJoinTaskModel and ForkJoinInput models for type-safe workflow definition
  • ForkJoinTaskBuilder that generates proper FORK_JOIN and JOIN tasks
  • Extension method AddTask() for fluent workflow building

Key Features

  • Static fork definition (vs dynamic at runtime)
  • Simple parallel execution of predefined task branches
  • Proper mapping of forkTasks and joinOn properties
  • Full integration with existing workflow builder patterns

Usage Example

public class ParallelWorkflow : Workflow<ParallelWorkflow, WorkflowInput, WorkflowOutput>
{
    public ForkJoinTaskModel ParallelTasks { get; set; }

    public override void BuildDefinition()
    {
        _builder.AddTask(wf => wf.ParallelTasks, wf => new ForkJoinInput()
        {
            ForkTasks = new[]
            {
                new WorkflowTask[]
                {
                    new()
                    {
                        Name = "get_customer_task",
                        TaskReferenceName = "get_customer",
                        Type = "SIMPLE",
                        WorkflowTaskType = WorkflowTaskType.SIMPLE,
                        InputParameters = new Dictionary<string, object> { { "customerId", wf.WorkflowInput.CustomerId } }
                    }
                },
                new WorkflowTask[]
                {
                    new()
                    {
                        Name = "prepare_email_task",
                        TaskReferenceName = "prepare_email",
                        Type = "SIMPLE",
                        WorkflowTaskType = WorkflowTaskType.SIMPLE,
                        InputParameters = new Dictionary<string, object> { { "templateId", "welcome" } }
                    }
                }
            },
            JoinOn = new[] { "get_customer", "prepare_email" }
        });
    }
}

Generated Workflow Structure

The ForkJoin implementation generates two tasks:

  1. FORK Task: Contains the parallel task branches in forkTasks property
  2. JOIN Task: Waits for completion of specified tasks in joinOn property

This follows the standard Conductor ForkJoin pattern as documented at https://docs.conductor-oss.org/documentation/configuration/workflowdef/operators/fork-task.html

Testing

  • Complete test workflow demonstrating usage
  • Integration test verifying correct JSON workflow definition generation
  • All existing tests continue to pass (33/33 passing)

This implementation provides a much simpler API compared to DynamicForkJoin when you have a known set of tasks to run in parallel, addressing the performance use case mentioned in the issue.

Warning

Firewall rules blocked me from connecting to one or more addresses

I tried to connect to the following addresses, but was blocked by firewall rules:

  • esm.ubuntu.com
    • Triggering command: /usr/lib/apt/methods/https (dns block)

If you need me to access, download, or install something from one of these locations, you can either:


✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.

@Roruchi
Copy link
Owner

Roruchi commented Aug 8, 2025

Can you make sure it does not use objects for the fork tasks, but instead i can add it through builder style. Like in the do-while task builder i.e. this is some code on how we can potentially use it:

builder.Fork(
            wf => wf.ForkReference
            forkBuilder =>
            {
forkBuilder.AddFork(taskBuilder => {
  taskBuilder.AddTask(
                    wf => wf.Task1,
                    wf => new Task!Input
                    {
                        Name = wf.Task0.Ouput
                    }
                );
                taskBuilder.AddTask(
                    wf => wf.Task2,
                    wf => new Task2Input
                    {
                         Name = wf.Task1.Ouput
                    }
                );
            }
} 
        );
        

Copilot AI changed the title [WIP] Can you add support for the forkJoin in conductor sharp based on the https://docs.conductor-oss.org/documentation/configuration/workflowdef/operators/fork-task.html There already is a dynamic join but that is too complex when you just want to run two t... Add ForkJoin task support for simplified parallel execution Aug 8, 2025
Copilot AI requested a review from Roruchi August 8, 2025 10:23
Copy link
Owner

@Roruchi Roruchi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make sure it does not use objects for the fork tasks, but instead i can add it through builder style. Like in the do-while task builder i.e. this is some code on how we can potentially use it:

builder.Fork(
wf => wf.ForkReference
forkBuilder =>
{
forkBuilder.AddFork(taskBuilder => {
taskBuilder.AddTask(
wf => wf.Task1,
wf => new Task!Input
{
Name = wf.Task0.Ouput
}
);
taskBuilder.AddTask(
wf => wf.Task2,
wf => new Task2Input
{
Name = wf.Task1.Ouput
}
);
}
}
);

@Roruchi Roruchi marked this pull request as ready for review August 8, 2025 11:04
Copilot AI and others added 3 commits August 8, 2025 11:21
…1-343b0f7eaf1d

Add builder-style API for ForkJoin tasks with fluent subtask configuration
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants