Skip to content

Commit

Permalink
Add possibility for callee instance to return an observable
Browse files Browse the repository at this point in the history
  • Loading branch information
Johan 't Hart committed Mar 3, 2021
1 parent fe62d3e commit 5bf516f
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,29 @@ public async Task ProgressiveCallsCallerProgress()
Assert.That(callback.Task.Result, Is.EqualTo(10));
}

[Test]
public async Task ProgressiveCallsCallerProgressObservable()
{
WampPlayground playground = new WampPlayground();

CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
IWampChannel calleeChannel = dualChannel.CalleeChannel;
IWampChannel callerChannel = dualChannel.CallerChannel;

await calleeChannel.RealmProxy.Services.RegisterCallee(new LongOpObsService());

MyCallback callback = new MyCallback();

callerChannel.RealmProxy.RpcCatalog.Invoke
(callback,
new CallOptions() { ReceiveProgress = true },
"com.myapp.longop",
new object[] { 10, false });

Assert.That(callback.Task.Result, Is.EqualTo(-1));
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), callback.ProgressiveResults);
}

[Test]
public async Task ProgressiveCallsCalleeProxyProgress()
{
Expand Down Expand Up @@ -206,7 +229,7 @@ public class MyCallback : IWampRawRpcOperationClientCallback

public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details)
{
throw new NotImplementedException();
mTask.SetResult(-1); // -1 indicates no final return value
}

public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details, TMessage[] arguments)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void CallResult(IWampRawRpcOperationRouterCallback caller, YieldOption
{
caller.Result(ObjectFormatter, options, arguments, argumentKeywords);
}
else if (!this.HasResult)
else if (!this.HasResult || arguments == null)
{
caller.Result(ObjectFormatter, options);
}
Expand All @@ -93,7 +93,7 @@ protected IEnumerable<object> UnpackParameters<TMessage>(IWampFormatter<TMessage
{
ArgumentUnpacker unpacker = new ArgumentUnpacker(Parameters);

IEnumerable<object> result =
IEnumerable<object> result =
unpacker.UnpackParameters(formatter, arguments, argumentsKeywords);

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ protected IWampRpcOperation CreateRpcMethod(Func<object> instanceProvider, ICall
string procedureUri =
interceptor.GetProcedureUri(method);

if (!typeof (Task).IsAssignableFrom(method.ReturnType))
if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(IObservable<>))
{
return CreateProgressiveObservableOperation(instanceProvider, method, procedureUri);
}
else if (!typeof (Task).IsAssignableFrom(method.ReturnType))
{
MethodInfoValidation.ValidateSyncMethod(method);
return new SyncMethodInfoRpcOperation(instanceProvider, method, procedureUri);
Expand Down Expand Up @@ -97,5 +101,25 @@ private static IWampRpcOperation CreateProgressiveOperation(Func<object> instanc

return operation;
}

private static IWampRpcOperation CreateProgressiveObservableOperation(Func<object> instanceProvider, MethodInfo method, string procedureUri)
{
//return new ProgressiveObservableMethodInfoRpcOperation<returnType>
// (instance, method, procedureUri);

Type returnType = method.ReturnType.GetGenericArguments()[0];

Type operationType =
typeof(ProgressiveObservableMethodInfoRpcOperation<>)
.MakeGenericType(returnType);

IWampRpcOperation operation =
(IWampRpcOperation)Activator.CreateInstance(operationType,
instanceProvider,
method,
procedureUri);

return operation;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using WampSharp.Core.Utilities;
using WampSharp.Core.Serialization;
using WampSharp.V2.Core.Contracts;

namespace WampSharp.V2.Rpc
{
public class ProgressiveObservableMethodInfoRpcOperation<T> : SyncMethodInfoRpcOperation
{
public ProgressiveObservableMethodInfoRpcOperation(Func<object> instanceProvider, MethodInfo method, string procedureName) :
base(instanceProvider, method, procedureName)
{
}

protected override void OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs)
{
((IObservable<T>)result).Subscribe(
it => CallResult(caller, it, outputs, new YieldOptions { Progress = true }),
ex =>
{
if (ex is WampException wampex)
HandleException(caller, wampex);
else
HandleException(caller, ex);
},
// An observable does not emit any value when completing, so result without arguments
() => caller.Result(ObjectFormatter, new YieldOptions())
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,43 @@ protected override IWampCancellableInvocation InnerInvoke<TMessage>(IWampRawRpcO
argumentsKeywords,
out IDictionary<string, object> outputs);

CallResult(caller, result, outputs);
OnResult(caller, result, outputs);
}
catch (WampException ex)
{
mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure);
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
callback.Error(ex);
HandleException(caller, ex);
}
catch (Exception ex)
{
WampException wampException = ConvertExceptionToRuntimeException(ex);
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
callback.Error(wampException);
HandleException(caller, ex);
}

return null;
}

protected void HandleException(IWampRawRpcOperationRouterCallback caller, WampException ex)
{
mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure);
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
callback.Error(ex);
}

protected void HandleException(IWampRawRpcOperationRouterCallback caller, Exception ex)
{
WampException wampException = ConvertExceptionToRuntimeException(ex);
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
callback.Error(wampException);
}

protected virtual void OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs) =>
CallResult(caller, result, outputs);

protected void CallResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs, YieldOptions yieldOptions = null)
{
yieldOptions = yieldOptions ?? new YieldOptions();
object[] resultArguments = GetResultArguments(result);

IDictionary<string, object> argumentKeywords =
IDictionary<string, object> argumentKeywords =
GetResultArgumentKeywords(result, outputs);

CallResult(caller,
Expand All @@ -69,4 +82,4 @@ protected virtual IDictionary<string, object> GetResultArgumentKeywords(object r
protected abstract object InvokeSync<TMessage>
(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter, InvocationDetails details, TMessage[] arguments, IDictionary<string, TMessage> argumentsKeywords, out IDictionary<string, object> outputs);
}
}
}

0 comments on commit 5bf516f

Please sign in to comment.