-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HTTP-63 caching for sync lookups #94
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package com.getindata.connectors.http.internal.table.lookup; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import lombok.AccessLevel; | ||
import lombok.Getter; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.flink.annotation.VisibleForTesting; | ||
import org.apache.flink.api.common.serialization.DeserializationSchema; | ||
import org.apache.flink.table.connector.source.lookup.cache.LookupCache; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.table.functions.FunctionContext; | ||
import org.apache.flink.table.functions.LookupFunction; | ||
|
||
import com.getindata.connectors.http.internal.PollingClient; | ||
import com.getindata.connectors.http.internal.PollingClientFactory; | ||
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils; | ||
|
||
@Slf4j | ||
public class CachingHttpTableLookupFunction extends LookupFunction { | ||
private final PollingClientFactory<RowData> pollingClientFactory; | ||
|
||
private final DeserializationSchema<RowData> responseSchemaDecoder; | ||
|
||
@VisibleForTesting | ||
@Getter(AccessLevel.PACKAGE) | ||
private final LookupRow lookupRow; | ||
|
||
@VisibleForTesting | ||
@Getter(AccessLevel.PACKAGE) | ||
private final HttpLookupConfig options; | ||
|
||
private transient AtomicInteger localHttpCallCounter; | ||
|
||
private transient PollingClient<RowData> client; | ||
|
||
private LookupCache cache; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Cache is not used in this function. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yep, seems like you would like to use it in |
||
|
||
/**
 * Creates the lookup function and stores its collaborators; no I/O happens here.
 * The polling client itself is created later, in {@code open}.
 *
 * @param pollingClientFactory  factory used in {@code open} to create the polling client
 * @param responseSchemaDecoder decoder that is opened in {@code open} and handed to the
 *                              client factory
 * @param lookupRow             lookup row description, exposed via a package-level getter
 * @param options               lookup configuration passed to the client factory
 * @param cache                 lookup cache reference; it is only stored by this constructor
 */
public CachingHttpTableLookupFunction(
        PollingClientFactory<RowData> pollingClientFactory,
        DeserializationSchema<RowData> responseSchemaDecoder,
        LookupRow lookupRow,
        HttpLookupConfig options,
        LookupCache cache) {

    // Configuration-like members first, then the HTTP/decoding collaborators.
    this.options = options;
    this.lookupRow = lookupRow;
    this.cache = cache;
    this.responseSchemaDecoder = responseSchemaDecoder;
    this.pollingClientFactory = pollingClientFactory;
}
|
||
@Override | ||
public void open(FunctionContext context) throws Exception { | ||
super.open(context); | ||
|
||
this.responseSchemaDecoder.open( | ||
SerializationSchemaUtils | ||
.createDeserializationInitContext(CachingHttpTableLookupFunction.class)); | ||
|
||
this.localHttpCallCounter = new AtomicInteger(0); | ||
this.client = pollingClientFactory | ||
.createPollClient(options, responseSchemaDecoder); | ||
|
||
context | ||
.getMetricGroup() | ||
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue()); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Remember to call |
||
|
||
/** | ||
* This is a lookup method which is called by Flink framework at runtime. | ||
*/ | ||
@Override | ||
public Collection<RowData> lookup(RowData keyRow) throws IOException { | ||
log.debug("lookup=" + lookupRow); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use logger placeholders instead of explicit string concatenation to avoid calling toString method when logger is disabled for the given logging level |
||
localHttpCallCounter.incrementAndGet(); | ||
Optional<RowData> rowData= client.pull(keyRow); | ||
List<RowData> result = new ArrayList<>(); | ||
rowData.ifPresent(row -> { result.add(row); }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
log.debug("lookup result=" + result); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use logger placeholders like described in another comment |
||
return result; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,9 +13,11 @@ | |
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider; | ||
import org.apache.flink.table.connector.source.DynamicTableSource; | ||
import org.apache.flink.table.connector.source.LookupTableSource; | ||
import org.apache.flink.table.connector.source.TableFunctionProvider; | ||
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; | ||
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; | ||
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider; | ||
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; | ||
import org.apache.flink.table.connector.source.lookup.cache.LookupCache; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.table.factories.DynamicTableFactory; | ||
import org.apache.flink.table.factories.FactoryUtil; | ||
|
@@ -46,17 +48,20 @@ public class HttpLookupTableSource | |
private final DynamicTableFactory.Context dynamicTableFactoryContext; | ||
|
||
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat; | ||
private final LookupCache cache; | ||
|
||
/**
 * Creates the lookup table source and stores its configuration.
 *
 * @param physicalRowDataType physical row data type of the table
 * @param lookupConfig        HTTP lookup configuration
 * @param decodingFormat      format that produces the response deserialization schema
 * @param dynamicTablecontext dynamic table factory context
 * @param cache               lookup cache; may be {@code null}, in which case the
 *                            runtime provider is created without caching
 */
public HttpLookupTableSource(
        DataType physicalRowDataType,
        HttpLookupConfig lookupConfig,
        DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
        DynamicTableFactory.Context dynamicTablecontext,
        LookupCache cache) {

    this.physicalRowDataType = physicalRowDataType;
    this.lookupConfig = lookupConfig;
    this.decodingFormat = decodingFormat;
    this.dynamicTableFactoryContext = dynamicTablecontext;
    this.cache = cache;
}
|
||
@Override | ||
|
@@ -66,6 +71,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType) | |
|
||
@Override | ||
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { | ||
log.debug("getLookupRuntimeProvider Entry"); | ||
|
||
LookupRow lookupRow = extractLookupRow(lookupContext.getKeys()); | ||
|
||
|
@@ -94,21 +100,41 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex | |
PollingClientFactory<RowData> pollingClientFactory = | ||
createPollingClientFactory(lookupQueryCreator, lookupConfig); | ||
|
||
HttpTableLookupFunction dataLookupFunction = | ||
new HttpTableLookupFunction( | ||
pollingClientFactory, | ||
responseSchemaDecoder, | ||
lookupRow, | ||
lookupConfig | ||
); | ||
// In line with the JDBC implementation and current requirements, we are only | ||
// supporting Partial Caching for synchronous operations. | ||
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory); | ||
} | ||
|
||
protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow, | ||
DeserializationSchema<RowData> responseSchemaDecoder, | ||
PollingClientFactory<RowData> pollingClientFactory) { | ||
if (lookupConfig.isUseAsync()) { | ||
HttpTableLookupFunction dataLookupFunction = | ||
new HttpTableLookupFunction( | ||
pollingClientFactory, | ||
responseSchemaDecoder, | ||
lookupRow, | ||
lookupConfig | ||
); | ||
log.info("Using Async version of HttpLookupTable."); | ||
return AsyncTableFunctionProvider.of( | ||
new AsyncHttpTableLookupFunction(dataLookupFunction)); | ||
} else { | ||
log.info("Using blocking version of HttpLookupTable."); | ||
return TableFunctionProvider.of(dataLookupFunction); | ||
CachingHttpTableLookupFunction dataLookupFunction = | ||
davidradl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
new CachingHttpTableLookupFunction( | ||
pollingClientFactory, | ||
responseSchemaDecoder, | ||
lookupRow, | ||
lookupConfig, | ||
cache | ||
); | ||
if (cache != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not super happy that we can pass I'm also thinking whether Going step futher, you could have two implementations for |
||
log.debug("PartialCachingLookupProvider; cache = " + cache); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use logger placeholders like described in another comment |
||
return PartialCachingLookupProvider.of(dataLookupFunction, cache); | ||
} else { | ||
log.debug("Using LookupFunctionProvider."); | ||
return LookupFunctionProvider.of(dataLookupFunction); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -118,7 +144,8 @@ public DynamicTableSource copy() { | |
physicalRowDataType, | ||
lookupConfig, | ||
decodingFormat, | ||
dynamicTableFactoryContext | ||
dynamicTableFactoryContext, | ||
cache | ||
); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we need this class at all. For example in flink-connector-jdbc the same lookup function is used for caching and non-caching approach: https://github.com/apache/flink-connector-jdbc/blob/v3.2/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java#L96-L121
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refer to the comment #94 (comment)