Skip to content

Commit

Permalink
Merge branch 'release/1.2.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
tznind committed Jan 9, 2020
2 parents 30cb2f3 + 23863c9 commit 029c1f8
Show file tree
Hide file tree
Showing 18 changed files with 385 additions and 128 deletions.
21 changes: 19 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,23 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased]

...
## [1.2.3] - 2020-01-09

### Changed

- RabbitMQAdapter: Improve handling of timeouts on connection startup

### Added

- Improved logging in IdentifierSwappers

### Changed

- Guid swapper no longer limits input identifiers to a maximum of 10 characters

### Fixed

- Fixed DicomRelationalMapper not cleaning up STAGING table remnants from previously failed loads (leading to crash)

## [1.2.2] - 2020-01-08

Expand Down Expand Up @@ -100,7 +116,8 @@ First stable release after importing the repository from the private [SMIPlugin]
- Anonymous `MappingTableName` must now be fully specified to pass validation (e.g. `mydb.mytbl`). Previously skipping database portion was supported.


[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.2.2...develop
[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.2.3...develop
[1.2.3]: https://github.com/SMI/SmiServices/compare/v1.2.2...v1.2.3
[1.2.2]: https://github.com/SMI/SmiServices/compare/v1.2.1...v1.2.2
[1.2.1]: https://github.com/SMI/SmiServices/compare/1.2.0...v1.2.1
[1.2.0]: https://github.com/SMI/SmiServices/compare/1.1.0-rc1...1.2.0
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
![GitHub](https://img.shields.io/github/license/SMI/SmiServices)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/SMI/SmiServices.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/SMI/SmiServices/alerts/)

Version: `1.2.2`
Version: `1.2.3`

# SMI Services

Expand Down
6 changes: 3 additions & 3 deletions src/SharedAssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
[assembly: AssemblyCulture("")]

// These should be overwritten by release builds
[assembly: AssemblyVersion("1.2.2")]
[assembly: AssemblyFileVersion("1.2.2")]
[assembly: AssemblyInformationalVersion("1.2.2")] // This one can have the extra build info after it
[assembly: AssemblyVersion("1.2.3")]
[assembly: AssemblyFileVersion("1.2.3")]
[assembly: AssemblyInformationalVersion("1.2.3")] // This one can have the extra build info after it
43 changes: 34 additions & 9 deletions src/common/Smi.Common/RabbitMQAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public bool HasConsumers
private const int MinRabbitServerVersionMinor = 7;
private const int MinRabbitServerVersionPatch = 0;

private const int MaxSubscriptionAttempts = 5;


/// <summary>
///
Expand Down Expand Up @@ -135,18 +137,41 @@ public Guid StartConsumer(ConsumerOptions consumerOptions, IConsumer consumer, b
throw new ApplicationException("Already a consumer on queue " + consumerOptions.QueueName + " and solo consumer was specified");
}

Subscription subscription;
Subscription subscription = null;
var connected = false;
var failed = 0;

try
{
subscription = new Subscription(model, consumerOptions.QueueName, consumerOptions.AutoAck, label);
}
catch (OperationInterruptedException e)
while (!connected)
{
model.Close(200, "StartConsumer - Couldn't create subscription");
connection.Close(200, "StartConsumer - Couldn't create subscription");
try
{
subscription = new Subscription(model, consumerOptions.QueueName, consumerOptions.AutoAck, label);
connected = true;
}
catch (TimeoutException)
{
if (++failed >= MaxSubscriptionAttempts)
{
_logger.Warn("Retries exceeded, throwing exception");
throw;
}

throw new ApplicationException("Error when creating subscription on queue \"" + consumerOptions.QueueName + "\"", e);
_logger.Warn($"Timeout when creating Subscription, retrying in 5s...");
Thread.Sleep(TimeSpan.FromSeconds(5));
}
catch (OperationInterruptedException e)
{
throw new ApplicationException(
"Error when creating subscription on queue \"" + consumerOptions.QueueName + "\"", e);
}
finally
{
if (!connected)
{
model.Close(200, "StartConsumer - Couldn't create subscription");
connection.Close(200, "StartConsumer - Couldn't create subscription");
}
}
}

Guid taskId = Guid.NewGuid();
Expand Down
33 changes: 33 additions & 0 deletions src/common/Smi.Common/TimeTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;

namespace Smi.Common
{
/// <summary>
/// Runs a <see cref="Stopwatch"/> for the duration of the using statement (this class is <see cref="IDisposable"/>)
/// </summary>
public class TimeTracker:IDisposable
{
private readonly Stopwatch _sw;

/// <summary>
/// Starts the <paramref name="sw"/> and runs it until disposal (use this in a using statement)
/// </summary>
/// <param name="sw"></param>
public TimeTracker(Stopwatch sw)
{
_sw = sw;
_sw.Start();
}

/// <summary>
/// Stops the <see cref="Stopwatch"/>
/// </summary>
public void Dispose()
{
_sw.Stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,33 @@ public override ExitCodeType Run(IDataLoadJob job, GracefulCancellationToken can
var configuration = job.Configuration;
var namer = configuration.DatabaseNamer;

// To be on the safe side, we will create/destroy the staging tables on a per-load basis
var cloner = new DatabaseCloner(configuration);
job.CreateTablesInStage(cloner, LoadBubble.Staging);

DiscoveredServer server = job.LoadMetadata.GetDistinctLiveDatabaseServer();
server.EnableAsync();

//Drop any STAGING tables that already exist
foreach (var table in job.RegularTablesToLoad)
{
string stagingDbName = table.GetDatabaseRuntimeName(LoadStage.AdjustStaging, namer);
string stagingTableName = table.GetRuntimeName(LoadStage.AdjustStaging, namer);

var stagingDb = server.ExpectDatabase(stagingDbName);
var stagingTable = stagingDb.ExpectTable(stagingTableName);

if (stagingDb.Exists())
{
if (stagingTable.Exists())
{
job.OnNotify(this,new NotifyEventArgs(ProgressEventType.Information,$"Dropping existing STAGING table remnant {stagingTable.GetFullyQualifiedName()}"));
stagingTable.Drop();
}
}
}

//Now create STAGING tables (empty)
var cloner = new DatabaseCloner(configuration);
job.CreateTablesInStage(cloner, LoadBubble.Staging);


using (DbConnection con = server.GetConnection())
{
con.Open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using FAnsi.Implementations.MySql;
using FAnsi.Implementations.Oracle;
using FAnsi.Implementations.PostgreSql;
using NLog;
using RabbitMQ.Client.Exceptions;


Expand Down Expand Up @@ -93,11 +94,8 @@ public override void Stop(string reason)
{

}


var asLookup = _swapper as TableLookupSwapper;
if (asLookup != null)
Logger.Debug("TableLookupSwapper: TotalSwapCount={0} TotalCachedSwapCount={1}", asLookup.TotalSwapCount, asLookup.TotalCachedSwapCount);
_swapper?.LogProgress(Logger, LogLevel.Info);

base.Stop(reason);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Data.Common;
using System.Data.SqlClient;
using System.Text;
using Smi.Common;
using TypeGuesser;

namespace Microservices.IdentifierMapper.Execution.Swappers
Expand All @@ -16,7 +17,7 @@ namespace Microservices.IdentifierMapper.Execution.Swappers
/// Connects to a (possibly empty) database containing values to swap identifiers with. If no valid replacement found for a value,
/// we create a new <see cref="Guid"/>, insert it into the database, and return it as the swapped value. Keeps a cache of swap values
/// </summary>
public class ForGuidIdentifierSwapper : ISwapIdentifiers
public class ForGuidIdentifierSwapper : SwapIdentifiers
{
private readonly ILogger _logger;

Expand All @@ -27,6 +28,7 @@ public class ForGuidIdentifierSwapper : ISwapIdentifiers
private readonly Dictionary<string, string> _cachedAnswers = new Dictionary<string, string>();
private readonly object _oCacheLock = new object();

private int _swapColumnLength;

public ForGuidIdentifierSwapper()
{
Expand All @@ -37,29 +39,36 @@ public ForGuidIdentifierSwapper()
/// Connects to the specified swapping table if it exists, or creates it
/// </summary>
/// <param name="mappingTableOptions"></param>
public void Setup(IMappingTableOptions mappingTableOptions)
public override void Setup(IMappingTableOptions mappingTableOptions)
{
_options = mappingTableOptions;
_table = _options.Discover();

CreateTableIfNotExists();
using(new TimeTracker(DatabaseStopwatch))
CreateTableIfNotExists();
}

public string GetSubstitutionFor(string toSwap, out string reason)
public override string GetSubstitutionFor(string toSwap, out string reason)
{
reason = null;

if (toSwap.Length > 10)
if (_swapColumnLength >0 && toSwap.Length > _swapColumnLength)
{
reason = "Supplied value was too long (" + toSwap.Length + ")";
reason = $"Supplied value was too long ({toSwap.Length}) - max allowed is ({_swapColumnLength})";
Invalid++;
return null;
}

string insertSql;
lock (_oCacheLock)
{
if (_cachedAnswers.ContainsKey(toSwap))
{
CacheHit++;
Success++;
return _cachedAnswers[toSwap];
}


var guid = Guid.NewGuid().ToString();

Expand Down Expand Up @@ -100,37 +109,41 @@ where not exists(select *

}

using (var con = _table.Database.Server.BeginNewTransactedConnection())
{
DbCommand cmd = _table.Database.Server.GetCommand(insertSql, con);

try
{
cmd.ExecuteNonQuery();
}
catch (Exception e)
using(new TimeTracker(DatabaseStopwatch))
using (var con = _table.Database.Server.BeginNewTransactedConnection())
{
throw new Exception("Failed to perform lookup of toSwap with SQL:" + insertSql, e);
}
DbCommand cmd = _table.Database.Server.GetCommand(insertSql, con);

//guid may not have been inserted. Just because we don't have it in our cache doesn't mean that other poeple might
//not have allocated that one at the same time.
try
{
cmd.ExecuteNonQuery();
}
catch (Exception e)
{
Invalid++;
throw new Exception("Failed to perform lookup of toSwap with SQL:" + insertSql, e);
}

DbCommand cmd2 = _table.Database.Server.GetCommand($"SELECT {_options.ReplacementColumnName} FROM {_table.GetFullyQualifiedName()} WHERE {_options.SwapColumnName} = '{toSwap}' ",con);
var syncAnswer = (string)cmd2.ExecuteScalar();
//guid may not have been inserted. Just because we don't have it in our cache doesn't mean that other poeple might
//not have allocated that one at the same time.

_cachedAnswers.Add(toSwap, syncAnswer);
DbCommand cmd2 = _table.Database.Server.GetCommand($"SELECT {_options.ReplacementColumnName} FROM {_table.GetFullyQualifiedName()} WHERE {_options.SwapColumnName} = '{toSwap}' ",con);
var syncAnswer = (string)cmd2.ExecuteScalar();

con.ManagedTransaction.CommitAndCloseConnection();
return syncAnswer;
}
_cachedAnswers.Add(toSwap, syncAnswer);

con.ManagedTransaction.CommitAndCloseConnection();
Success++;
CacheMiss++;
return syncAnswer;
}
}
}

/// <summary>
/// Clears the in-memory cache of swap pairs
/// </summary>
public void ClearCache()
public override void ClearCache()
{
lock (_oCacheLock)
{
Expand All @@ -139,6 +152,7 @@ public void ClearCache()
}
}


private void CreateTableIfNotExists()
{
try
Expand All @@ -158,18 +172,18 @@ private void CreateTableIfNotExists()
new DatabaseColumnRequest(_options.SwapColumnName, new DatabaseTypeRequest(typeof(string), 10), false){ IsPrimaryKey = true },
new DatabaseColumnRequest(_options.ReplacementColumnName,new DatabaseTypeRequest(typeof(string), 255), false)}
);
}

if (_table.Exists())
_logger.Info("Guid mapping table exist (" + _table + ")");
else
throw new Exception("Table creation did not result in table existing!");
if (_table.Exists())
_logger.Info("Guid mapping table exist (" + _table + ")");
else
throw new Exception("Table creation did not result in table existing!");

_logger.Info("Checking for column " + _options.SwapColumnName);
_table.DiscoverColumn(_options.SwapColumnName);
_logger.Info("Checking for column " + _options.SwapColumnName);
_swapColumnLength = _table.DiscoverColumn(_options.SwapColumnName).DataType.GetLengthIfString();

_logger.Info("Checking for column " + _options.ReplacementColumnName);
_table.DiscoverColumn(_options.ReplacementColumnName);
}
_logger.Info("Checking for column " + _options.ReplacementColumnName);
_table.DiscoverColumn(_options.ReplacementColumnName);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

using NLog;
using Smi.Common.Options;

namespace Microservices.IdentifierMapper.Execution.Swappers
Expand All @@ -23,5 +24,12 @@ public interface ISwapIdentifiers
/// Clear the mapping cache (if exists) and reload
/// </summary>
void ClearCache();

/// <summary>
/// Report on the current number of swapped identifiers
/// </summary>
/// <param name="logger"></param>
/// <param name="level"></param>
void LogProgress(ILogger logger, LogLevel level);
}
}
Loading

0 comments on commit 029c1f8

Please sign in to comment.