HTTP63 caching for sync lookups
Signed-off-by: David Radley <[email protected]>
davidradl committed May 9, 2024
1 parent a1e485e commit 28ec114
Showing 10 changed files with 335 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -18,7 +18,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
flink: [ "1.16.3", "1.17.2", "1.18.1"]
flink: ["1.16.3", "1.17.2", "1.18.1", "1.19.0"]
steps:
- uses: actions/checkout@v3

4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## [Unreleased]

### Added

- Added support for caching of synchronous lookup joins.

## [0.13.0] - 2024-04-03

### Added
20 changes: 19 additions & 1 deletion README.md
@@ -399,6 +399,10 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
| url | required | The base URL that should be used for GET requests. For example _http://localhost:8080/client_ |
| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
| lookup-method | optional | GET/POST/PUT (and any other) - determines which REST method should be used for the lookup REST query. If not specified, the `GET` method will be used. |
| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. `NONE` disables caching; `PARTIAL` caches entries returned by lookup requests to the external API. |
| lookup.partial-cache.max-rows | optional | The maximum number of rows kept in the lookup cache; when this value is exceeded, the oldest rows expire. `lookup.cache` must be set to `PARTIAL` to use this option. See the Lookup Cache section below for more details. |
| lookup.partial-cache.expire-after-write | optional | The maximum time to live for each row in the lookup cache after it is written to the cache. `lookup.cache` must be set to `PARTIAL` to use this option. See the Lookup Cache section below for more details. |
| lookup.partial-cache.expire-after-access | optional | The maximum time to live for each row in the lookup cache after the entry was last accessed. `lookup.cache` must be set to `PARTIAL` to use this option. See the Lookup Cache section below for more details. |
| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated by commas. |
| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated by commas. |
| gid.connector.http.security.cert.server | optional | Path to a trusted HTTP server certificate that should be added to the connector's key store. More than one path can be specified using `,` as the path delimiter. |
@@ -410,6 +414,7 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of the thread pool for HTTP lookup response processing. Increasing this value means that more concurrent requests can be processed at the same time. If not specified, the default value of 4 threads will be used. |
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |


### HTTP Sink
| Option | Required | Description/Value |
|---------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
@@ -434,6 +439,19 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
| gid.connector.http.sink.writer.request.mode | optional | Sets the HTTP Sink request submission mode. Two modes are available: `single` and `batch`; `batch` is the default mode if the option is not specified. |
| gid.connector.http.sink.request.batch.size | optional | Applicable only for `gid.connector.http.sink.writer.request.mode = batch`. Sets the number of individual events/requests that will be submitted as one HTTP request by the HTTP sink. The default value is 500, which is the same as the HTTP Sink `maxBatchSize`. |


## Lookup Cache
The HTTP Client connector can be used in a temporal join as a lookup source (also known as a dimension table). Caching is currently supported only in the synchronous lookup mode.

By default, the lookup cache is not enabled. You can enable it by setting `lookup.cache` to `PARTIAL`.

The lookup cache is used to improve the performance of temporal joins with the HTTP Client connector. When the lookup cache is enabled, each process (i.e. TaskManager) holds its own cache. Flink looks up the cache first and only sends a request over the network on a cache miss, then updates the cache with the rows returned. The oldest rows in the cache expire when the cache reaches the maximum number of cached rows (`lookup.partial-cache.max-rows`) or when a row exceeds the maximum time to live configured by `lookup.partial-cache.expire-after-write` or `lookup.partial-cache.expire-after-access`.
The cached rows might not be the latest; you can tune the expiration options to smaller values for fresher data, at the cost of more requests sent to the HTTP endpoint. This is a trade-off between throughput and data freshness.
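For illustration, a minimal sketch of a lookup table with partial caching enabled is shown below. The table name, columns, and URL are placeholders; the `rest-lookup` connector identifier and `json` format follow the conventions used elsewhere in this README.

```sql
-- Hypothetical lookup (dimension) table backed by the HTTP connector,
-- with the partial lookup cache enabled.
CREATE TABLE Customers (
    id STRING,
    name STRING,
    msg STRING
) WITH (
    'connector' = 'rest-lookup',
    'format' = 'json',
    'url' = 'http://localhost:8080/client',
    -- enable caching of lookup results
    'lookup.cache' = 'PARTIAL',
    -- keep at most 1000 rows; the oldest rows are evicted first
    'lookup.partial-cache.max-rows' = '1000',
    -- rows expire 10 minutes after being written to the cache
    'lookup.partial-cache.expire-after-write' = '10 min'
);
```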

By default, Flink caches the empty query result for a primary key; you can toggle this behaviour by setting `lookup.partial-cache.cache-missing-key` to `false`.
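A cached lookup is then expressed as a regular temporal join against the table above; this sketch assumes an `Orders` table with a processing-time attribute `proc_time`:

```sql
-- Keys already present in the TaskManager-local cache are served from it;
-- only cache misses trigger an HTTP request to the lookup endpoint.
SELECT o.id, o.order_time, c.msg
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.id = c.id;
```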


## Build and deployment
To build the project locally you need to have `maven 3` and Java 11+. <br/>

@@ -519,7 +537,7 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json
## TODO

### HTTP TableLookup Source
- Implement caches.
- Implement caches for async lookups.
- Think about Retry Policy for Http Request
- Check other `//TODO`'s.

5 changes: 3 additions & 2 deletions pom.xml
@@ -68,6 +68,7 @@ under the License.

<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
section, omitting the patch part (so for 1.15.0 use 1.15). -->

<flink.version>1.16.3</flink.version>

<target.java.version>11</target.java.version>
@@ -290,13 +291,13 @@ under the License.
<target>${target.java.version}</target>
</configuration>
</plugin>

<!-- argLine needed for Flink 1.16 and 1.17 or there are unit test errors-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>

<argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED</argLine>
</configuration>
</plugin>

@@ -0,0 +1,89 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;

import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

@Slf4j
public class CachingHttpTableLookupFunction extends LookupFunction {
private final PollingClientFactory<RowData> pollingClientFactory;

private final DeserializationSchema<RowData> responseSchemaDecoder;

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
private final LookupRow lookupRow;

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
private final HttpLookupConfig options;

private transient AtomicInteger localHttpCallCounter;

private transient PollingClient<RowData> client;

private LookupCache cache;

public CachingHttpTableLookupFunction(
PollingClientFactory<RowData> pollingClientFactory,
DeserializationSchema<RowData> responseSchemaDecoder,
LookupRow lookupRow,
HttpLookupConfig options,
LookupCache cache) {

this.pollingClientFactory = pollingClientFactory;
this.responseSchemaDecoder = responseSchemaDecoder;
this.lookupRow = lookupRow;
this.options = options;
this.cache = cache;
}

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);

this.responseSchemaDecoder.open(
SerializationSchemaUtils
.createDeserializationInitContext(CachingHttpTableLookupFunction.class));

this.localHttpCallCounter = new AtomicInteger(0);
this.client = pollingClientFactory
.createPollClient(options, responseSchemaDecoder);

context
.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}

/**
* This is a lookup method which is called by the Flink framework at runtime.
*/
@Override
public Collection<RowData> lookup(RowData keyRow) throws IOException {
log.debug("lookup=" + lookupRow);
localHttpCallCounter.incrementAndGet();
Optional<RowData> rowData= client.pull(keyRow);
List<RowData> result = new ArrayList<>();
rowData.ifPresent(row -> { result.add(row); });
log.debug("lookup result=" + result);
return result;
}
}
@@ -13,9 +13,11 @@
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
@@ -46,17 +48,20 @@ public class HttpLookupTableSource
private final DynamicTableFactory.Context dynamicTableFactoryContext;

private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
private final LookupCache cache;

public HttpLookupTableSource(
DataType physicalRowDataType,
HttpLookupConfig lookupConfig,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
DynamicTableFactory.Context dynamicTablecontext) {
DynamicTableFactory.Context dynamicTablecontext,
LookupCache cache) {

this.physicalRowDataType = physicalRowDataType;
this.lookupConfig = lookupConfig;
this.decodingFormat = decodingFormat;
this.dynamicTableFactoryContext = dynamicTablecontext;
this.cache = cache;
}

@Override
@@ -66,6 +71,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
log.debug("getLookupRuntimeProvider Entry");

LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());

@@ -94,21 +100,41 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
PollingClientFactory<RowData> pollingClientFactory =
createPollingClientFactory(lookupQueryCreator, lookupConfig);

HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
// In line with the JDBC implementation and current requirements, we are only
// supporting Partial Caching for synchronous operations.
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
}

protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
DeserializationSchema<RowData> responseSchemaDecoder,
PollingClientFactory<RowData> pollingClientFactory) {
if (lookupConfig.isUseAsync()) {
HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
log.info("Using Async version of HttpLookupTable.");
return AsyncTableFunctionProvider.of(
new AsyncHttpTableLookupFunction(dataLookupFunction));
new AsyncHttpTableLookupFunction(dataLookupFunction));
} else {
log.info("Using blocking version of HttpLookupTable.");
return TableFunctionProvider.of(dataLookupFunction);
CachingHttpTableLookupFunction dataLookupFunction =
new CachingHttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig,
cache
);
if (cache != null) {
log.debug("PartialCachingLookupProvider; cache = " + cache);
return PartialCachingLookupProvider.of(dataLookupFunction, cache);
} else {
log.debug("Using LookupFunctionProvider.");
return LookupFunctionProvider.of(dataLookupFunction);
}
}
}

@@ -118,7 +144,8 @@ public DynamicTableSource copy() {
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableFactoryContext
dynamicTableFactoryContext,
cache
);
}

@@ -5,6 +5,7 @@
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
@@ -15,6 +16,9 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
@@ -47,7 +51,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);

ReadableConfig readableConfig = helper.getOptions();
ReadableConfig readable = helper.getOptions();
helper.validateExcept(
// properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
"table.",
@@ -61,7 +65,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.FORMAT
);

HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig);
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);

ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();

@@ -72,7 +76,8 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableContext
dynamicTableContext,
getLookupCache(readable)
);
}

@@ -88,7 +93,16 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD);
return Set.of(
URL_ARGS,
ASYNC_POLLING,
LOOKUP_METHOD,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
LookupOptions.PARTIAL_CACHE_MAX_ROWS,
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
LookupOptions.MAX_RETRIES);
}

private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
@@ -105,6 +119,18 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re
.build();
}

@Nullable
private LookupCache getLookupCache(ReadableConfig tableOptions) {
LookupCache cache = null;
// Do not support legacy cache options
if (tableOptions
.get(LookupOptions.CACHE_TYPE)
.equals(LookupOptions.LookupCacheType.PARTIAL)) {
cache = DefaultLookupCache.fromConfig(tableOptions);
}
return cache;
}

// TODO verify this since we are on 1.15 now.
// Backport from Flink 1.15-Master
private DataType toRowDataType(List<Column> columns, Predicate<Column> columnPredicate) {