Skip to content

Commit

Permalink
.Net Processes: Fixing an issue with nested processes in Dapr runtime. (
Browse files Browse the repository at this point in the history
#9491)

### Description

This PR addresses an issue when running a nested process in the Dapr
runtime. After this changes, the Dapr runtime fully supports nested
processes with the ability to pass events in both directions.

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [x] The code builds clean without any errors or warnings
- [x] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [x] All unit tests pass, and I have added new tests where possible
- [x] I didn't break anyone 😄

---------

Co-authored-by: Chris <[email protected]>
  • Loading branch information
alliscode and crickman authored Oct 31, 2024
1 parent d014534 commit f0de0b6
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 13 deletions.
3 changes: 2 additions & 1 deletion dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ internal override KernelProcessStepInfo BuildStep(KernelProcessStepStateMetadata
throw new KernelException($"The initial state provided for step {this.Name} is not of the correct type. The expected type is {userStateType.Name}.");
}

var initialState = this._initialState ?? Activator.CreateInstance(userStateType);
stateObject = (KernelProcessStepState?)Activator.CreateInstance(stateType, this.Name, this.Id);
stateType.GetProperty(nameof(KernelProcessStepState<object>.State))?.SetValue(stateObject, this._initialState);
stateType.GetProperty(nameof(KernelProcessStepState<object>.State))?.SetValue(stateObject, initialState);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ private ValueTask InitializeProcessAsync()
kernel: this._kernel,
parentProcessId: this.Id);

//await process.StartAsync(kernel: this._kernel, keepAlive: true).ConfigureAwait(false);
localStep = process;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ private async Task InitializeProcessActorAsync(DaprProcessInfo processInfo, stri

private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSupersteps = 100, bool keepAlive = true, CancellationToken cancellationToken = default)
{
Kernel localKernel = kernel ?? this._kernel;
Queue<ProcessMessage> messageChannel = new();

try
{
// Run the Pregel algorithm until there are no more messages being sent.
Expand Down Expand Up @@ -308,8 +305,7 @@ private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSuperstep
await Task.WhenAll(stepProcessingTasks).ConfigureAwait(false);

// Handle public events that need to be bubbled out of the process.
var eventQueue = this.ProxyFactory.CreateActorProxy<IEventBuffer>(new ActorId(this.Id.GetId()), nameof(EventBufferActor));
var allEvents = await eventQueue.DequeueAllAsync().ConfigureAwait(false);
await this.SendOutgoingPublicEventsAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
Expand Down Expand Up @@ -354,6 +350,36 @@ private async Task EnqueueExternalMessagesAsync()
}
}

/// <summary>
/// Public events that are produced inside of this process need to be sent to the parent process. This method reads
/// all of the public events from the event buffer and sends them to the targeted step in the parent process.
/// </summary>
private async Task SendOutgoingPublicEventsAsync()
{
// Loop through all steps that are processes and call a function requesting their outgoing events, then queue them up.
if (!string.IsNullOrWhiteSpace(this.ParentProcessId))
{
// Handle public events that need to be bubbled out of the process.
var eventQueue = this.ProxyFactory.CreateActorProxy<IEventBuffer>(new ActorId(this.Id.GetId()), nameof(EventBufferActor));
var allEvents = await eventQueue.DequeueAllAsync().ConfigureAwait(false);

foreach (var e in allEvents)
{
var scopedEvent = this.ScopedEvent(e);
if (this._outputEdges!.TryGetValue(scopedEvent.Id, out List<KernelProcessEdge>? edges) && edges is not null)
{
foreach (var edge in edges)
{
ProcessMessage message = ProcessMessageFactory.CreateFromEdge(edge, e.Data);
var scopedMessageBufferId = this.ScopedActorId(new ActorId(edge.OutputTarget.StepId), scopeToParent: true);
var messageQueue = this.ProxyFactory.CreateActorProxy<IMessageBuffer>(scopedMessageBufferId, nameof(MessageBufferActor));
await messageQueue.EnqueueAsync(message).ConfigureAwait(false);
}
}
}
}
}

/// <summary>
/// Determines is the end message has been sent to the process.
/// </summary>
Expand Down Expand Up @@ -383,10 +409,28 @@ private async Task<DaprProcessInfo> ToDaprProcessInfoAsync()
/// Scopes the Id of a step within the process to the process.
/// </summary>
/// <param name="actorId">The actor Id to scope.</param>
/// <param name="scopeToParent">Indicates if the Id should be scoped to the parent process.</param>
/// <returns>A new <see cref="ActorId"/> which is scoped to the process.</returns>
private ActorId ScopedActorId(ActorId actorId)
private ActorId ScopedActorId(ActorId actorId, bool scopeToParent = false)
{
if (scopeToParent && string.IsNullOrWhiteSpace(this.ParentProcessId))
{
throw new InvalidOperationException("The parent process Id must be set before scoping to the parent process.");
}

string id = scopeToParent ? this.ParentProcessId! : this.Id.GetId();
return new ActorId($"{id}.{actorId.GetId()}");
}

/// <summary>
/// Generates a scoped event for the step.
/// </summary>
/// <param name="daprEvent">The event.</param>
/// <returns>A <see cref="ProcessEvent"/> with the correctly scoped namespace.</returns>
private ProcessEvent ScopedEvent(ProcessEvent daprEvent)
{
return new ActorId($"{this.Id}.{actorId.GetId()}");
Verify.NotNull(daprEvent);
return daprEvent with { Namespace = $"{this.Name}_{this._process!.State.Id}" };
}

#endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,16 +378,14 @@ private Task<FunctionResult> InvokeFunction(KernelFunction function, Kernel kern
/// <param name="daprEvent">The event to emit.</param>
internal async ValueTask EmitEventAsync(ProcessEvent daprEvent)
{
var scopedEvent = this.ScopedEvent(daprEvent);

// Emit the event out of the process (this one) if it's visibility is public.
if (daprEvent.Visibility == KernelProcessEventVisibility.Public)
{
if (this.ParentProcessId is not null)
{
// Emit the event to the parent process
var parentProcess = this.ProxyFactory.CreateActorProxy<IEventBuffer>(new ActorId(this.ParentProcessId), nameof(EventBufferActor));
await parentProcess.EnqueueAsync(scopedEvent).ConfigureAwait(false);
await parentProcess.EnqueueAsync(daprEvent).ConfigureAwait(false);
}
}

Expand All @@ -406,7 +404,7 @@ internal async ValueTask EmitEventAsync(ProcessEvent daprEvent)
/// </summary>
/// <param name="daprEvent">The event.</param>
/// <returns>A <see cref="ProcessEvent"/> with the correctly scoped namespace.</returns>
internal ProcessEvent ScopedEvent(ProcessEvent daprEvent)
private ProcessEvent ScopedEvent(ProcessEvent daprEvent)
{
Verify.NotNull(daprEvent, nameof(daprEvent));
return daprEvent with { Namespace = $"{this.Name}_{this.Id}" };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.SemanticKernel;
/// </summary>
[KnownType(typeof(KernelProcessEdge))]
[KnownType(typeof(KernelProcessStepState))]
[KnownType(typeof(DaprProcessInfo))]
public record DaprStepInfo
{
/// <summary>
Expand Down

0 comments on commit f0de0b6

Please sign in to comment.