Skip to content

Commit

Permalink
Merge pull request #1621 from mattrjacobs/stuck-circuit-breaker
Browse files Browse the repository at this point in the history
Fixed bug where an unsubscription of a command in half-open state leaves circuit  permanently open
  • Loading branch information
mattrjacobs authored Jul 6, 2017
2 parents fa9c54d + b5c1d20 commit a820344
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ public void call() {
final Action0 unsubscribeCommandCleanup = new Action0() {
@Override
public void call() {
circuitBreaker.markNonSuccess();
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,11 @@ public boolean attemptExecution() {
return true;
} else {
if (isAfterSleepWindow()) {
//only the first request after sleep window should execute
//if the executing command succeeds, the status will transition to CLOSED
//if the executing command fails, the status will transition to OPEN
//if the executing command gets unsubscribed, the status will transition to OPEN
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
//only the first request after sleep window should execute
return true;
} else {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import rx.Observable;
import rx.Subscription;

/**
* These tests each use a different command key to ensure that running them in parallel doesn't allow the state
Expand Down Expand Up @@ -565,6 +566,56 @@ public void testLowVolumeDoesNotTripCircuit() {
}
}

@Test
public void testUnsubscriptionDoesNotLeaveCircuitStuckHalfOpen() {
String key = "cmd-J";
try {
int sleepWindow = 200;

// fail
HystrixCommand<Boolean> cmd1 = new FailureCommand(key, 1, sleepWindow);
HystrixCommand<Boolean> cmd2 = new FailureCommand(key, 1, sleepWindow);
HystrixCommand<Boolean> cmd3 = new FailureCommand(key, 1, sleepWindow);
HystrixCommand<Boolean> cmd4 = new FailureCommand(key, 1, sleepWindow);
cmd1.execute();
cmd2.execute();
cmd3.execute();
cmd4.execute();

HystrixCircuitBreaker cb = cmd1.circuitBreaker;

// everything has failed in the test window so we should return false now
Thread.sleep(100);
assertFalse(cb.allowRequest());
assertTrue(cb.isOpen());

//this should occur after the sleep window, so get executed
//however, it is unsubscribed, so never updates state on the circuit-breaker
HystrixCommand<Boolean> cmd5 = new SuccessCommand(key, 5000, sleepWindow);

//wait for sleep window to pass
Thread.sleep(sleepWindow + 50);

Observable<Boolean> o = cmd5.observe();
Subscription s = o.subscribe();
s.unsubscribe();

//wait for 10 sleep windows, then try a successful command. this should return the circuit to CLOSED

Thread.sleep(10 * sleepWindow);
HystrixCommand<Boolean> cmd6 = new SuccessCommand(key, 1, sleepWindow);
cmd6.execute();

Thread.sleep(100);
assertTrue(cb.allowRequest());
assertFalse(cb.isOpen());

} catch (Exception e) {
e.printStackTrace();
fail("Error occurred: " + e.getMessage());
}
}

/**
* Utility method for creating {@link HystrixCommandMetrics} for unit tests.
*/
Expand Down

1 comment on commit a820344

@hanxingxing
Copy link

@hanxingxing hanxingxing commented on a820344 Jan 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use 1.5.13 , code blow can not recovery
use 1.5.12, code blow can recovery
seems like fallback set status conflict with first request after timesleepwindow;

`package xxx;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HystrixCommand4CircuitBreakerTest extends HystrixCommand {

private final String name;

public HystrixCommand4CircuitBreakerTest(String name) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CircuitBreakerTestGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("CircuitBreakerTestKey"))
                    .andCommandPropertiesDefaults(
                            HystrixCommandProperties.Setter()
                                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) 
                                    .withExecutionIsolationSemaphoreMaxConcurrentRequests(100) 
                                    .withExecutionTimeoutEnabled(false)
                                    .withCircuitBreakerRequestVolumeThreshold(30)
                                    .withCircuitBreakerSleepWindowInMilliseconds(10000)
                                    .withCircuitBreakerErrorThresholdPercentage(60)
                    )
    );
    this.name = name;
}

@Override
protected String run() throws Exception {
    System.out.println("running run():" + name);
    int num = Integer.valueOf(name);
    Thread.sleep(250);
    if (num < 30) {
        throw new RuntimeException("wx need retry");
    }
  return name;
}

@Override
protected String getFallback() {
    return "CircuitBreaker fallback: " + name;
}

private static ExecutorService executorService = new ThreadPoolExecutor(5,
        10,
        2000L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10000));

public static class UnitTest {

    @Test
    public void testSynchronous() throws IOException {


        for (int i = 0; i < 50; i ++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100000000; j ++) {
                        try {
                            new HystrixCommand4CircuitBreakerTest(String.valueOf(j)).execute();
                        } catch(Exception e) {
                            System.out.println("run() throw HystrixBadRequestException" + e.getCause());
                        }
                    }
                }
            });
        }
        try {
            Thread.sleep(10000000);
        } catch (Exception e) {

        }
    }
}

}`

Please sign in to comment.