Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP lookup source max retries #90

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Fixed

- Lookup queries are now retried in case of IOException up to `gid.connector.http.source.lookup.max-retries` with a delay of `gid.connector.http.source.lookup.request.retry-timeout-ms` between retries. The default values are 3 retries and 1 second delay.

## [0.13.0] - 2024-04-03

### Added
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
| gid.connector.http.source.lookup.max-retries | optional | Sets the maximum number of retries for HTTP lookup request. If not specified, the default value of 3 retries will be used. |
| gid.connector.http.source.lookup.retry-timeout-ms | optional | Sets the delay between retries in milliseconds. If not specified, the default value of 1000 milliseconds will be used. |
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public final class HttpConnectorConfigConstants {
public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";

public static final String LOOKUP_HTTP_MAX_RETRIES =
GID_CONNECTOR_HTTP + "source.lookup.request.max-retries";

public static final String LOOKUP_HTTP_RETRY_TIMEOUT_MS =
GID_CONNECTOR_HTTP + "source.lookup.request.retry-timeout-ms";

// -----------------------------------------------------


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
@Slf4j
public class JavaNetHttpPollingClient implements PollingClient<RowData> {

public static final String DEFAULT_REQUEST_MAX_RETRIES = "3";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this not be an int?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tend keep all properties as Strings, becase they are passed as Properties<String,String> from Table factories.

Later we use them like this:

this.httpRequestMaxRetries = Integer.parseInt(
            options.getProperties().getProperty(
                HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES,
                DEFAULT_REQUEST_MAX_RETRIES
            )
        );


public static final String DEFAULT_REQUEST_RETRY_TIMEOUT_MS = "1000";
Copy link
Contributor

@davidradl davidradl May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest calling this DEFAULT_REQUEST_RETRY_INTERVAL_MS as it is not a timeout. Also this name should mention lookup or polling client - so that it is obvious it does not apply to the sink client.

Same naming consideration for max retries.

I assume we will add something similar for the sink client

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Raz0r could you apply proposed changes?


private final HttpClient httpClient;

private final HttpStatusCodeChecker statusCodeChecker;
Expand All @@ -37,6 +41,10 @@ public class JavaNetHttpPollingClient implements PollingClient<RowData> {

private final HttpPostRequestCallback<HttpLookupSourceRequestEntry> httpPostRequestCallback;

protected final int httpRequestMaxRetries;

protected final int httpRequestRetryTimeoutMs;

public JavaNetHttpPollingClient(
HttpClient httpClient,
DeserializationSchema<RowData> responseBodyDecoder,
Expand All @@ -62,6 +70,20 @@ public JavaNetHttpPollingClient(
.build();

this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);

this.httpRequestMaxRetries = Integer.parseInt(
options.getProperties().getProperty(
HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES,
DEFAULT_REQUEST_MAX_RETRIES
)
);

this.httpRequestRetryTimeoutMs = Integer.parseInt(
options.getProperties().getProperty(
HttpConnectorConfigConstants.LOOKUP_HTTP_RETRY_TIMEOUT_MS,
DEFAULT_REQUEST_RETRY_TIMEOUT_MS
)
);
}

@Override
Expand All @@ -74,15 +96,43 @@ public Optional<RowData> pull(RowData lookupRow) {
}
}

// TODO Add Retry Policy And configure TimeOut from properties
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intent of this comment is to make use of the Flink Retry strategies, which includes async.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is that AsyncSink uses totally different, dedicated mechanism for retries => Flink native support.
And actually there is separate TODO for this in HttpSinkWriter.

What kind of Flink retry strategies you were here @davidradl ?

private Optional<RowData> queryAndProcess(RowData lookupData) throws Exception {

private Optional<RowData> queryAndProcess(RowData lookupData) {
HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
HttpResponse<String> response = httpClient.send(
request.getHttpRequest(),
BodyHandlers.ofString()
);
return processHttpResponse(response, request);
HttpResponse<String> response = null;

int retryCount = 0;

while (retryCount < this.httpRequestMaxRetries) {
try {
response = httpClient.send(
request.getHttpRequest(),
BodyHandlers.ofString()
);
break;
} catch (IOException e) {
log.error("IOException during HTTP request. Retrying...", e);
retryCount++;
if (retryCount == this.httpRequestMaxRetries) {
log.error("Maximum retries reached. Aborting...");
return Optional.empty();
}
try {
Thread.sleep(this.httpRequestRetryTimeoutMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("HTTP request interrupted. Aborting...", e);
return Optional.empty();
}
}
try {
return processHttpResponse(response, request);
} catch (IOException e) {
log.error("IOException during HTTP response processing.", e);
return Optional.empty();
}
}

private Optional<RowData> processHttpResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.MappingBuilder;
import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
Expand Down Expand Up @@ -94,6 +96,7 @@ public void setUp() {
int[][] lookupKey = {{}};
this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey);

wireMockServer.resetAll();
this.lookupRowData = GenericRowData.of(
StringData.fromString("1"),
StringData.fromString("2")
Expand Down Expand Up @@ -290,6 +293,45 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue,
assertThat(nestedDetailsRow.getString(0).toString()).isEqualTo("$1,729.34");
}

@Test
void shouldRetryOnIOExceptionAndSucceedOnSecondAttempt() {
// GIVEN
this.stubMapping = setUpServerStubForIOExceptionOnFirstAttempt();
Properties properties = new Properties();
properties.setProperty(
HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES,
"3"
);
JavaNetHttpPollingClient pollingClient = setUpPollingClient(
getBaseUrl(), properties, setUpGetRequestFactory(properties));

// WHEN
Optional<RowData> poll = pollingClient.pull(lookupRowData);

// THEN
wireMockServer.verify(2, RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest()));

assertThat(poll.isPresent()).isTrue();
}

private StubMapping setUpServerStubForIOExceptionOnFirstAttempt() {
wireMockServer.stubFor(
get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
.inScenario("Retry Scenario")
.whenScenarioStateIs(Scenario.STARTED) // Initial state
.willReturn(aResponse()
.withFault(Fault.CONNECTION_RESET_BY_PEER)) // Fail the first request
.willSetStateTo("Second Attempt")); // Set the next state

return wireMockServer.stubFor(
get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
.inScenario("Retry Scenario")
.whenScenarioStateIs("Second Attempt") // When the state is "Second Attempt"
.willReturn(aResponse()
.withStatus(200)
.withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
}

private String getBaseUrl() {
return wireMockServer.baseUrl() + ENDPOINT;
}
Expand Down