-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HTTP-63 caching for sync lookups #94
Conversation
@kristoffSC @grzegorz8 the 1.18.1 job is failing with
This looks like an infrastructure issue - as the build is successful then there is a cat error. |
src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
Outdated
Show resolved
Hide resolved
src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
Outdated
Show resolved
Hide resolved
@kristoffSC @grzegorz8 it looks like the build is successful but fails - is the machine running out of disk space / lost access to something maybe? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor inline comments.
...java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java
Outdated
Show resolved
Hide resolved
src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
Outdated
Show resolved
Hide resolved
...java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java
Outdated
Show resolved
Hide resolved
I will take a look why build fails. |
@AdrianVasiliu thank you very much for your detailed review, I really appreciate it. Great feedback :-) |
@kristoffSC a gentle nudge, we are looking to adopt this change, any news on the build issue? |
Hi @davidradl, we merged changes related to the build issue. Could you please rebase your commit? |
@MarekMaj thanks for making this change - it works for 2 of the releases bu not for 1.18.1, where I now see the CI fail with error
} I suspect this is related to your recent jakoco commit |
@davidradl indeed, for PR created from forked repositories it caused problems due to insufficient permissions. I tested build again using forked repo, please rebase your code again:) |
Signed-off-by: David Radley <[email protected]>
|
||
private transient PollingClient<RowData> client; | ||
|
||
private LookupCache cache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cache is not used in this function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, seems like you would like to use it in lookup(RowData keyRow)
method @davidradl
Also, the cache
field can be declared as final.
src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
Show resolved
Hide resolved
cache | ||
); | ||
if (cache != null) { | ||
log.debug("PartialCachingLookupProvider; cache = " + cache); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use logger placeholders like described in another comment
*/ | ||
@Override | ||
public Collection<RowData> lookup(RowData keyRow) throws IOException { | ||
log.debug("lookup=" + lookupRow); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use logger placeholders instead of explicit string concatenation to avoid calling toString method when logger is disabled for the given logging level log.debug("lookupRow = {}", lookupRow);
Optional<RowData> rowData= client.pull(keyRow); | ||
List<RowData> result = new ArrayList<>(); | ||
rowData.ifPresent(row -> { result.add(row); }); | ||
log.debug("lookup result=" + result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use logger placeholders like described in another comment
|
||
private transient PollingClient<RowData> client; | ||
|
||
private LookupCache cache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, seems like you would like to use it in lookup(RowData keyRow)
method @davidradl
Also, the cache
field can be declared as final.
context | ||
.getMetricGroup() | ||
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remember to call cache.open(...)
here
localHttpCallCounter.incrementAndGet(); | ||
Optional<RowData> rowData= client.pull(keyRow); | ||
List<RowData> result = new ArrayList<>(); | ||
rowData.ifPresent(row -> { result.add(row); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
rowData.ifPresent(result::add);
lookupConfig, | ||
cache | ||
); | ||
if (cache != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not super happy that we can pass null
as cache to HttpLookupSource
. Plus if cache
is null you still create an instance of CachingHttpTableLookupFunction
.
I'm also thinking whether cache
should be passed to HttpLookupTableSource
We have HttpLookupConfig
that contains entire config (via ReadableConfig
) that is already passed to HttpLookupTableSource. You could move getLookupCache
from HttpLookupTableSourceFactory
to HttpLookupTableSource
and create cache there.
Going step futher, you could have two implementations for HttpLookupTableSource
-> cached and noCache, build a factory class for it and move getLookupCache
to that factory
@kristoffSC @MarekMaj hope to get to this this week. Thanks for your feedback. |
Hey @davidradl! Do you plan to continue working on the PR anytime soon? I can take over the PR if you are busy right now. |
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils; | ||
|
||
@Slf4j | ||
public class CachingHttpTableLookupFunction extends LookupFunction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we need this class at all. For example in flink-connector-jdbc the same lookup function is used for caching and non-caching approach: https://github.com/apache/flink-connector-jdbc/blob/v3.2/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java#L96-L121
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refer to the comment #94 (comment)
Done in #109 |
Description
Introduce caching similar to the Flink JDBC connector, so long running synchronous Flink jobs can cache content resulting in less API calls.
Resolves #63
PR Checklist