From 9711d6d5e7e65ebd16c553a4a3c29901fd07e143 Mon Sep 17 00:00:00 2001 From: Chris Canal Date: Thu, 17 Oct 2024 14:40:34 -0600 Subject: [PATCH] Adds support for tenantId to MartenOps --- ...marten_operations_with_tenant_switching.cs | 150 ++++++++++++++++++ ...ndler_actions_with_returned_StartStream.cs | 114 ++++++++++++- src/Persistence/Wolverine.Marten/IMartenOp.cs | 105 ++++++++---- 3 files changed, 340 insertions(+), 29 deletions(-) create mode 100644 src/Persistence/MartenTests/handler_actions_with_implied_marten_operations_with_tenant_switching.cs diff --git a/src/Persistence/MartenTests/handler_actions_with_implied_marten_operations_with_tenant_switching.cs b/src/Persistence/MartenTests/handler_actions_with_implied_marten_operations_with_tenant_switching.cs new file mode 100644 index 000000000..d5f076b74 --- /dev/null +++ b/src/Persistence/MartenTests/handler_actions_with_implied_marten_operations_with_tenant_switching.cs @@ -0,0 +1,150 @@ +using IntegrationTests; +using Marten; +using Marten.Events; +using Marten.Exceptions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests; + +public class handler_actions_with_implied_marten_operations_with_tenant_switching : PostgresqlContext, IAsyncLifetime +{ + private IHost _host; + private IDocumentStore _store; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services + .AddMarten(o => + { + o.Connection(Servers.PostgresConnectionString); + o.Policies.AllDocumentsAreMultiTenanted(); + o.Events.StreamIdentity = StreamIdentity.AsString; + }) + .IntegrateWithWolverine(); + + + opts.Policies.AutoApplyTransactions(); + }).StartAsync(); + + _store = _host.Services.GetRequiredService(); + + await _store.Advanced.Clean.DeleteDocumentsByTypeAsync(typeof(TenantNamedDocument)); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task storing_document() + { + var tracked = await _host.InvokeMessageAndWaitAsync(new TenantCreateMartenDocument("Aubrey", "green")); + + tracked.Sent.MessagesOf>().ShouldHaveNoMessages(); + tracked.Sent.SingleMessage().Name.ShouldBe("Aubrey"); + + using var session = _store.LightweightSession("green"); + var doc = await session.LoadAsync("Aubrey"); + doc.ShouldNotBeNull(); + } + + [Fact] + public async Task insert_document() + { + await _host.InvokeMessageAndWaitAsync(new TenantInsertMartenDocument("Declan", "green")); + + using var session = _store.LightweightSession("green"); + var doc = await session.LoadAsync("Declan"); + doc.ShouldNotBeNull(); + + await Should.ThrowAsync(() => + _host.InvokeMessageAndWaitAsync(new TenantInsertMartenDocument("Declan", "green"))); + } + + [Fact] + public async Task update_document_happy_path() + { + await _host.InvokeMessageAndWaitAsync(new TenantInsertMartenDocument("Max", "green")); + await _host.InvokeMessageAndWaitAsync(new TenantUpdateMartenDocument("Max", 10, "green")); + + + using var session = _store.LightweightSession("green"); + var doc = await session.LoadAsync("Max"); + doc.Number.ShouldBe(10); + + + } + + [Fact] + public async Task update_document_sad_path() + { + await Should.ThrowAsync(() => + _host.InvokeMessageAndWaitAsync(new TenantUpdateMartenDocument("Max", 10, "green"))); + } + + [Fact] + public async Task delete_document() + { + await _host.InvokeMessageAndWaitAsync(new TenantInsertMartenDocument("Max", "green")); + await _host.InvokeMessageAndWaitAsync(new TenantDeleteMartenDocument("Max", "green")); + + using var session = _store.LightweightSession("green"); + var doc = await session.LoadAsync("Max"); + doc.ShouldBeNull(); + } + +} + + +public record TenantCreateMartenDocument(string Name, string TenantId); +public record TenantInsertMartenDocument(string Name, string TenantId); +public record TenantUpdateMartenDocument(string Name, int Number, string TenantId); +public record TenantDeleteMartenDocument(string Name, string TenantId); + +public record TenantMartenMessage2(string Name, string TenantId); + +public static class TenantMartenCommandHandler +{ + public static (TenantMartenMessage2, DocumentOp) Handle(TenantCreateMartenDocument command) + { + return (new TenantMartenMessage2(command.Name, command.TenantId), MartenOps.Store(new TenantNamedDocument { Id = command.Name }, command.TenantId)); + } + + public static DocumentOp Handle(TenantInsertMartenDocument command) + { + return MartenOps.Insert(new TenantNamedDocument { Id = command.Name }, command.TenantId); + } + + public static async Task Handle(TenantUpdateMartenDocument command, IDocumentSession session) + { + return MartenOps.Update(new TenantNamedDocument{Id = command.Name, Number = command.Number}, command.TenantId); + } + + public static async Task Handle(TenantDeleteMartenDocument command, IDocumentSession session) + { + var doc = await session.ForTenant(command.TenantId).LoadAsync(command.Name); + + return MartenOps.Delete(doc, command.TenantId); + } + + public static void Handle(TenantMartenMessage2 message) + { + // Nothing yet + } +} + +public class TenantNamedDocument +{ + public string Id { get; set; } + public int Number { get; set; } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/handler_actions_with_returned_StartStream.cs b/src/Persistence/MartenTests/handler_actions_with_returned_StartStream.cs index 8a8fb0e15..4c2d1f528 100644 --- a/src/Persistence/MartenTests/handler_actions_with_returned_StartStream.cs +++ b/src/Persistence/MartenTests/handler_actions_with_returned_StartStream.cs @@ -1,6 +1,7 @@ using IntegrationTests; using Marten; using Marten.Events; +using Marten.Storage; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Oakton.Resources; @@ -55,6 +56,57 @@ public async Task start_stream_by_guid1() events[1].Data.ShouldBeOfType(); } } +public class handler_actions_with_returned_StartStream_with_tenant_switching : PostgresqlContext, IAsyncLifetime +{ + private IHost _host; + private IDocumentStore _store; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services + .AddMarten(o => + { + o.Connection(Servers.PostgresConnectionString); + o.Policies.AllDocumentsAreMultiTenanted(); + o.Events.TenancyStyle = TenancyStyle.Conjoined; + o.DatabaseSchemaName = "martenops_events_guid"; + }) + .IntegrateWithWolverine(); + + opts.Policies.AutoApplyTransactions(); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + _store = _host.Services.GetRequiredService(); + + await _store.Advanced.Clean.DeleteDocumentsByTypeAsync(typeof(NamedDocument)); + await _store.Advanced.Clean.DeleteAllEventDataAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task start_stream_by_guid1() + { + var id = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartStreamMessage(id, "green")); + + using var session = _store.LightweightSession("green"); + var events = await session.Events.FetchStreamAsync(id); + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + } +} public class start_stream_by_string_from_return_value : PostgresqlContext, IAsyncLifetime { @@ -106,18 +158,76 @@ public async Task start_stream_by_string() } } -public record StartStreamMessage(Guid Id); -public record StartStreamMessage2(string Id); +public class start_stream_by_string_from_return_value_with_tenant_switching : PostgresqlContext, IAsyncLifetime +{ + private IHost _host; + private IDocumentStore _store; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services + .AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.Policies.AllDocumentsAreMultiTenanted(); + m.Events.StreamIdentity = StreamIdentity.AsString; + m.DatabaseSchemaName = "martenops_string_identity"; + m.Events.TenancyStyle = TenancyStyle.Conjoined; + }) + .IntegrateWithWolverine(); + + opts.Policies.AutoApplyTransactions(); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + _store = _host.Services.GetRequiredService(); + + await _store.Advanced.Clean.DeleteDocumentsByTypeAsync(typeof(NamedDocument)); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task start_stream_by_string() + { + var id = Guid.NewGuid().ToString(); + + await _host.InvokeMessageAndWaitAsync(new StartStreamMessage2(id, "green")); + + using var session = _store.LightweightSession("green"); + var events = await session.Events.FetchStreamAsync(id); + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + } +} + +public record StartStreamMessage(Guid Id, string? TenantId = null); +public record StartStreamMessage2(string Id, string? TenantId = null); public static class StartStreamMessageHandler { public static IStartStream Handle(StartStreamMessage message) { + if (message.TenantId is not null) + return MartenOps.StartStream(message.Id, message.TenantId, new AEvent(), new BEvent()); + return MartenOps.StartStream(message.Id, new AEvent(), new BEvent()); } public static IStartStream Handle(StartStreamMessage2 message) { + if (message.TenantId is not null) + return MartenOps.StartStream(message.Id, message.TenantId, new CEvent(), new BEvent()); + return MartenOps.StartStream(message.Id, new CEvent(), new BEvent()); } } \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/IMartenOp.cs b/src/Persistence/Wolverine.Marten/IMartenOp.cs index f010cb3e6..e7d86a630 100644 --- a/src/Persistence/Wolverine.Marten/IMartenOp.cs +++ b/src/Persistence/Wolverine.Marten/IMartenOp.cs @@ -14,6 +14,7 @@ public interface IMartenOp : ISideEffect void Execute(IDocumentSession session); } + #endregion /// @@ -25,68 +26,72 @@ public static class MartenOps /// Return a side effect of storing the specified document in Marten /// /// + /// /// /// /// - public static StoreDoc Store(T document) where T : notnull + public static StoreDoc Store(T document, string? tenantId = null) where T : notnull { if (document == null) { throw new ArgumentNullException(nameof(document)); } - return new StoreDoc(document); + return new StoreDoc(document, tenantId); } /// /// Return a side effect of inserting the specified document in Marten /// /// + /// /// /// /// - public static InsertDoc Insert(T document) where T : notnull + public static InsertDoc Insert(T document, string? tenantId = null) where T : notnull { if (document == null) { throw new ArgumentNullException(nameof(document)); } - return new InsertDoc(document); + return new InsertDoc(document, tenantId); } /// /// Return a side effect of updating the specified document in Marten /// /// + /// /// /// /// - public static UpdateDoc Update(T document) where T : notnull + public static UpdateDoc Update(T document, string? tenantId = null) where T : notnull { if (document == null) { throw new ArgumentNullException(nameof(document)); } - return new UpdateDoc(document); + return new UpdateDoc(document, tenantId); } /// /// Return a side effect of deleting the specified document in Marten /// /// + /// /// /// /// - public static DeleteDoc Delete(T document) where T : notnull + public static DeleteDoc Delete(T document, string? tenantId = null) where T : notnull { if (document == null) { throw new ArgumentNullException(nameof(document)); } - return new DeleteDoc(document); + return new DeleteDoc(document, tenantId); } /// @@ -98,7 +103,20 @@ public static DeleteDoc Delete(T document) where T : notnull /// public static StartStream StartStream(Guid streamId, params object[] events) where T : class { - return new StartStream(streamId, events); + return new StartStream(streamId, null, events); + } + + /// + /// Return a side effect of starting a new event stream in Marten + /// + /// + /// + /// + /// + /// + public static StartStream StartStream(Guid streamId, string? tenantId, params object[] events) where T : class + { + return new StartStream(streamId, tenantId, events); } /// @@ -112,7 +130,7 @@ public static StartStream StartStream(Guid streamId, params object[] event public static IStartStream StartStream(params object[] events) where T : class { var streamId = CombGuidIdGeneration.NewGuid(); - return new StartStream(streamId, events); + return new StartStream(streamId, null, events); } /// @@ -124,7 +142,20 @@ public static IStartStream StartStream(params object[] events) where T : clas /// public static IStartStream StartStream(string streamKey, params object[] events) where T : class { - return new StartStream(streamKey, events); + return new StartStream(streamKey, null, events); + } + + /// + /// Return a side effect of starting a new event stream in Marten + /// + /// + /// + /// + /// + /// + public static IStartStream StartStream(string streamKey, string? tenantId, params object[] events) where T : class + { + return new StartStream(streamKey, tenantId, events); } /// @@ -154,34 +185,40 @@ public interface IStartStream : IMartenOp Type AggregateType { get; } IReadOnlyList Events { get; } + string? TenantId { get; } } public class StartStream : IStartStream where T : class { public string StreamKey { get; } = string.Empty; public Guid StreamId { get; } + public string? TenantId { get; } - public StartStream(Guid streamId, params object[] events) + public StartStream(Guid streamId, string? tenantId, params object[] events) { StreamId = streamId; + TenantId = tenantId; Events.AddRange(events); } - public StartStream(string streamKey, params object[] events) + public StartStream(string streamKey, string? tenantId, params object[] events) { StreamKey = streamKey; + TenantId = tenantId; Events.AddRange(events); } - public StartStream(Guid streamId, IList events) + public StartStream(Guid streamId, string? tenantId, IList events) { StreamId = streamId; + TenantId = tenantId; Events.AddRange(events); } - public StartStream(string streamKey, IList events) + public StartStream(string streamKey, string? tenantId, IList events) { StreamKey = streamKey; + TenantId = tenantId; Events.AddRange(events); } @@ -206,16 +243,16 @@ public void Execute(IDocumentSession session) { if (StreamKey.IsNotEmpty()) { - session.Events.StartStream(StreamKey, Events.ToArray()); + session.ForTenant(TenantId ?? session.TenantId).Events.StartStream(StreamKey, Events.ToArray()); } else { - session.Events.StartStream(Events.ToArray()); + session.ForTenant(TenantId ?? session.TenantId).Events.StartStream(Events.ToArray()); } } else { - session.Events.StartStream(StreamId, Events.ToArray()); + session.ForTenant(TenantId ?? session.TenantId).Events.StartStream(StreamId, Events.ToArray()); } } @@ -228,14 +265,17 @@ public class StoreDoc : DocumentOp where T : notnull { private readonly T _document; - public StoreDoc(T document) : base(document) + public StoreDoc(T document, string? tenantId = null) : base(document, tenantId) { _document = document; } public override void Execute(IDocumentSession session) { - session.Store(_document); + if (TenantId is not null) + session.ForTenant(TenantId).Store(_document); + else + session.Store(_document); } } @@ -243,14 +283,17 @@ public class InsertDoc : DocumentOp where T : notnull { private readonly T _document; - public InsertDoc(T document) : base(document) + public InsertDoc(T document, string? tenantId = null) : base(document, tenantId) { _document = document; } public override void Execute(IDocumentSession session) { - session.Insert(_document); + if (TenantId is not null) + session.ForTenant(TenantId).Insert(_document); + else + session.Insert(_document); } } @@ -258,14 +301,17 @@ public class UpdateDoc : DocumentOp where T : notnull { private readonly T _document; - public UpdateDoc(T document) : base(document) + public UpdateDoc(T document, string? tenantId = null) : base(document, tenantId) { _document = document; } public override void Execute(IDocumentSession session) { - session.Update(_document); + if (TenantId is not null) + session.ForTenant(TenantId).Update(_document); + else + session.Update(_document); } } @@ -273,24 +319,29 @@ public class DeleteDoc : DocumentOp where T : notnull { private readonly T _document; - public DeleteDoc(T document) : base(document) + public DeleteDoc(T document, string? tenantId = null) : base(document, tenantId) { _document = document; } public override void Execute(IDocumentSession session) { - session.Delete(_document); + if (TenantId is not null) + session.ForTenant(TenantId).Delete(_document); + else + session.Delete(_document); } } public abstract class DocumentOp : IMartenOp { public object Document { get; } + public string? TenantId { get; } - protected DocumentOp(object document) + protected DocumentOp(object document, string? tenantId = null) { Document = document; + TenantId = tenantId; } public abstract void Execute(IDocumentSession session);