Skip to content

Commit

Permalink
Support timed edge polling and synchronous loading
Browse files Browse the repository at this point in the history
  • Loading branch information
rvowles committed Apr 25, 2024
1 parent fb5832b commit 837a9ea
Show file tree
Hide file tree
Showing 14 changed files with 667 additions and 84 deletions.
18 changes: 14 additions & 4 deletions ConsoleAppExample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,23 @@ static void Main(string[] args)
MainApp().Wait();
}

async static Task MainApp()
static async Task MainApp()
{
Console.WriteLine("Hello World!");

var config = new EdgeFeatureHubConfig("http://localhost:8064",
"default/b63faed0-1a66-48a6-8679-e362d1d33d7d/QgoWPhyDisWwHY5heuVzXbB41xqXwB*fL7QfLvLTb6OA00Cx8t1");
config.Init();
FeatureLogging.TraceLogger += (sender, s) => Console.WriteLine(s);
FeatureLogging.ErrorLogger += (sender, s) => Console.WriteLine(s);
FeatureLogging.DebugLogger += (sender, s) => Console.WriteLine(s);
FeatureLogging.InfoLogger += (sender, s) => Console.WriteLine(s);

// var serverEvalKey = "d8cdd2f2-6003-4136-ad99-ee05730dfd97/YPSbDzRdrepVoTHUSm0IU8Da0hjJmZYxBG03jCEK";
// var clientEvalKey = "d8cdd2f2-6003-4136-ad99-ee05730dfd97/gC3QHHb6mFQfNSerIItsCUUGZJL8aK*YyIiRr6cn5i5vN7eGtRA";
// var config = new EdgeFeatureHubConfig("http://localhost:8903",
// serverEvalKey).UsePolling(10); // every 10 seconds
var config = new EdgeFeatureHubConfig(); // use environment variables
Console.WriteLine("Waiting for state to arrive");
await config.Init();
Console.WriteLine($"State should have arrived {config.Repository.Readyness}");

Console.WriteLine($"Server evaluated {config.ServerEvaluation}");

Expand Down
22 changes: 20 additions & 2 deletions FeatureHubSDK/ClientContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,16 @@ public ServerEvalFeatureContext(IFeatureRepositoryContext repository, IFeatureHu
_weCreatedSources = false;
}

public override IFeature this[string name] => _repository.GetFeature(name);
public override IFeature this[string name]
{
get
{
// we tell edge to poll if it hasn't already, it also lets the timeout go do an update in the background
_currentEdgeService?.Poll();

return _repository.GetFeature(name);
}
}

public override async Task<IClientContext> Build()
{
Expand Down Expand Up @@ -278,7 +287,16 @@ public class ClientEvalFeatureContext : BaseClientContext
_weCreatedSources = false;
}

public override IFeature this[string name] => _repository.GetFeature(name).WithContext(this);
public override IFeature this[string name]
{
get
{
// we tell edge to poll if it hasn't already, it also lets the timeout go do an update in the background
_edgeService?.Poll();

return _repository.GetFeature(name).WithContext(this);
}
}

#pragma warning disable 1998
public override async Task<IClientContext> Build()
Expand Down
74 changes: 68 additions & 6 deletions FeatureHubSDK/EdgeFeatureHubConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace FeatureHubSDK
{
Expand All @@ -18,14 +19,30 @@ public class FeatureHubConfig
public interface IFeatureHubConfig
{

/// <summary>
/// This is the fully constructed EventSource url
/// </summary>
string Url { get; }
/// <summary>
/// this is the URL of the GET edit service
/// </summary>
string EdgeUrl { get; }
List<string> SdkKeys { get; }

bool ServerEvaluation { get; }

/// <summary>
/// Tells the client to use Polling. Can also be assumed if FEATUREHUB_POLL_TIMEOUT env var is set
/// </summary>
/// <param name="timeout"></param>
/// <returns></returns>
IFeatureHubConfig UsePolling(int timeout = 360);

/*
* Initialise the configuration. This will kick off the event source to connect and attempt to start
* pushing data into the FeatureHub repository for use in contexts.
*/
void Init();
Task Init();

IFeatureRepositoryContext Repository { get; set; }
IEdgeService EdgeService { get; set; }
Expand Down Expand Up @@ -55,15 +72,24 @@ public class EdgeFeatureHubConfig : IFeatureHubConfig
{
private readonly string _url;
private readonly bool _serverEvaluation;
private readonly string _edgeUrl;
private readonly List<string> _sdkKeys = new List<string>();

public EdgeFeatureHubConfig(string edgeUrl, string sdkKey)
{
_serverEvaluation = sdkKey != null && !sdkKey.Contains("*"); // two part keys are server evaluated
if (edgeUrl == null || sdkKey == null)
{
throw new FeatureHubKeyInvalidException($"The edge url or sdk key are null.");
}

_serverEvaluation = !sdkKey.Contains("*"); // two part keys are server evaluated

if (!sdkKey.Contains("/"))
{
throw new FeatureHubKeyInvalidException($"The SDK key `{sdkKey}` is invalid");
}

_sdkKeys.Add(sdkKey);

if (edgeUrl.EndsWith("/"))
{
Expand All @@ -75,12 +101,26 @@ public EdgeFeatureHubConfig(string edgeUrl, string sdkKey)
edgeUrl = edgeUrl.Substring(0, edgeUrl.Length - "/features".Length);
}

_edgeUrl = edgeUrl; // the API client automatically adds the /features, etc on

_url = edgeUrl + "/features/" + sdkKey;
}

public void Init()
/// <summary>
/// Use this constructor if you set the environment variables.
/// </summary>
public EdgeFeatureHubConfig() : this(Environment.GetEnvironmentVariable("FEATUREHUB_EDGE_URL"),
Environment.GetEnvironmentVariable("FEATUREHUB_API_KEY"))
{

}

public string EdgeUrl => _edgeUrl;
public List<string> SdkKeys => _sdkKeys;

public async Task Init()
{
EdgeService.Poll();
await EdgeService.Poll();
}

public bool ServerEvaluation => _serverEvaluation;
Expand All @@ -93,7 +133,16 @@ public IEdgeService EdgeService
{
if (_edgeService == null)
{
_edgeService = FeatureHubConfig.defaultEdgeProvider(this.Repository, this);
var pollTimeoutDefault = Environment.GetEnvironmentVariable("FEATUREHUB_POLL_TIMEOUT");
if (pollTimeoutDefault != null)
{
_edgeService = new EdgeClientPoll(Repository, this,
int.Parse(pollTimeoutDefault));
}
else
{
_edgeService = FeatureHubConfig.defaultEdgeProvider(this.Repository, this);
}
}

return _edgeService;
Expand All @@ -104,6 +153,12 @@ public IEdgeService EdgeService
}
}

public IFeatureHubConfig UsePolling(int timeout = 360)
{
_edgeService = new EdgeClientPoll(Repository, this, timeout);
return this;
}

private IFeatureRepositoryContext _repository;

public IFeatureRepositoryContext Repository
Expand Down Expand Up @@ -132,7 +187,14 @@ public IClientContext NewContext(IFeatureRepositoryContext repository = null, Ed

if (edgeServiceSource == null)
{
edgeServiceSource = (repo, config) => FeatureHubConfig.defaultEdgeProvider(repo, config);
if (_edgeService != null)
{
edgeServiceSource = (repo, config) => _edgeService;
}
else
{
edgeServiceSource = (repo, config) => FeatureHubConfig.defaultEdgeProvider(repo, config);
}
}

if (_serverEvaluation)
Expand Down
90 changes: 41 additions & 49 deletions FeatureHubSDK/EventServiceListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,36 @@ public interface IEdgeService

bool IsRequiresReplacementOnHeaderChange { get; }
void Close();
void Poll();
Task Poll();
}

public static class FeatureLogging
{
// Attach event handler to receive Trace level logs
public static EventHandler<String> TraceLogger;
public static EventHandler<string> TraceLogger = (sender, args) => { };
// Attach event handler to receive Debug level logs
public static EventHandler<String> DebugLogger;
public static EventHandler<string> DebugLogger = (sender, args) => { };
// Attach event handler to receive Info level logs
public static EventHandler<String> InfoLogger;
public static EventHandler<string> InfoLogger = (sender, args) => { };
// Attach event handler to receive Warn level logs
public static EventHandler<string> WarnLogger = (sender, args) => { };
// Attach event handler to receive Error level logs
public static EventHandler<String> ErrorLogger;
public static EventHandler<string> ErrorLogger = (sender, args) => { };
}

class ConfigData
{
[JsonProperty("edge.stale")]
public Boolean stale { get; set; }
public Boolean Stale { get; set; }
}

public class EventServiceListener : IEdgeService
{
private EventSource _eventSource;
private readonly IFeatureHubConfig _featureHost;
private readonly IFeatureRepositoryContext _repository;
private string _xFeatureHubHeader = null;
private bool _closed = false;
private string _xFeatureHubHeader;
private bool _closed;

public EventServiceListener(IFeatureRepositoryContext repository, IFeatureHubConfig config)
{
Expand All @@ -66,21 +68,8 @@ public async Task ContextChange(string newHeader)
if (_eventSource == null || _eventSource.ReadyState == ReadyState.Open || _eventSource.ReadyState == ReadyState.Connecting)
{
_eventSource?.Close();

var promise = new TaskCompletionSource<Readyness>();

EventHandler<Readyness> handler = (sender, r) =>
{
promise.TrySetResult(r);
};

_repository.ReadynessHandler += handler;

Init();

await promise.Task;

_repository.ReadynessHandler -= handler;
_eventSource = null;
await Poll();
}
}
}
Expand All @@ -89,6 +78,9 @@ public async Task ContextChange(string newHeader)
Init();
}
}




public bool ClientEvaluation => !_featureHost.ServerEvaluation;

Expand Down Expand Up @@ -117,8 +109,8 @@ public void Init()
if (_closed) return;

var config = new Configuration(uri: new UriBuilder(_featureHost.Url).Uri,
backoffResetThreshold: TimeSpan.FromSeconds(int.Parse(DefaultEnvConfig("FEATUREHUB_BACKOFF_RETRY_LIMIT", "100"))),
delayRetryDuration: TimeSpan.FromSeconds(int.Parse(DefaultEnvConfig("FEATUREHUB_DELAY_RETRY_MS", "10000"))),
backoffResetThreshold: TimeSpan.FromMinutes(int.Parse(DefaultEnvConfig("FEATUREHUB_BACKOFF_RESET_THRESHOLD", "1"))),
delayRetryDuration: TimeSpan.FromMilliseconds(int.Parse(DefaultEnvConfig("FEATUREHUB_DELAY_RETRY_MS", "10000"))),
requestHeaders: _featureHost.ServerEvaluation ? BuildContextHeader() : null);

if (FeatureLogging.InfoLogger != null)
Expand All @@ -129,26 +121,15 @@ public void Init()
_eventSource = new EventSource(config);
_eventSource.Error += (sender, ex) =>
{
if (ex.Exception is EventSourceServiceUnsuccessfulResponseException result)
{
if (result.StatusCode != 503)
{
_repository.Notify(SSEResultState.Failure, null);
FeatureLogging.ErrorLogger(this, "Server issued a failure, stopping.");
_closed = true;
_eventSource.Close();
}
}
if (!(ex.Exception is EventSourceServiceUnsuccessfulResponseException result)) return;
if (result.StatusCode == 503) return;
_repository.Notify(SSEResultState.Failure, null);
FeatureLogging.ErrorLogger(this, "Server issued a failure, stopping.");
_closed = true;
_eventSource.Close();
};

// if (FeatureLogging.DebugLogger != null)
// {
// _eventSource.Closed += (sender, args) =>
// {
// FeatureLogging.DebugLogger(this, "source closed");
// };
// }


_eventSource.MessageReceived += (sender, args) =>
{
SSEResultState? state;
Expand Down Expand Up @@ -185,7 +166,7 @@ public void Init()
if (args.Message.Data != null)
{
var configData = JsonConvert.DeserializeObject<ConfigData>(args.Message.Data);
if (configData.stale)
if (configData.Stale)
{
if (FeatureLogging.ErrorLogger != null)
{
Expand All @@ -204,9 +185,7 @@ public void Init()
}
if (FeatureLogging.TraceLogger != null)
{
FeatureLogging.TraceLogger(this , $"featurehub: The state was {state} with value {args.Message.Data}");
}
FeatureLogging.TraceLogger(this, $"featurehub: The state was {state} with value {args.Message.Data}");
if (state == null) return;
Expand Down Expand Up @@ -234,11 +213,24 @@ public void Close()
_eventSource.Close();
}

public void Poll()
public async Task Poll()
{
if (_eventSource == null)
{
var promise = new TaskCompletionSource<Readyness>();

EventHandler<Readyness> handler = (sender, r) =>
{
promise.TrySetResult(r);
};

_repository.ReadynessHandler += handler;

Init();

await promise.Task;

_repository.ReadynessHandler -= handler;
}
}
}
Expand Down
Loading

0 comments on commit 837a9ea

Please sign in to comment.