Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP-63 caching for sync lookups #94

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
[Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
is used instead.

- Added support for caching of synchronous lookup joins.

## [0.13.0] - 2024-04-03

### Added
Expand Down
61 changes: 42 additions & 19 deletions README.md

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ under the License.

<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
section, omitting the patch part (so for 1.15.0 use 1.15). -->

<flink.version>1.16.3</flink.version>

<target.java.version>11</target.java.version>
Expand Down Expand Up @@ -296,7 +297,8 @@ under the License.
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>

<!-- argLine needed for Flink 1.16 and 1.17 or there are unit test errors-->
<argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED</argLine>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;

import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

@Slf4j
public class CachingHttpTableLookupFunction extends LookupFunction {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we need this class at all. For example in flink-connector-jdbc the same lookup function is used for caching and non-caching approach: https://github.com/apache/flink-connector-jdbc/blob/v3.2/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java#L96-L121

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refer to the comment #94 (comment)

private final PollingClientFactory<RowData> pollingClientFactory;

private final DeserializationSchema<RowData> responseSchemaDecoder;

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
private final LookupRow lookupRow;

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
private final HttpLookupConfig options;

private transient AtomicInteger localHttpCallCounter;

private transient PollingClient<RowData> client;

private LookupCache cache;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache is not used in this function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, seems like you would like to use it in lookup(RowData keyRow) method @davidradl
Also, the cache field can be declared as final.


public CachingHttpTableLookupFunction(
PollingClientFactory<RowData> pollingClientFactory,
DeserializationSchema<RowData> responseSchemaDecoder,
LookupRow lookupRow,
HttpLookupConfig options,
LookupCache cache) {

this.pollingClientFactory = pollingClientFactory;
this.responseSchemaDecoder = responseSchemaDecoder;
this.lookupRow = lookupRow;
this.options = options;
this.cache = cache;
}

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);

this.responseSchemaDecoder.open(
SerializationSchemaUtils
.createDeserializationInitContext(CachingHttpTableLookupFunction.class));

this.localHttpCallCounter = new AtomicInteger(0);
this.client = pollingClientFactory
.createPollClient(options, responseSchemaDecoder);

context
.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember to call cache.open(...) here


/**
* This is a lookup method which is called by Flink framework at runtime.
*/
@Override
public Collection<RowData> lookup(RowData keyRow) throws IOException {
log.debug("lookup=" + lookupRow);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use logger placeholders instead of explicit string concatenation to avoid calling toString method when logger is disabled for the given logging level log.debug("lookupRow = {}", lookupRow);

localHttpCallCounter.incrementAndGet();
Optional<RowData> rowData= client.pull(keyRow);
List<RowData> result = new ArrayList<>();
rowData.ifPresent(row -> { result.add(row); });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
rowData.ifPresent(result::add);

log.debug("lookup result=" + result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use logger placeholders like described in another comment

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
Expand Down Expand Up @@ -46,17 +48,20 @@ public class HttpLookupTableSource
private final DynamicTableFactory.Context dynamicTableFactoryContext;

private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
private final LookupCache cache;

public HttpLookupTableSource(
DataType physicalRowDataType,
HttpLookupConfig lookupConfig,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
DynamicTableFactory.Context dynamicTablecontext) {
DynamicTableFactory.Context dynamicTablecontext,
LookupCache cache) {

this.physicalRowDataType = physicalRowDataType;
this.lookupConfig = lookupConfig;
this.decodingFormat = decodingFormat;
this.dynamicTableFactoryContext = dynamicTablecontext;
this.cache = cache;
}

@Override
Expand All @@ -66,6 +71,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
log.debug("getLookupRuntimeProvider Entry");

LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());

Expand Down Expand Up @@ -94,21 +100,41 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
PollingClientFactory<RowData> pollingClientFactory =
createPollingClientFactory(lookupQueryCreator, lookupConfig);

HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
// In line with the JDBC implementation and current requirements, we are only
// supporting Partial Caching for synchronous operations.
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
}

protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
DeserializationSchema<RowData> responseSchemaDecoder,
PollingClientFactory<RowData> pollingClientFactory) {
if (lookupConfig.isUseAsync()) {
HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
log.info("Using Async version of HttpLookupTable.");
return AsyncTableFunctionProvider.of(
new AsyncHttpTableLookupFunction(dataLookupFunction));
} else {
log.info("Using blocking version of HttpLookupTable.");
return TableFunctionProvider.of(dataLookupFunction);
CachingHttpTableLookupFunction dataLookupFunction =
new CachingHttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig,
cache
);
if (cache != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super happy that we can pass null as cache to HttpLookupSource. Plus if cache is null you still create an instance of CachingHttpTableLookupFunction.

I'm also thinking whether cache should be passed to HttpLookupTableSource
We have HttpLookupConfig that contains entire config (via ReadableConfig) that is already passed to HttpLookupTableSource. You could move getLookupCache from HttpLookupTableSourceFactory to HttpLookupTableSource and create cache there.

Going step futher, you could have two implementations for HttpLookupTableSource -> cached and noCache, build a factory class for it and move getLookupCache to that factory

log.debug("PartialCachingLookupProvider; cache = " + cache);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use logger placeholders like described in another comment

return PartialCachingLookupProvider.of(dataLookupFunction, cache);
} else {
log.debug("Using LookupFunctionProvider.");
return LookupFunctionProvider.of(dataLookupFunction);
}
}
}

Expand All @@ -118,7 +144,8 @@ public DynamicTableSource copy() {
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableFactoryContext
dynamicTableFactoryContext,
cache
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
Expand All @@ -15,6 +16,9 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
Expand Down Expand Up @@ -48,7 +52,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);

ReadableConfig readableConfig = helper.getOptions();
ReadableConfig readable = helper.getOptions();
helper.validateExcept(
// properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
"table.",
Expand All @@ -62,7 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.FORMAT
);

HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig);
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);

ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();

Expand All @@ -73,7 +77,8 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableContext
dynamicTableContext,
getLookupCache(readable)
);
}

Expand All @@ -89,7 +94,18 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER);

return Set.of(
URL_ARGS,
ASYNC_POLLING,
LOOKUP_METHOD,
REQUEST_CALLBACK_IDENTIFIER,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
LookupOptions.PARTIAL_CACHE_MAX_ROWS,
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
LookupOptions.MAX_RETRIES);
}

private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
Expand All @@ -115,6 +131,18 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re
.build();
}

@Nullable
private LookupCache getLookupCache(ReadableConfig tableOptions) {
LookupCache cache = null;
// Do not support legacy cache options
if (tableOptions
.get(LookupOptions.CACHE_TYPE)
.equals(LookupOptions.LookupCacheType.PARTIAL)) {
cache = DefaultLookupCache.fromConfig(tableOptions);
}
return cache;
}

// TODO verify this since we are on 1.15 now.
// Backport from Flink 1.15-Master
private DataType toRowDataType(List<Column> columns, Predicate<Column> columnPredicate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,23 @@ public void open(FunctionContext context) throws Exception {
super.open(context);

this.responseSchemaDecoder.open(
SerializationSchemaUtils
.createDeserializationInitContext(HttpTableLookupFunction.class));
SerializationSchemaUtils
.createDeserializationInitContext(HttpTableLookupFunction.class));

this.localHttpCallCounter = new AtomicInteger(0);
this.client = pollingClientFactory
.createPollClient(options, responseSchemaDecoder);
.createPollClient(options, responseSchemaDecoder);

context
.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
context.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}

/**
* This is a lookup method which is called by Flink framework in a runtime.
*/
public void eval(Object... keys) {
lookupByKeys(keys)
.ifPresent(this::collect);
.ifPresent(this::collect);
}

public Optional<RowData> lookupByKeys(Object[] keys) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public JavaNetHttpPollingClient(
@Override
public Optional<RowData> pull(RowData lookupRow) {
try {
log.debug("Optional<RowData> pull with Rowdata=" + lookupRow);
return queryAndProcess(lookupRow);
} catch (Exception e) {
log.error("Exception during HTTP request.", e);
Expand Down
Loading
Loading