From 5bf516f0f2897d1b1d9eb4d5af9067367e8404cd Mon Sep 17 00:00:00 2001 From: Johan 't Hart Date: Wed, 3 Mar 2021 11:26:51 +0100 Subject: [PATCH] Add possibility for callee instance to return an observable #238 --- .../Integration/RpcProgressTests.cs | 25 ++++++++++++- .../WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs | 4 +-- .../Callee/Reflection/OperationExtractor.cs | 28 +++++++++++++-- ...ressiveObservableMethodInfoRpcOperation.cs | 35 +++++++++++++++++++ .../V2/Rpc/Callee/SyncLocalRpcOperation.cs | 31 +++++++++++----- 5 files changed, 109 insertions(+), 14 deletions(-) create mode 100644 src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/ProgressiveObservableMethodInfoRpcOperation.cs diff --git a/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs b/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs index fa207074d..65d9e7959 100644 --- a/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs +++ b/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs @@ -41,6 +41,29 @@ public async Task ProgressiveCallsCallerProgress() Assert.That(callback.Task.Result, Is.EqualTo(10)); } + [Test] + public async Task ProgressiveCallsCallerProgressObservable() + { + WampPlayground playground = new WampPlayground(); + + CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel(); + IWampChannel calleeChannel = dualChannel.CalleeChannel; + IWampChannel callerChannel = dualChannel.CallerChannel; + + await calleeChannel.RealmProxy.Services.RegisterCallee(new LongOpObsService()); + + MyCallback callback = new MyCallback(); + + callerChannel.RealmProxy.RpcCatalog.Invoke + (callback, + new CallOptions() { ReceiveProgress = true }, + "com.myapp.longop", + new object[] { 10, false }); + + Assert.That(callback.Task.Result, Is.EqualTo(-1)); + CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), callback.ProgressiveResults); + } + [Test] public async Task ProgressiveCallsCalleeProxyProgress() { @@ -206,7 +229,7 @@ public class MyCallback : IWampRawRpcOperationClientCallback public void Result(IWampFormatter formatter, ResultDetails details) { - throw new NotImplementedException(); + mTask.SetResult(-1); // -1 indicates no final return value } public void Result(IWampFormatter formatter, ResultDetails details, TMessage[] arguments) diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs index ab55aabb4..4e939b122 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs @@ -77,7 +77,7 @@ protected void CallResult(IWampRawRpcOperationRouterCallback caller, YieldOption { caller.Result(ObjectFormatter, options, arguments, argumentKeywords); } - else if (!this.HasResult) + else if (!this.HasResult || arguments == null) { caller.Result(ObjectFormatter, options); } @@ -93,7 +93,7 @@ protected IEnumerable UnpackParameters(IWampFormatter result = + IEnumerable result = unpacker.UnpackParameters(formatter, arguments, argumentsKeywords); return result; diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs index a6cf41815..7713f2af6 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs @@ -57,7 +57,11 @@ protected IWampRpcOperation CreateRpcMethod(Func instanceProvider, ICall string procedureUri = interceptor.GetProcedureUri(method); - if (!typeof (Task).IsAssignableFrom(method.ReturnType)) + if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(IObservable<>)) + { + return CreateProgressiveObservableOperation(instanceProvider, method, procedureUri); + } + else if (!typeof (Task).IsAssignableFrom(method.ReturnType)) { MethodInfoValidation.ValidateSyncMethod(method); return new SyncMethodInfoRpcOperation(instanceProvider, method, procedureUri); @@ -97,5 +101,25 @@ private static IWampRpcOperation CreateProgressiveOperation(Func instanc return operation; } + + private static IWampRpcOperation CreateProgressiveObservableOperation(Func instanceProvider, MethodInfo method, string procedureUri) + { + //return new ProgressiveObservableMethodInfoRpcOperation + // (instance, method, procedureUri); + + Type returnType = method.ReturnType.GetGenericArguments()[0]; + + Type operationType = + typeof(ProgressiveObservableMethodInfoRpcOperation<>) + .MakeGenericType(returnType); + + IWampRpcOperation operation = + (IWampRpcOperation)Activator.CreateInstance(operationType, + instanceProvider, + method, + procedureUri); + + return operation; + } } -} \ No newline at end of file +} diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/ProgressiveObservableMethodInfoRpcOperation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/ProgressiveObservableMethodInfoRpcOperation.cs new file mode 100644 index 000000000..566d2b5a2 --- /dev/null +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/ProgressiveObservableMethodInfoRpcOperation.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Threading; +using WampSharp.Core.Utilities; +using WampSharp.Core.Serialization; +using WampSharp.V2.Core.Contracts; + +namespace WampSharp.V2.Rpc +{ + public class ProgressiveObservableMethodInfoRpcOperation : SyncMethodInfoRpcOperation + { + public ProgressiveObservableMethodInfoRpcOperation(Func instanceProvider, MethodInfo method, string procedureName) : + base(instanceProvider, method, procedureName) + { + } + + protected override void OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary outputs) + { + ((IObservable)result).Subscribe( + it => CallResult(caller, it, outputs, new YieldOptions { Progress = true }), + ex => + { + if (ex is WampException wampex) + HandleException(caller, wampex); + else + HandleException(caller, ex); + }, + // An observable does not emit any value when completing, so result without arguments + () => caller.Result(ObjectFormatter, new YieldOptions()) + ); + } + } +} diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs index 3cbe45717..15c8390bf 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs @@ -29,30 +29,43 @@ protected override IWampCancellableInvocation InnerInvoke(IWampRawRpcO argumentsKeywords, out IDictionary outputs); - CallResult(caller, result, outputs); + OnResult(caller, result, outputs); } catch (WampException ex) { - mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure); - IWampErrorCallback callback = new WampRpcErrorCallback(caller); - callback.Error(ex); + HandleException(caller, ex); } catch (Exception ex) { - WampException wampException = ConvertExceptionToRuntimeException(ex); - IWampErrorCallback callback = new WampRpcErrorCallback(caller); - callback.Error(wampException); + HandleException(caller, ex); } return null; } + protected void HandleException(IWampRawRpcOperationRouterCallback caller, WampException ex) + { + mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure); + IWampErrorCallback callback = new WampRpcErrorCallback(caller); + callback.Error(ex); + } + + protected void HandleException(IWampRawRpcOperationRouterCallback caller, Exception ex) + { + WampException wampException = ConvertExceptionToRuntimeException(ex); + IWampErrorCallback callback = new WampRpcErrorCallback(caller); + callback.Error(wampException); + } + + protected virtual void OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary outputs) => + CallResult(caller, result, outputs); + protected void CallResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary outputs, YieldOptions yieldOptions = null) { yieldOptions = yieldOptions ?? new YieldOptions(); object[] resultArguments = GetResultArguments(result); - IDictionary argumentKeywords = + IDictionary argumentKeywords = GetResultArgumentKeywords(result, outputs); CallResult(caller, @@ -69,4 +82,4 @@ protected virtual IDictionary GetResultArgumentKeywords(object r protected abstract object InvokeSync (IWampRawRpcOperationRouterCallback caller, IWampFormatter formatter, InvocationDetails details, TMessage[] arguments, IDictionary argumentsKeywords, out IDictionary outputs); } -} \ No newline at end of file +}