Skip to content

Commit ec32f88

Browse files
committed
Fixes for RPC shadow services (see below)
- RPC return value now properly passed on to remote callback - Shadow services are now being used if the message bus is offline, remote communication is disabled or no remote endpoint for the service exists - No longer attempt to contact remote endpoint if message was deferred or was already delivered to a shadow service - No longer swallow exceptions thrown by shadow service implementations - Test coverage
1 parent 277120d commit ec32f88

File tree

14 files changed

+376
-168
lines changed

14 files changed

+376
-168
lines changed

errai-bus/src/main/java/org/jboss/errai/bus/client/api/base/MessageBuilder.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818

1919
import org.jboss.errai.bus.client.api.BusErrorCallback;
2020
import org.jboss.errai.bus.client.api.HasEncoded;
21-
import org.jboss.errai.bus.client.api.messaging.Message;
21+
import org.jboss.errai.bus.client.api.RoutingFlag;
2222
import org.jboss.errai.bus.client.api.builder.DefaultRemoteCallBuilder;
2323
import org.jboss.errai.bus.client.api.builder.MessageBuildCommand;
2424
import org.jboss.errai.bus.client.api.builder.MessageBuildSendableWithReply;
2525
import org.jboss.errai.bus.client.api.builder.MessageBuildSubject;
2626
import org.jboss.errai.bus.client.api.builder.MessageReplySendable;
27+
import org.jboss.errai.bus.client.api.messaging.Message;
2728
import org.jboss.errai.bus.client.api.messaging.MessageProvider;
28-
import org.jboss.errai.bus.client.api.RoutingFlag;
2929
import org.jboss.errai.common.client.api.RemoteCallback;
3030

3131
/**
@@ -119,7 +119,6 @@ public static MessageBuildCommand<MessageBuildSendableWithReply> createMessage(f
119119
* @return a <tt>MessageBuildSubject</tt> which essentially is a <tt>Message</tt>, but ensures that the user
120120
* constructs messages properly
121121
*/
122-
@SuppressWarnings({ "unchecked" })
123122
public static MessageBuildSubject<MessageReplySendable> createConversation(final Message message) {
124123
final Message newMessage = provider.get();
125124
newMessage.setFlag(RoutingFlag.NonGlobalRouting);
@@ -143,7 +142,6 @@ public static MessageBuildSubject<MessageReplySendable> createConversation(final
143142
* @return a <tt>MessageBuildSubject</tt> which essentially is a <tt>Message</tt>, but ensures that the user
144143
* constructs messages properly
145144
*/
146-
@SuppressWarnings({ "unchecked" })
147145
public static MessageBuildCommand<MessageReplySendable> createConversation(final Message message, final String subject) {
148146
final Message newMessage = provider.get();
149147
if (newMessage instanceof HasEncoded) {

errai-bus/src/main/java/org/jboss/errai/bus/client/api/builder/DefaultRemoteCallBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public RemoteCallResponseDef endpoint(String endPointName, Annotation[] qualifie
173173

174174
if (qualifiers != null) {
175175
final List<String> qualNames = new ArrayList<String>(qualifiers.length);
176-
for (Annotation a : qualifiers) {
176+
for (final Annotation a : qualifiers) {
177177
qualNames.add(a.annotationType().getName());
178178
}
179179

errai-bus/src/main/java/org/jboss/errai/bus/client/framework/ClientMessageBusImpl.java

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,12 @@ public void callback(final Message message) {
121121
final String errorTo = message.get(String.class, MessageParts.ErrorTo);
122122

123123
if (errorTo == null || DefaultErrorCallback.CLIENT_ERROR_SUBJECT.equals(errorTo)) {
124-
Throwable t = message.get(Throwable.class, MessageParts.Throwable);
124+
final Throwable t = message.get(Throwable.class, MessageParts.Throwable);
125125
Optional.ofNullable(GWT.getUncaughtExceptionHandler())
126126
.ifPresent(h -> h.onUncaughtException(t));
127127

128128
if (!uncaughtExceptionHandlers.isEmpty()) {
129-
for (UncaughtExceptionHandler handler : uncaughtExceptionHandlers) {
129+
for (final UncaughtExceptionHandler handler : uncaughtExceptionHandlers) {
130130
handler.onUncaughtException(t);
131131
}
132132
}
@@ -319,12 +319,12 @@ private void setBusToInitializableState() {
319319

320320
private void setupDefaultHandlers() {
321321
if (availableHandlers != null) {
322-
for (TransportHandler handler : availableHandlers.values()) {
322+
for (final TransportHandler handler : availableHandlers.values()) {
323323
handler.close();
324324
}
325325
}
326326

327-
Map<String, TransportHandler> m = new LinkedHashMap<String, TransportHandler>();
327+
final Map<String, TransportHandler> m = new LinkedHashMap<String, TransportHandler>();
328328
m.put(Capabilities.WebSockets.name(), new WebsocketHandler(ClientMessageBusImpl.this));
329329
m.put(Capabilities.SSE.name(), new SSEHandler(ClientMessageBusImpl.this));
330330
m.put(Capabilities.LongPolling.name(),
@@ -673,7 +673,7 @@ public void callback(final Message message) {
673673
try {
674674
callback.callback(message);
675675
}
676-
catch (Exception e) {
676+
catch (final Exception e) {
677677
handleCallbackError(message, e);
678678
}
679679
}
@@ -779,29 +779,26 @@ public void send(final Message message) {
779779
logger.debug("send({})", message.getParts());
780780

781781
try {
782-
boolean deferred = false;
782+
boolean delivered = false;
783783
final boolean localOnly = message.isFlagSet(RoutingFlag.DeliverLocalOnly);
784784
final String subject = message.getSubject();
785785

786786
if (message.hasPart(MessageParts.ToSubject)) {
787787
if (isRemoteCommunicationEnabled() && !localOnly) {
788788
if (getState().isShadowDeliverable() && shadowSubscriptions.containsKey(subject)) {
789789
deliverToSubscriptions(shadowSubscriptions, subject, message);
790-
deferred = true;
790+
delivered = true;
791791
}
792792
else if (getState() != BusState.CONNECTED) {
793793
logger.debug("deferred: {}", message);
794794
deferredMessages.add(message);
795-
deferred = true;
795+
delivered = true;
796+
}
797+
else if (remotes.containsKey(subject)) {
798+
logger.debug("sent to remote: {}", message);
799+
remotes.get(subject).callback(message);
800+
delivered = true;
796801
}
797-
}
798-
799-
boolean routedToRemote = false;
800-
801-
if (!localOnly && remotes.containsKey(subject)) {
802-
logger.debug("sent to remote: {}", message);
803-
remotes.get(subject).callback(message);
804-
routedToRemote = true;
805802
}
806803

807804
if (subscriptions.containsKey(subject)) {
@@ -810,16 +807,21 @@ else if (getState() != BusState.CONNECTED) {
810807
else if (localSubscriptions.containsKey(subject)) {
811808
deliverToSubscriptions(localSubscriptions, subject, message);
812809
}
813-
else if (!deferred && !routedToRemote) {
814-
throw new NoSubscribersToDeliverTo(subject);
810+
else if (!delivered) {
811+
if (shadowSubscriptions.containsKey(subject)) {
812+
deliverToSubscriptions(shadowSubscriptions, subject, message);
813+
}
814+
else {
815+
throw new NoSubscribersToDeliverTo(subject);
816+
}
815817
}
816818
}
817819
else {
818820
throw new RuntimeException("Cannot send message using this method"
819821
+ " if the message does not contain a ToSubject field.");
820822
}
821823
}
822-
catch (RuntimeException e) {
824+
catch (final RuntimeException e) {
823825
callErrorHandler(message, e);
824826
}
825827
}
@@ -986,7 +988,7 @@ private void handleCallbackError(final Message message, final Throwable t) {
986988
try {
987989
defaultErrorHandling = message.getErrorCallback().error(message, t);
988990
}
989-
catch (Throwable secondaryError) {
991+
catch (final Throwable secondaryError) {
990992
logger.error("Encountered an error while calling error callback for message to " + message.getSubject(), secondaryError);
991993
}
992994
}
@@ -997,7 +999,7 @@ private void handleCallbackError(final Message message, final Throwable t) {
997999
}
9981000

9991001
private void loadRpcProxies() {
1000-
RpcProxyLoader proxyLoader = ((RpcProxyLoader) GWT.create(RpcProxyLoader.class));
1002+
final RpcProxyLoader proxyLoader = ((RpcProxyLoader) GWT.create(RpcProxyLoader.class));
10011003
proxyLoader.loadProxies(ClientMessageBusImpl.this);
10021004
}
10031005

@@ -1218,7 +1220,7 @@ else if (newState == BusState.LOCAL_ONLY) {
12181220
try {
12191221
et.deliverTo(lifecycleListeners.get(i), e);
12201222
}
1221-
catch (Throwable t) {
1223+
catch (final Throwable t) {
12221224
logger.error("listener threw exception: " + t);
12231225
t.printStackTrace();
12241226
}

errai-bus/src/main/java/org/jboss/errai/bus/client/util/BusToolsCli.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ private static Map<String, Object> decodePayloadMap(final EJValue value) {
115115
}
116116

117117
private static final QueueSession clientSession = new QueueSession() {
118-
private Map<String, Object> attributes = new HashMap<String, Object>();
118+
private final Map<String, Object> attributes = new HashMap<String, Object>();
119119

120120
@Override
121121
public String getSessionId() {
@@ -194,7 +194,6 @@ public static void setAutoDemarshall(final boolean autoDemarshall1) {
194194
* @param path
195195
* path to use when sending requests to the JAX-RS endpoint
196196
*/
197-
@SuppressWarnings("UnusedDeclaration")
198197
public static native void setApplicationRoot(final String path) /*-{
199198
if (path == null) {
200199
$wnd.erraiBusApplicationRoot = undefined;
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright (C) 2012 Red Hat, Inc. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.jboss.errai.ioc.support.bus.rebind;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.Collections;
21+
import java.util.List;
22+
23+
import org.jboss.errai.bus.client.ErraiBus;
24+
import org.jboss.errai.bus.client.api.ClientMessageBus;
25+
import org.jboss.errai.bus.client.api.base.MessageBuilder;
26+
import org.jboss.errai.bus.client.api.messaging.Message;
27+
import org.jboss.errai.bus.client.api.messaging.MessageCallback;
28+
import org.jboss.errai.bus.server.annotations.Remote;
29+
import org.jboss.errai.bus.server.annotations.ShadowService;
30+
import org.jboss.errai.codegen.Parameter;
31+
import org.jboss.errai.codegen.Statement;
32+
import org.jboss.errai.codegen.VariableReference;
33+
import org.jboss.errai.codegen.builder.AnonymousClassStructureBuilder;
34+
import org.jboss.errai.codegen.builder.BlockBuilder;
35+
import org.jboss.errai.codegen.builder.ElseBlockBuilder;
36+
import org.jboss.errai.codegen.builder.impl.ObjectBuilder;
37+
import org.jboss.errai.codegen.util.EmptyStatement;
38+
import org.jboss.errai.codegen.util.If;
39+
import org.jboss.errai.codegen.util.ProxyUtil;
40+
import org.jboss.errai.codegen.util.Refs;
41+
import org.jboss.errai.codegen.util.Stmt;
42+
import org.jboss.errai.ioc.client.api.CodeDecorator;
43+
import org.jboss.errai.ioc.rebind.ioc.extension.IOCDecoratorExtension;
44+
import org.jboss.errai.ioc.rebind.ioc.injector.api.Decorable;
45+
import org.jboss.errai.ioc.rebind.ioc.injector.api.FactoryController;
46+
47+
/**
48+
* Generates logic to register client-side shadow services for Errai's message
49+
* bus. Shadow services are used when:
50+
* <ul>
51+
* <li>Remote communication is turned off
52+
* <li>Errai's message bus is not in connected state
53+
* <li>A remote endpoint for the service doesn't exist
54+
* </ul>
55+
*
56+
* @author Mike Brock
57+
* @author Christian Sadilek <[email protected]>
58+
*/
59+
@CodeDecorator
60+
public class ShadowServiceDecorator extends IOCDecoratorExtension<ShadowService> {
61+
public ShadowServiceDecorator(Class<ShadowService> decoratesWith) {
62+
super(decoratesWith);
63+
}
64+
65+
@Override
66+
public void generateDecorator(final Decorable decorable, final FactoryController controller) {
67+
final ShadowService shadowService = (ShadowService) decorable.getAnnotation();
68+
String serviceName = null;
69+
70+
Statement subscribeShadowStatement = null;
71+
final Class<?> javaClass = decorable.getType().asClass();
72+
for (final Class<?> intf : javaClass.getInterfaces()) {
73+
if (intf.isAnnotationPresent(Remote.class)) {
74+
serviceName = intf.getName() + ":RPC";
75+
76+
final AnonymousClassStructureBuilder builder = generateMethodDelegates(intf, decorable, controller);
77+
subscribeShadowStatement = Stmt.castTo(ClientMessageBus.class, Stmt.invokeStatic(ErraiBus.class, "get"))
78+
.invoke("subscribeShadow", serviceName, builder.finish());
79+
}
80+
81+
if (serviceName == null) {
82+
if (shadowService.value().equals("")) {
83+
serviceName = decorable.getName();
84+
}
85+
else {
86+
serviceName = shadowService.value();
87+
}
88+
89+
subscribeShadowStatement = Stmt.castTo(ClientMessageBus.class, Stmt.invokeStatic(ErraiBus.class, "get"))
90+
.invoke("subscribeShadow", serviceName, controller.contextGetInstanceStmt());
91+
}
92+
93+
controller.addFactoryInitializationStatements(Collections.singletonList(subscribeShadowStatement));
94+
}
95+
}
96+
97+
private AnonymousClassStructureBuilder generateMethodDelegates(final Class<?> intf, final Decorable decorable, final FactoryController controller) {
98+
99+
final BlockBuilder<AnonymousClassStructureBuilder> builder = ObjectBuilder.newInstanceOf(MessageCallback.class)
100+
.extend().publicOverridesMethod("callback", Parameter.of(Message.class, "message"))
101+
.append(Stmt.declareVariable("commandType", String.class,
102+
Stmt.loadVariable("message").invoke("getCommandType")))
103+
.append(Stmt.declareVariable("methodParms", List.class,
104+
Stmt.loadVariable("message").invoke("get", List.class, Stmt.loadLiteral("MethodParms"))));
105+
106+
for (final Method method : intf.getMethods()) {
107+
if (ProxyUtil.isMethodInInterface(intf, method)) {
108+
final Class<?>[] parameterTypes = method.getParameterTypes();
109+
final VariableReference[] objects = new VariableReference[parameterTypes.length];
110+
final BlockBuilder<ElseBlockBuilder> blockBuilder = If
111+
.cond(Stmt.loadLiteral(ProxyUtil.createCallSignature(intf, method)).invoke("equals",
112+
Stmt.loadVariable("commandType")));
113+
114+
for (int i = 0; i < parameterTypes.length; i++) {
115+
final Class<?> parameterType = parameterTypes[i];
116+
blockBuilder.append(Stmt.declareVariable("var" + i, parameterType,
117+
Stmt.castTo(parameterType, Stmt.loadVariable("methodParms").invoke("get", i))));
118+
objects[i] = Refs.get("var" + i);
119+
}
120+
121+
final boolean hasReturnType = !method.getReturnType().equals(void.class);
122+
blockBuilder.append(Stmt.declareFinalVariable("instance", intf, controller.contextGetInstanceStmt()));
123+
final Statement methodInvocation = Stmt.nestedCall(Stmt.loadVariable("instance")).invoke(method.getName(), (Object[]) objects);
124+
blockBuilder.append(Stmt.try_()
125+
.append((hasReturnType) ? Stmt.declareFinalVariable("ret", method.getReturnType(), methodInvocation) : methodInvocation)
126+
.append((decorable.isEnclosingTypeDependent()) ? Stmt.loadVariable("context").invoke("destroyInstance", Refs.get("instance"))
127+
: EmptyStatement.INSTANCE)
128+
.append((hasReturnType) ? Stmt.invokeStatic(MessageBuilder.class, "createConversation", Stmt.loadVariable("message"))
129+
.invoke("subjectProvided").invoke("with", "MethodReply", Refs.get("ret"))
130+
.invoke("noErrorHandling").invoke("sendNowWith", Stmt.invokeStatic(ErraiBus.class, "get"))
131+
: EmptyStatement.INSTANCE)
132+
.finish().catch_(Throwable.class, "throwable")
133+
.append(Stmt.throw_(RuntimeException.class, Stmt.loadVariable("throwable"))).finish());
134+
builder.append(blockBuilder.finish());
135+
}
136+
}
137+
return builder.finish();
138+
}
139+
}

0 commit comments

Comments
 (0)