diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b499503..3134702a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Fixed + +- Lookup queries are now retried in case of IOException up to `gid.connector.http.source.lookup.max-retries` with a delay of `gid.connector.http.source.lookup.request.retry-timeout-ms` between retries. The default values are 3 retries and 1 second delay. + ## [0.13.0] - 2024-04-03 ### Added diff --git a/README.md b/README.md index bbaa0ff9..8d328272 100644 --- a/README.md +++ b/README.md @@ -406,6 +406,8 @@ is set to `'true'`, it will be used as header value as is, without any extra mod | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | | gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | +| gid.connector.http.source.lookup.max-retries | optional | Sets the maximum number of retries for HTTP lookup request. If not specified, the default value of 3 retries will be used. | +| gid.connector.http.source.lookup.retry-timeout-ms | optional | Sets the delay between retries in milliseconds. If not specified, the default value of 1000 milliseconds will be used. | | gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | | gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | | gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index c7c10e20..b4ea7a59 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -92,6 +92,12 @@ public final class HttpConnectorConfigConstants { public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE = GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size"; + public static final String LOOKUP_HTTP_MAX_RETRIES = + GID_CONNECTOR_HTTP + "source.lookup.request.max-retries"; + + public static final String LOOKUP_HTTP_RETRY_TIMEOUT_MS = + GID_CONNECTOR_HTTP + "source.lookup.request.retry-timeout-ms"; + // ----------------------------------------------------- diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index d1e13324..b5fb0afd 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -27,6 +27,10 @@ @Slf4j public class JavaNetHttpPollingClient implements PollingClient { + public static final String DEFAULT_REQUEST_MAX_RETRIES = "3"; + + public static final String DEFAULT_REQUEST_RETRY_TIMEOUT_MS = "1000"; + private final HttpClient httpClient; private final HttpStatusCodeChecker statusCodeChecker; @@ -37,6 +41,10 @@ public class JavaNetHttpPollingClient implements PollingClient { private final HttpPostRequestCallback httpPostRequestCallback; + protected final int httpRequestMaxRetries; + + protected final int httpRequestRetryTimeoutMs; + public JavaNetHttpPollingClient( HttpClient httpClient, DeserializationSchema responseBodyDecoder, @@ -62,6 +70,20 @@ public JavaNetHttpPollingClient( .build(); this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + + this.httpRequestMaxRetries = Integer.parseInt( + options.getProperties().getProperty( + HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES, + DEFAULT_REQUEST_MAX_RETRIES + ) + ); + + this.httpRequestRetryTimeoutMs = Integer.parseInt( + options.getProperties().getProperty( + HttpConnectorConfigConstants.LOOKUP_HTTP_RETRY_TIMEOUT_MS, + DEFAULT_REQUEST_RETRY_TIMEOUT_MS + ) + ); } @Override @@ -74,15 +96,43 @@ public Optional pull(RowData lookupRow) { } } - // TODO Add Retry Policy And configure TimeOut from properties - private Optional queryAndProcess(RowData lookupData) throws Exception { - + private Optional queryAndProcess(RowData lookupData) { HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData); - HttpResponse response = httpClient.send( - request.getHttpRequest(), - BodyHandlers.ofString() - ); - return processHttpResponse(response, request); + HttpResponse response = null; + + int retryCount = 0; + + while (retryCount < this.httpRequestMaxRetries) { + try { + response = httpClient.send( + request.getHttpRequest(), + BodyHandlers.ofString() + ); + break; + } catch (IOException e) { + log.error("IOException during HTTP request. Retrying...", e); + retryCount++; + if (retryCount == this.httpRequestMaxRetries) { + log.error("Maximum retries reached. Aborting..."); + return Optional.empty(); + } + try { + Thread.sleep(this.httpRequestRetryTimeoutMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("HTTP request interrupted. Aborting...", e); + return Optional.empty(); + } + } + try { + return processHttpResponse(response, request); + } catch (IOException e) { + log.error("IOException during HTTP response processing.", e); + return Optional.empty(); + } } private Optional processHttpResponse( diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java index 277f7d84..955ca4fe 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java @@ -7,7 +7,9 @@ import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.MappingBuilder; +import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder; +import com.github.tomakehurst.wiremock.stubbing.Scenario; import com.github.tomakehurst.wiremock.stubbing.StubMapping; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -94,6 +96,7 @@ public void setUp() { int[][] lookupKey = {{}}; this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey); + wireMockServer.resetAll(); this.lookupRowData = GenericRowData.of( StringData.fromString("1"), StringData.fromString("2") @@ -290,6 +293,45 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue, assertThat(nestedDetailsRow.getString(0).toString()).isEqualTo("$1,729.34"); } + @Test + void shouldRetryOnIOExceptionAndSucceedOnSecondAttempt() { + // GIVEN + this.stubMapping = setUpServerStubForIOExceptionOnFirstAttempt(); + Properties properties = new Properties(); + properties.setProperty( + HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES, + "3" + ); + JavaNetHttpPollingClient pollingClient = setUpPollingClient( + getBaseUrl(), properties, setUpGetRequestFactory(properties)); + + // WHEN + Optional poll = pollingClient.pull(lookupRowData); + + // THEN + wireMockServer.verify(2, RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); + + assertThat(poll.isPresent()).isTrue(); + } + + private StubMapping setUpServerStubForIOExceptionOnFirstAttempt() { + wireMockServer.stubFor( + get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) + .inScenario("Retry Scenario") + .whenScenarioStateIs(Scenario.STARTED) // Initial state + .willReturn(aResponse() + .withFault(Fault.CONNECTION_RESET_BY_PEER)) // Fail the first request + .willSetStateTo("Second Attempt")); // Set the next state + + return wireMockServer.stubFor( + get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) + .inScenario("Retry Scenario") + .whenScenarioStateIs("Second Attempt") // When the state is "Second Attempt" + .willReturn(aResponse() + .withStatus(200) + .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json")))); + } + private String getBaseUrl() { return wireMockServer.baseUrl() + ENDPOINT; }