From f0de0b614607db84b162435a82c141f5b710769c Mon Sep 17 00:00:00 2001 From: Ben Thomas Date: Thu, 31 Oct 2024 12:27:58 -0700 Subject: [PATCH] .Net Processes: Fixing an issue with nested processes in Dapr runtime. (#9491) ### Description This PR addresses an issue when running a nested process in the Dapr runtime. After this changes, the Dapr runtime fully supports nested processes with the ability to pass events in both directions. ### Contribution Checklist - [x] The code builds clean without any errors or warnings - [x] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [x] All unit tests pass, and I have added new tests where possible - [x] I didn't break anyone :smile: --------- Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> --- .../Process.Core/ProcessStepBuilder.cs | 3 +- .../Process.LocalRuntime/LocalProcess.cs | 1 - .../Actors/ProcessActor.cs | 58 ++++++++++++++++--- .../Process.Runtime.Dapr/Actors/StepActor.cs | 6 +- .../Process.Runtime.Dapr/DaprStepInfo.cs | 1 + 5 files changed, 56 insertions(+), 13 deletions(-) diff --git a/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs index 27db41d73c0b..5b917fd3fa8d 100644 --- a/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs +++ b/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs @@ -263,8 +263,9 @@ internal override KernelProcessStepInfo BuildStep(KernelProcessStepStateMetadata throw new KernelException($"The initial state provided for step {this.Name} is not of the correct type. The expected type is {userStateType.Name}."); } + var initialState = this._initialState ?? Activator.CreateInstance(userStateType); stateObject = (KernelProcessStepState?)Activator.CreateInstance(stateType, this.Name, this.Id); - stateType.GetProperty(nameof(KernelProcessStepState.State))?.SetValue(stateObject, this._initialState); + stateType.GetProperty(nameof(KernelProcessStepState.State))?.SetValue(stateObject, initialState); } else { diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index 3b9edf23651a..1b4ad7c1de07 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -188,7 +188,6 @@ private ValueTask InitializeProcessAsync() kernel: this._kernel, parentProcessId: this.Id); - //await process.StartAsync(kernel: this._kernel, keepAlive: true).ConfigureAwait(false); localStep = process; } else diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs index e13a33997d4a..51f9098d7b99 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs @@ -271,9 +271,6 @@ private async Task InitializeProcessActorAsync(DaprProcessInfo processInfo, stri private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSupersteps = 100, bool keepAlive = true, CancellationToken cancellationToken = default) { - Kernel localKernel = kernel ?? this._kernel; - Queue messageChannel = new(); - try { // Run the Pregel algorithm until there are no more messages being sent. @@ -308,8 +305,7 @@ private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSuperstep await Task.WhenAll(stepProcessingTasks).ConfigureAwait(false); // Handle public events that need to be bubbled out of the process. - var eventQueue = this.ProxyFactory.CreateActorProxy(new ActorId(this.Id.GetId()), nameof(EventBufferActor)); - var allEvents = await eventQueue.DequeueAllAsync().ConfigureAwait(false); + await this.SendOutgoingPublicEventsAsync().ConfigureAwait(false); } } catch (Exception ex) @@ -354,6 +350,36 @@ private async Task EnqueueExternalMessagesAsync() } } + /// + /// Public events that are produced inside of this process need to be sent to the parent process. This method reads + /// all of the public events from the event buffer and sends them to the targeted step in the parent process. + /// + private async Task SendOutgoingPublicEventsAsync() + { + // Loop through all steps that are processes and call a function requesting their outgoing events, then queue them up. + if (!string.IsNullOrWhiteSpace(this.ParentProcessId)) + { + // Handle public events that need to be bubbled out of the process. + var eventQueue = this.ProxyFactory.CreateActorProxy(new ActorId(this.Id.GetId()), nameof(EventBufferActor)); + var allEvents = await eventQueue.DequeueAllAsync().ConfigureAwait(false); + + foreach (var e in allEvents) + { + var scopedEvent = this.ScopedEvent(e); + if (this._outputEdges!.TryGetValue(scopedEvent.Id, out List? edges) && edges is not null) + { + foreach (var edge in edges) + { + ProcessMessage message = ProcessMessageFactory.CreateFromEdge(edge, e.Data); + var scopedMessageBufferId = this.ScopedActorId(new ActorId(edge.OutputTarget.StepId), scopeToParent: true); + var messageQueue = this.ProxyFactory.CreateActorProxy(scopedMessageBufferId, nameof(MessageBufferActor)); + await messageQueue.EnqueueAsync(message).ConfigureAwait(false); + } + } + } + } + } + /// /// Determines is the end message has been sent to the process. /// @@ -383,10 +409,28 @@ private async Task ToDaprProcessInfoAsync() /// Scopes the Id of a step within the process to the process. /// /// The actor Id to scope. + /// Indicates if the Id should be scoped to the parent process. /// A new which is scoped to the process. - private ActorId ScopedActorId(ActorId actorId) + private ActorId ScopedActorId(ActorId actorId, bool scopeToParent = false) + { + if (scopeToParent && string.IsNullOrWhiteSpace(this.ParentProcessId)) + { + throw new InvalidOperationException("The parent process Id must be set before scoping to the parent process."); + } + + string id = scopeToParent ? this.ParentProcessId! : this.Id.GetId(); + return new ActorId($"{id}.{actorId.GetId()}"); + } + + /// + /// Generates a scoped event for the step. + /// + /// The event. + /// A with the correctly scoped namespace. + private ProcessEvent ScopedEvent(ProcessEvent daprEvent) { - return new ActorId($"{this.Id}.{actorId.GetId()}"); + Verify.NotNull(daprEvent); + return daprEvent with { Namespace = $"{this.Name}_{this._process!.State.Id}" }; } #endregion diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs index efe8d5007612..9b627ad4d43f 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs @@ -378,8 +378,6 @@ private Task InvokeFunction(KernelFunction function, Kernel kern /// The event to emit. internal async ValueTask EmitEventAsync(ProcessEvent daprEvent) { - var scopedEvent = this.ScopedEvent(daprEvent); - // Emit the event out of the process (this one) if it's visibility is public. if (daprEvent.Visibility == KernelProcessEventVisibility.Public) { @@ -387,7 +385,7 @@ internal async ValueTask EmitEventAsync(ProcessEvent daprEvent) { // Emit the event to the parent process var parentProcess = this.ProxyFactory.CreateActorProxy(new ActorId(this.ParentProcessId), nameof(EventBufferActor)); - await parentProcess.EnqueueAsync(scopedEvent).ConfigureAwait(false); + await parentProcess.EnqueueAsync(daprEvent).ConfigureAwait(false); } } @@ -406,7 +404,7 @@ internal async ValueTask EmitEventAsync(ProcessEvent daprEvent) /// /// The event. /// A with the correctly scoped namespace. - internal ProcessEvent ScopedEvent(ProcessEvent daprEvent) + private ProcessEvent ScopedEvent(ProcessEvent daprEvent) { Verify.NotNull(daprEvent, nameof(daprEvent)); return daprEvent with { Namespace = $"{this.Name}_{this.Id}" }; diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs index 05d53042705d..a5d63077a08b 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs @@ -12,6 +12,7 @@ namespace Microsoft.SemanticKernel; /// [KnownType(typeof(KernelProcessEdge))] [KnownType(typeof(KernelProcessStepState))] +[KnownType(typeof(DaprProcessInfo))] public record DaprStepInfo { ///