Skip to content

Commit

Permalink
DLQ service improvements for CritterWatch support. Closes GH-764. Closes
Browse files Browse the repository at this point in the history
 GH-1243. Closes GH-1244. Closes GH-1245. Closes GH-1248
  • Loading branch information
jeremydmiller committed Jan 29, 2025
1 parent d98eb66 commit d53af72
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using IntegrationTests;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.Persistence.Durability;
using Wolverine.RDBMS;
using Wolverine.SqlServer;
using Xunit.Abstractions;

namespace SqlServerTests.Persistence;

public class deadletter_admin_compliance : DeadLetterAdminCompliance
{
public deadletter_admin_compliance(ITestOutputHelper output) : base(output)
{
}

public override async Task<IHost> BuildCleanHost()
{
var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "dlq5");

// This setting changes the internal message storage identity
opts.Durability.DeadLetterQueueExpirationEnabled = true;
})
.StartAsync();

var persistence = (IMessageDatabase)host.Services.GetRequiredService<IMessageStore>();
await persistence.Admin.ClearAllAsync();

return host;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ public Task<IReadOnlyList<DeadLetterQueueCount>> SummarizeByDatabaseAsync(string
{
return SummarizeAllAsync(serviceName, range, token);
}

protected virtual string toTopClause(DeadLetterEnvelopeQuery query)
{
return "";
}

public async Task<DeadLetterEnvelopeResults> QueryAsync(DeadLetterEnvelopeQuery query, CancellationToken token)
{
var builder = ToCommandBuilder();

var topSelect = toTopClause(query);

// TODO -- not sure the total works for both databases
builder.Append($"select {DatabaseConstants.DeadLetterFields}, count(*) OVER() as total_rows from {SchemaName}.{DatabaseConstants.DeadLetterTable} where 1 = 1");
builder.Append($"select{topSelect} {DatabaseConstants.DeadLetterFields}, count(*) OVER() as total_rows from {SchemaName}.{DatabaseConstants.DeadLetterTable} where 1 = 1");

writeDeadLetterWhereClause(query, builder);

Expand Down Expand Up @@ -163,11 +169,18 @@ public Task DiscardAsync(DeadLetterEnvelopeQuery query, CancellationToken token)
public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token)
{
var builder = ToCommandBuilder();

builder.Append($"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = true where 1 = 1");

builder.Append(
$"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = ");
builder.AppendParameter(true);
builder.Append(" where 1 = 1");
writeDeadLetterWhereClause(query, builder);
builder.Append(';');

builder.Append(
$"delete from {SchemaName}.{DatabaseConstants.DeadLetterTable} where {DatabaseConstants.Replayable} = ");
builder.AppendParameter(true);
builder.Append(';');

return executeCommandBatch(builder, token);
}

Expand All @@ -193,9 +206,16 @@ public Task ReplayAsync(MessageBatchRequest request, CancellationToken token)

foreach (var id in request.Ids)
{
builder.Append($"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = true where {DatabaseConstants.Id} = ");
builder.Append(
$"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = ");
builder.AppendParameter(true);
builder.Append($" where {DatabaseConstants.Id} = ");
builder.AppendParameter(id);
builder.Append(';');
builder.Append(
$"delete from {SchemaName}.{DatabaseConstants.DeadLetterTable} where {DatabaseConstants.Replayable} = ");
builder.AppendParameter(true);
builder.Append(';');
}

new MoveReplayableErrorMessagesToIncomingOperation(this).ConfigureCommand(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Weasel.SqlServer;
using Weasel.SqlServer.Tables;
using Wolverine.Logging;
using Wolverine.Persistence.Durability;
using Wolverine.RDBMS;
using Wolverine.RDBMS.Sagas;
using Wolverine.RDBMS.Transport;
Expand Down Expand Up @@ -64,6 +65,8 @@ protected override bool isExceptionFromDuplicateEnvelope(Exception ex)

protected override void writePagingAfter(DbCommandBuilder builder, int offset, int limit)
{
if (offset == 0) return;

if (offset > 0)
{
builder.Append(" OFFSET ");
Expand All @@ -73,12 +76,22 @@ protected override void writePagingAfter(DbCommandBuilder builder, int offset, i

if (limit > 0)
{
builder.Append("FETCH NEXT ");
builder.Append(" FETCH NEXT ");
builder.AppendParameter(limit);
builder.Append(" ROWS ONLY");
}
}

protected override string toTopClause(DeadLetterEnvelopeQuery query)
{
if (query.PageSize > 0 && query.PageNumber <= 1)
{
return $" top {query.PageSize}";
}

return string.Empty;
}

public override async Task<PersistedCounts> FetchCountsAsync()
{
var counts = new PersistedCounts();
Expand Down
Loading

0 comments on commit d53af72

Please sign in to comment.