Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.Net: Process Cloud Events - Publish Events Abstractions #10477

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

esttenorio
Copy link
Contributor

@esttenorio esttenorio commented Feb 11, 2025

Motivation and Context

Description

Changes

  • more plumbing:
    • piping source event name info when emitting an event
    • capturing event name when linking step/process edges
  • creating new step family components - ProxyStep (following similar pattern used for MapStep by @crickman )
    • ProcessProxyBuilder
    • KernelProcessProxy
    • LocalProxy
    • ProxyActor/IProxy/DaprProxyInfo
  • updating process cloud events uts
  • adding new uts in

Fixes #10328

Contribution Checklist

@markwallace-microsoft markwallace-microsoft added the .NET Issue or Pull requests regarding .NET code label Feb 11, 2025
@github-actions github-actions bot changed the title Process Cloud Events - Publish Events Abstractions .Net: Process Cloud Events - Publish Events Abstractions Feb 11, 2025
/// <param name="proxyEvent">event data passed to proxy step</param>
/// <returns></returns>
[KernelFunction(Functions.EmitExternalEvent)]
public async Task EmitExternalEventAsync(KernelProcessStepContext context, KernelProcessProxyMessage proxyEvent)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could remove the async qualifier from the method and just return the Task generated by EmitExternalEventAsync. (Same for EmitInternalProcessEventAsync) I'd expect the compiler to result in the same code, but with no await you know for sure there's no extra byte-code.

@@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.SemanticKernel.Process.Internal;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting a public class inside the *.Internal namespace may be a mismatch.

/// <summary>
/// Factory that helps create <see cref="KernelProcessProxyMessage"/>
/// </summary>
public static class KernelProcessProxyMessageFactory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be internal to Process.Core. (I only see it consumed on ProcessStepBuilder.)


// TODO-estenori: these 2 could potentially be merged into 1 dict later
// maybe sourceStep is not needed? need to check
this.EventTopicMap[eventData.EventName] = topicName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that these can be merged. I'd do so now since it also impacts the public surface of KernelProcessProxyStateMetadata.

this.EventTopicMap[eventData.EventName] = topicName;
this.EventDataMap[eventData.EventName] = eventData.EventId;

this.ExternalTopicUsage[topicName] = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be HashSet<string> and void the faux value.

/// <inheritdoc/>
internal override KernelProcessStepInfo BuildStep(KernelProcessStepStateMetadata? stateMetadata = null)
{
KernelProcessProxyStateMetadata? proxyMetadata = new()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extraneous ? (KernelProcessProxyStateMetadata? => KernelProcessProxyStateMetadata)

throw new KernelException("No IExternalKernelProcessMessageChannel found, need at least 1 to emit external messages");
}

await this.ExternalMessageChannel.Initialize().ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can different proxy-steps share the same channel? I see this is on the base step and passed down from the process-context. How many times will it be initialized?

/// Provides functionality to allow emitting external messages from within the SK
/// process.
/// </summary>
public sealed class ProcessProxyBuilder : ProcessStepBuilder<KernelProxyStep>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be useful to lay a sample down to illustrate how this is being used. I'm curious because pretty much everything is internal. (Currious on how the maps are utilized)

@esttenorio esttenorio self-assigned this Feb 11, 2025
@@ -118,7 +119,7 @@ internal IEnumerable<KernelProcessEdge> GetEdgeForEvent(string eventId)
/// </summary>
/// <param name="processEvent">The event to emit.</param>
/// <returns>A <see cref="ValueTask"/></returns>
public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
public virtual ValueTask EmitEventAsync(KernelProcessEvent processEvent)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

virtual

not needed, should remove virtual

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
.NET Issue or Pull requests regarding .NET code
Projects
Status: No status
Development

Successfully merging this pull request may close these issues.

Cloud Events - new Step type dedicated to work as Proxy Step abstraction
3 participants