Skip to content

Commit 27a407a

Browse files
ScienJusleizhiyuan
authored andcommitted
HystrixFilter should not block async requests. (sofastack#479)
* HystrixFilter should not block async requests * add lock * fix codestyle * add timeout for lock * add events
1 parent 22541a8 commit 27a407a

File tree

12 files changed

+397
-194
lines changed

12 files changed

+397
-194
lines changed

extension-impl/fault-hystrix/src/main/java/com/alipay/sofa/rpc/hystrix/DefaultSetterFactory.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.netflix.hystrix.HystrixCommand;
2222
import com.netflix.hystrix.HystrixCommandGroupKey;
2323
import com.netflix.hystrix.HystrixCommandKey;
24-
import com.netflix.hystrix.HystrixObservableCommand;
2524

2625
import java.lang.reflect.Method;
2726
import java.util.Map;
@@ -34,9 +33,7 @@
3433
*/
3534
public class DefaultSetterFactory implements SetterFactory {
3635

37-
private static final Map<Method, HystrixCommand.Setter> SETTER_CACHE = new ConcurrentHashMap<Method, HystrixCommand.Setter>();
38-
39-
private static final Map<Method, HystrixObservableCommand.Setter> OBSERVABLE_SETTER_CACHE = new ConcurrentHashMap<Method, HystrixObservableCommand.Setter>();
36+
private static final Map<Method, HystrixCommand.Setter> SETTER_CACHE = new ConcurrentHashMap<Method, HystrixCommand.Setter>();
4037

4138
@Override
4239
public HystrixCommand.Setter createSetter(FilterInvoker invoker, SofaRequest request) {
@@ -55,22 +52,4 @@ public HystrixCommand.Setter createSetter(FilterInvoker invoker, SofaRequest req
5552
}
5653
return SETTER_CACHE.get(clientMethod);
5754
}
58-
59-
@Override
60-
public HystrixObservableCommand.Setter createObservableSetter(FilterInvoker invoker, SofaRequest request) {
61-
Method clientMethod = request.getMethod();
62-
if (!OBSERVABLE_SETTER_CACHE.containsKey(clientMethod)) {
63-
synchronized (DefaultSetterFactory.class) {
64-
if (!OBSERVABLE_SETTER_CACHE.containsKey(clientMethod)) {
65-
String groupKey = invoker.getConfig().getInterfaceId();
66-
String commandKey = request.getMethodName();
67-
HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter
68-
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
69-
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
70-
OBSERVABLE_SETTER_CACHE.put(clientMethod, setter);
71-
}
72-
}
73-
}
74-
return OBSERVABLE_SETTER_CACHE.get(clientMethod);
75-
}
7655
}

extension-impl/fault-hystrix/src/main/java/com/alipay/sofa/rpc/hystrix/HystrixFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So
7373
if (RpcConstants.INVOKER_TYPE_SYNC.equals(request.getInvokeType())) {
7474
command = new SofaHystrixCommand(invoker, request);
7575
} else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(request.getInvokeType())) {
76-
command = new SofaHystrixObservableCommand(invoker, request);
76+
command = new SofaAsyncHystrixCommand(invoker, request);
7777
} else {
7878
return invoker.invoke(request);
7979
}

extension-impl/fault-hystrix/src/main/java/com/alipay/sofa/rpc/hystrix/HystrixResponseFuture.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
2020
import com.alipay.sofa.rpc.message.ResponseFuture;
21+
import com.netflix.hystrix.HystrixCommand;
2122

2223
import java.util.List;
2324
import java.util.concurrent.ExecutionException;
@@ -26,36 +27,26 @@
2627
import java.util.concurrent.TimeoutException;
2728

2829
/**
29-
* the {@link Future}(from {@link rx.Observable}) wrapper that can be used as a {@link ResponseFuture}
30+
* the {@link Future}(from {@link HystrixCommand#queue()}) wrapper that can be used as a {@link ResponseFuture}
3031
*
3132
* @author <a href=mailto:[email protected]>ScienJus</a>
3233
*/
3334
public class HystrixResponseFuture implements ResponseFuture {
3435

35-
private Future delegate;
36+
private Future delegate;
3637

37-
private ResponseFuture responseFuture;
38-
39-
public HystrixResponseFuture(Future delegate, ResponseFuture responseFuture) {
38+
public HystrixResponseFuture(Future delegate) {
4039
this.delegate = delegate;
41-
this.responseFuture = responseFuture;
4240
}
4341

4442
@Override
4543
public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
46-
if (responseFuture == null) {
47-
// TODO 因为熔断了,没有发出真实请求,无法获得真实实现类是如何处理的
48-
throw new UnsupportedOperationException("Not supported, Please use callback function");
49-
}
50-
return responseFuture.addListener(sofaResponseCallback);
44+
throw new UnsupportedOperationException("addListener is not supported when using Hystrix");
5145
}
5246

5347
@Override
5448
public ResponseFuture addListeners(List list) {
55-
if (responseFuture == null) {
56-
throw new UnsupportedOperationException("Not supported, Please use callback function");
57-
}
58-
return responseFuture.addListeners(list);
49+
throw new UnsupportedOperationException("addListeners is not supported when using Hystrix");
5950
}
6051

6152
@Override

extension-impl/fault-hystrix/src/main/java/com/alipay/sofa/rpc/hystrix/SetterFactory.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,4 @@ public interface SetterFactory {
3636
*/
3737
HystrixCommand.Setter createSetter(FilterInvoker invoker, SofaRequest request);
3838

39-
/**
40-
* Create a {@link HystrixObservableCommand.Setter} with the given invoker and request
41-
* @param invoker
42-
* @param request
43-
* @return
44-
*/
45-
HystrixObservableCommand.Setter createObservableSetter(FilterInvoker invoker, SofaRequest request);
46-
4739
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.alipay.sofa.rpc.hystrix;
18+
19+
import com.alipay.sofa.rpc.common.utils.ClassUtils;
20+
import com.alipay.sofa.rpc.config.ConsumerConfig;
21+
import com.alipay.sofa.rpc.context.RpcInternalContext;
22+
import com.alipay.sofa.rpc.context.RpcInvokeContext;
23+
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
24+
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
25+
import com.alipay.sofa.rpc.core.request.SofaRequest;
26+
import com.alipay.sofa.rpc.core.response.SofaResponse;
27+
import com.alipay.sofa.rpc.filter.FilterInvoker;
28+
import com.alipay.sofa.rpc.log.Logger;
29+
import com.alipay.sofa.rpc.log.LoggerFactory;
30+
import com.alipay.sofa.rpc.message.ResponseFuture;
31+
import com.netflix.hystrix.HystrixCommand;
32+
import com.netflix.hystrix.HystrixEventType;
33+
34+
import java.lang.reflect.InvocationTargetException;
35+
import java.lang.reflect.Method;
36+
import java.util.ArrayList;
37+
import java.util.Collections;
38+
import java.util.List;
39+
import java.util.concurrent.CountDownLatch;
40+
import java.util.concurrent.TimeUnit;
41+
42+
/**
43+
* {@link HystrixCommand} for async requests
44+
*
45+
* @author <a href=mailto:[email protected]>ScienJus</a>
46+
*/
47+
public class SofaAsyncHystrixCommand extends HystrixCommand implements SofaHystrixInvokable {
48+
49+
private static final Logger LOGGER = LoggerFactory
50+
.getLogger(SofaAsyncHystrixCommand.class);
51+
52+
private static final long DEFAULT_LOCK_TIMEOUT = 1000;
53+
54+
private final FilterInvoker invoker;
55+
56+
private final SofaRequest request;
57+
58+
private final RpcInternalContext rpcInternalContext;
59+
60+
private final RpcInvokeContext rpcInvokeContext;
61+
62+
private final CountDownLatch lock = new CountDownLatch(1);
63+
64+
private final List<SofaAsyncHystrixEvent> events = new ArrayList<SofaAsyncHystrixEvent>();
65+
66+
private SofaResponse sofaResponse;
67+
68+
public SofaAsyncHystrixCommand(FilterInvoker invoker, SofaRequest request) {
69+
super(SofaHystrixConfig.loadSetterFactory((ConsumerConfig) invoker.getConfig()).createSetter(invoker,
70+
request));
71+
this.rpcInternalContext = RpcInternalContext.peekContext();
72+
this.rpcInvokeContext = RpcInvokeContext.peekContext();
73+
this.invoker = invoker;
74+
this.request = request;
75+
}
76+
77+
@Override
78+
public SofaResponse invoke() {
79+
if (isCircuitBreakerOpen() && LOGGER.isWarnEnabled(invoker.getConfig().getAppName())) {
80+
LOGGER.warnWithApp(invoker.getConfig().getAppName(), "Circuit Breaker is opened, method: {}#{}",
81+
invoker.getConfig().getInterfaceId(), request.getMethodName());
82+
}
83+
HystrixResponseFuture delegate = new HystrixResponseFuture(this.queue());
84+
try {
85+
boolean finished = lock.await(getLockTimeout(), TimeUnit.MILLISECONDS);
86+
if (!finished && !this.isExecutionComplete()) {
87+
throw new SofaTimeOutException(
88+
"Asynchronous execution timed out, please check Hystrix configuration. Events: " +
89+
getExecutionEventsString());
90+
}
91+
} catch (InterruptedException e) {
92+
Thread.currentThread().interrupt();
93+
}
94+
RpcInternalContext.getContext().setFuture(delegate);
95+
if (this.sofaResponse == null) {
96+
this.sofaResponse = buildEmptyResponse(request);
97+
}
98+
return this.sofaResponse;
99+
}
100+
101+
@Override
102+
protected Object run() throws Exception {
103+
events.add(SofaAsyncHystrixEvent.EMIT);
104+
RpcInternalContext.setContext(rpcInternalContext);
105+
RpcInvokeContext.setContext(rpcInvokeContext);
106+
107+
this.sofaResponse = invoker.invoke(request);
108+
ResponseFuture responseFuture = RpcInternalContext.getContext().getFuture();
109+
lock.countDown();
110+
events.add(SofaAsyncHystrixEvent.INVOKE_UNLOCKED);
111+
try {
112+
return responseFuture.get();
113+
} finally {
114+
events.add(SofaAsyncHystrixEvent.INVOKE_SUCCESS);
115+
}
116+
}
117+
118+
@Override
119+
protected Object getFallback() {
120+
events.add(SofaAsyncHystrixEvent.FALLBACK_EMIT);
121+
if (lock.getCount() > 0) {
122+
// > 0 说明 run 方法没有执行,或是执行时立刻失败了
123+
this.sofaResponse = buildEmptyResponse(request);
124+
lock.countDown();
125+
events.add(SofaAsyncHystrixEvent.FALLBACK_UNLOCKED);
126+
}
127+
FallbackFactory fallbackFactory = SofaHystrixConfig.loadFallbackFactory((ConsumerConfig) invoker.getConfig());
128+
if (fallbackFactory == null) {
129+
return super.getFallback();
130+
}
131+
Object fallback = fallbackFactory.create(null, this.getExecutionException());
132+
if (fallback == null) {
133+
return super.getFallback();
134+
}
135+
try {
136+
return request.getMethod().invoke(fallback, request.getMethodArgs());
137+
} catch (IllegalAccessException e) {
138+
throw new SofaRpcRuntimeException("Hystrix fallback method failed to execute.", e);
139+
} catch (InvocationTargetException e) {
140+
throw new SofaRpcRuntimeException("Hystrix fallback method failed to execute.",
141+
e.getTargetException());
142+
} finally {
143+
events.add(SofaAsyncHystrixEvent.FALLBACK_SUCCESS);
144+
}
145+
}
146+
147+
// Copy from AbstractCluster#buildEmptyResponse
148+
private SofaResponse buildEmptyResponse(SofaRequest request) {
149+
SofaResponse response = new SofaResponse();
150+
Method method = request.getMethod();
151+
if (method != null) {
152+
response.setAppResponse(ClassUtils.getDefaultPrimitiveValue(method.getReturnType()));
153+
}
154+
return response;
155+
}
156+
157+
private long getLockTimeout() {
158+
if (this.getProperties().executionTimeoutEnabled().get()) {
159+
return this.getProperties().executionTimeoutInMilliseconds().get();
160+
}
161+
return DEFAULT_LOCK_TIMEOUT;
162+
}
163+
164+
private String getExecutionEventsString() {
165+
List<HystrixEventType> executionEvents = getExecutionEvents();
166+
if (executionEvents == null) {
167+
executionEvents = Collections.emptyList();
168+
}
169+
StringBuilder message = new StringBuilder("[");
170+
for (HystrixEventType executionEvent : executionEvents) {
171+
message.append(HystrixEventType.class.getSimpleName()).append("#").append(executionEvent.name())
172+
.append(",");
173+
}
174+
for (SofaAsyncHystrixEvent event : events) {
175+
message.append(SofaAsyncHystrixEvent.class.getSimpleName()).append("#").append(event.name()).append(",");
176+
}
177+
if (message.length() > 1) {
178+
message.deleteCharAt(message.length() - 1);
179+
}
180+
return message.append("]").toString();
181+
}
182+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.alipay.sofa.rpc.hystrix;
18+
19+
/**
20+
*
21+
*/
22+
public enum SofaAsyncHystrixEvent {
23+
EMIT,
24+
INVOKE_UNLOCKED,
25+
INVOKE_SUCCESS,
26+
FALLBACK_EMIT,
27+
FALLBACK_UNLOCKED,
28+
FALLBACK_SUCCESS,
29+
}

extension-impl/fault-hystrix/src/main/java/com/alipay/sofa/rpc/hystrix/SofaHystrixCommand.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@
3636
*/
3737
public class SofaHystrixCommand extends HystrixCommand<SofaResponse> implements SofaHystrixInvokable {
3838

39-
private final static Logger LOGGER = LoggerFactory.getLogger(SofaHystrixCommand.class);
39+
private static final Logger LOGGER = LoggerFactory.getLogger(SofaHystrixCommand.class);
4040

41-
private RpcInternalContext rpcInternalContext;
42-
private RpcInvokeContext rpcInvokeContext;
43-
protected FilterInvoker invoker;
44-
protected SofaRequest request;
41+
private final RpcInternalContext rpcInternalContext;
42+
43+
private final RpcInvokeContext rpcInvokeContext;
44+
45+
private final FilterInvoker invoker;
46+
private final SofaRequest request;
4547

4648
public SofaHystrixCommand(FilterInvoker invoker, SofaRequest request) {
4749
super(SofaHystrixConfig.loadSetterFactory((ConsumerConfig) invoker.getConfig()).createSetter(invoker, request));

extension-impl/fault-hystrix/src/main/java/com/alipay/sofa/rpc/hystrix/SofaHystrixInvokable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import com.alipay.sofa.rpc.core.response.SofaResponse;
2020

2121
/**
22-
* Basic interface for {@link SofaHystrixCommand} and {@link SofaHystrixObservableCommand}
22+
* Basic interface for {@link SofaHystrixCommand} and {@link SofaAsyncHystrixCommand}
2323
*
2424
* @author <a href=mailto:[email protected]>ScienJus</a>
2525
*/

0 commit comments

Comments
 (0)