Skip to content

Commit

Permalink
Cleaned up .... Made the exceptions thrown clear, added unit tests fo…
Browse files Browse the repository at this point in the history
…r failure cases.
  • Loading branch information
Drawaes committed Mar 8, 2017
1 parent 11a4ffe commit e14b440
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 36 deletions.
56 changes: 33 additions & 23 deletions src/CondenserDotNet.Client/Services/BlockingWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@
using System.Threading.Tasks;
using CondenserDotNet.Core;
using Newtonsoft.Json;
using Microsoft.Extensions.Logging;

namespace CondenserDotNet.Client.Services
{
internal class BlockingWatcher<T> where T : class
{
private readonly AsyncManualResetEvent<bool> _haveFirstResults = new AsyncManualResetEvent<bool>();
private readonly Func<string, Task<HttpResponseMessage>> _client;
private readonly Action<T> _onNew;
private T _instances;
private WatcherState _state = WatcherState.NotInitialized;
private static int s_getServiceDelay = 2000;

public BlockingWatcher(Func<string, Task<HttpResponseMessage>> client, Action<T> onNew = null)
private readonly ILogger _logger;
private CancellationToken _token;

public BlockingWatcher(Func<string, Task<HttpResponseMessage>> client, ILogger logger, CancellationToken token)
{
_token = token;
_client = client;
_onNew = onNew;
_logger = logger;
}

public async Task<T> ReadAsync()
Expand All @@ -31,7 +34,7 @@ public async Task<T> ReadAsync()
var taskThatFinished = await Task.WhenAny(delayTask, _haveFirstResults.WaitAsync());
if (delayTask == taskThatFinished)
{
throw new System.Net.Sockets.SocketException();
throw new NoConsulConnectionException();
}
instances = Volatile.Read(ref _instances);
}
Expand All @@ -40,32 +43,39 @@ public async Task<T> ReadAsync()

public async Task WatchLoop()
{
try
while (true)
{
string consulIndex = "0";
while (true)
try
{
var result = await _client(consulIndex);
if (!result.IsSuccessStatusCode)
string consulIndex = "0";
while (!_token.IsCancellationRequested)
{
if (_state == WatcherState.UsingLiveValues)
var result = await _client(consulIndex);
if (!result.IsSuccessStatusCode)
{
_state = WatcherState.UsingCachedValues;
if (_state == WatcherState.UsingLiveValues)
{
_state = WatcherState.UsingCachedValues;
}
await Task.Delay(1000);
continue;
}
await Task.Delay(1000);
continue;
consulIndex = result.GetConsulIndex();
var content = await result.Content.ReadAsStringAsync();
var instance = JsonConvert.DeserializeObject<T>(content);
Interlocked.Exchange(ref _instances, instance);
_state = WatcherState.UsingLiveValues;
_haveFirstResults.Set(true);
}
consulIndex = result.GetConsulIndex();
var content = await result.Content.ReadAsStringAsync();
var instance = JsonConvert.DeserializeObject<T>(content);
Interlocked.Exchange(ref _instances, instance);
_state = WatcherState.UsingLiveValues;
_haveFirstResults.Set(true);
_onNew?.Invoke(instance);
}
catch (TaskCanceledException) { /*nom nom */}
catch (ObjectDisposedException) { /*nom nom */}
catch (Exception ex)
{
_logger?.LogWarning(0, ex, "Error in blocking watcher watching consul");
}
await Task.Delay(s_getServiceDelay);
}
catch (TaskCanceledException) { /*nom nom */}
catch (ObjectDisposedException) { /*nom nom */}
}
}
}
10 changes: 10 additions & 0 deletions src/CondenserDotNet.Client/Services/NoConsulConnectionException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace CondenserDotNet.Client.Services
{
public class NoConsulConnectionException:Exception
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
{
var currentUri = request.RequestUri;
var serviceInstance = await _serviceRegistry.GetServiceInstanceAsync(currentUri.Host);
if(serviceInstance == null)
{
throw new NoServiceInstanceFoundException(currentUri.Host, null);
}
request.RequestUri = new Uri($"{currentUri.Scheme}://{serviceInstance.Address}:{serviceInstance.Port}{currentUri.PathAndQuery}");
return await base.SendAsync(request, cancellationToken);
}
Expand Down
16 changes: 4 additions & 12 deletions src/CondenserDotNet.Client/Services/ServiceWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,16 @@ internal ServiceWatcher(string serviceName, HttpClient client, CancellationToken
_logger = logger;
_routingStrategy = routingStrategy;
string lookupUrl = $"{HttpUtils.ServiceHealthUrl}{serviceName}?passing&index=";
Func<string,Task<HttpResponseMessage>> action =
Func<string, Task<HttpResponseMessage>> action =
(consulIndex) => client.GetAsync(lookupUrl + consulIndex, cancel);
_watcher = new BlockingWatcher<List<InformationServiceSet>>(action);
_watcher = new BlockingWatcher<List<InformationServiceSet>>(action, _logger, cancel);
_watcherTask = _watcher.WatchLoop();
}

internal async Task<InformationService> GetNextServiceInstanceAsync()
{
try
{
var instances = await _watcher.ReadAsync();
return _routingStrategy.RouteTo(instances)?.Service;
}
catch(Exception ex)
{
_logger?.LogError("Unable to get an instance of {serviceName} the error was {excception}",_serviceName, ex);
throw new NoServiceInstanceFoundException(_serviceName,ex);
}
var instances = await _watcher.ReadAsync();
return _routingStrategy.RouteTo(instances)?.Service;
}
}
}
12 changes: 11 additions & 1 deletion test/Condenser.Tests.Integration/ServiceLookupFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,20 @@ public async Task TestThatAnErrorIsReturnedWhenConsulIsNotAvailable()
{
using (var serviceRegistry = new ServiceRegistry(() => new HttpClient() { BaseAddress = new Uri( "http://localhost:7000" )}))
{
await Assert.ThrowsAsync<NoServiceInstanceFoundException>(async () => await serviceRegistry.GetServiceInstanceAsync("TestService"));
await Assert.ThrowsAsync<NoConsulConnectionException>(async () => await serviceRegistry.GetServiceInstanceAsync("TestService"));
}
}

[Fact]
public async Task TestThatAnErrorIsReturnedWhenNoServiceIsAvailableInTheHandler()
{
using (var serviceRegistry = new ServiceRegistry())
{
var handler = serviceRegistry.GetHttpHandler();
var httpClient = new HttpClient(handler);
await Assert.ThrowsAsync<NoServiceInstanceFoundException>(async () => await httpClient.GetAsync($"http://{Guid.NewGuid().ToString()}"));
}
}
[Fact]
public async Task TestRegisterAndCheckUpdates()
{
Expand Down

0 comments on commit e14b440

Please sign in to comment.