Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP-63 caching for sync lookups #94

Closed
wants to merge 1 commit into from

Conversation

davidradl
Copy link
Contributor

@davidradl davidradl commented May 8, 2024

Description

Introduce caching similar to the Flink JDBC connector, so long running synchronous Flink jobs can cache content resulting in less API calls.

Resolves #63

PR Checklist

@davidradl davidradl mentioned this pull request May 8, 2024
2 tasks
@davidradl davidradl marked this pull request as ready for review May 9, 2024 14:52
@davidradl
Copy link
Contributor Author

davidradl commented May 9, 2024

@kristoffSC @grzegorz8 the 1.18.1 job is failing with

14:50:32,522 [INFO] ------------------------------------------------------------------------
14:50:32,522 [INFO] BUILD SUCCESS
14:50:32,522 [INFO] ------------------------------------------------------------------------
14:50:32,523 [INFO] Total time:  02:01 min
14:50:32,523 [INFO] Finished at: 2024-05-09T14:50:32Z
14:50:32,523 [INFO] ------------------------------------------------------------------------
cat: target/site/jacoco/index.html: No such file or directory

Error: Process completed with exit code 1.

This looks like an infrastructure issue - as the build is successful then there is a cat error.

@kristoffSC kristoffSC changed the title HTTP63 caching for sync lookups HTTP-63 caching for sync lookups May 10, 2024
@kristoffSC kristoffSC self-requested a review May 10, 2024 07:30
@kristoffSC kristoffSC self-assigned this May 10, 2024
@kristoffSC kristoffSC added the enhancement New feature or request label May 10, 2024
@davidradl
Copy link
Contributor Author

@kristoffSC @grzegorz8 it looks like the build is successful but fails - is the machine running out of disk space / lost access to something maybe?

Copy link
Contributor

@AdrianVasiliu AdrianVasiliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor inline comments.

@kristoffSC
Copy link
Collaborator

I will take a look why build fails.

@davidradl
Copy link
Contributor Author

@AdrianVasiliu thank you very much for your detailed review, I really appreciate it. Great feedback :-)

@davidradl
Copy link
Contributor Author

@kristoffSC a gentle nudge, we are looking to adopt this change, any news on the build issue?

@MarekMaj
Copy link
Contributor

MarekMaj commented Jun 7, 2024

Hi @davidradl, we merged changes related to the build issue. Could you please rebase your commit?

@davidradl
Copy link
Contributor Author

davidradl commented Jun 10, 2024

@MarekMaj thanks for making this change - it works for 2 of the releases bu not for 1.18.1, where I now see the CI fail with error

Run madrapps/[email protected]
Event is pull_request
base sha: 9bfbb4c8a1b722eac8dd1f1b1b71b1fcbf1f02ce
head sha: 18f2d82ac13333344bacd4a02e7e2f486a649ae6
node:internal/process/promises:279
            triggerUncaughtException(err, true /* fromPromise */);
            ^

[Error: ENOENT: no such file or directory, open '/home/runner/work/flink-http-connector/flink-http-connector/target/site/jacoco/jacoco.xml'] {
  errno: -2,
  code: 'ENOENT',
  syscall: 'open',
  path: '/home/runner/work/flink-http-connector/flink-http-connector/target/site/jacoco/jacoco.xml'

}

I suspect this is related to your recent jakoco commit

@MarekMaj
Copy link
Contributor

@davidradl indeed, for PR created from forked repositories it caused problems due to insufficient permissions. I tested build again using forked repo, please rebase your code again:)


private transient PollingClient<RowData> client;

private LookupCache cache;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache is not used in this function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, seems like you would like to use it in lookup(RowData keyRow) method @davidradl
Also, the cache field can be declared as final.

cache
);
if (cache != null) {
log.debug("PartialCachingLookupProvider; cache = " + cache);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use logger placeholders like described in another comment

*/
@Override
public Collection<RowData> lookup(RowData keyRow) throws IOException {
log.debug("lookup=" + lookupRow);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use logger placeholders instead of explicit string concatenation to avoid calling toString method when logger is disabled for the given logging level log.debug("lookupRow = {}", lookupRow);

Optional<RowData> rowData= client.pull(keyRow);
List<RowData> result = new ArrayList<>();
rowData.ifPresent(row -> { result.add(row); });
log.debug("lookup result=" + result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use logger placeholders like described in another comment


private transient PollingClient<RowData> client;

private LookupCache cache;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, seems like you would like to use it in lookup(RowData keyRow) method @davidradl
Also, the cache field can be declared as final.

context
.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember to call cache.open(...) here

localHttpCallCounter.incrementAndGet();
Optional<RowData> rowData= client.pull(keyRow);
List<RowData> result = new ArrayList<>();
rowData.ifPresent(row -> { result.add(row); });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
rowData.ifPresent(result::add);

lookupConfig,
cache
);
if (cache != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super happy that we can pass null as cache to HttpLookupSource. Plus if cache is null you still create an instance of CachingHttpTableLookupFunction.

I'm also thinking whether cache should be passed to HttpLookupTableSource
We have HttpLookupConfig that contains entire config (via ReadableConfig) that is already passed to HttpLookupTableSource. You could move getLookupCache from HttpLookupTableSourceFactory to HttpLookupTableSource and create cache there.

Going step futher, you could have two implementations for HttpLookupTableSource -> cached and noCache, build a factory class for it and move getLookupCache to that factory

@davidradl
Copy link
Contributor Author

@kristoffSC @MarekMaj hope to get to this this week. Thanks for your feedback.

@grzegorz8
Copy link
Member

@kristoffSC @MarekMaj hope to get to this this week. Thanks for your feedback.

Hey @davidradl! Do you plan to continue working on the PR anytime soon? I can take over the PR if you are busy right now.

import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

@Slf4j
public class CachingHttpTableLookupFunction extends LookupFunction {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we need this class at all. For example in flink-connector-jdbc the same lookup function is used for caching and non-caching approach: https://github.com/apache/flink-connector-jdbc/blob/v3.2/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java#L96-L121

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refer to the comment #94 (comment)

@grzegorz8 grzegorz8 mentioned this pull request Jul 29, 2024
2 tasks
@kristoffSC
Copy link
Collaborator

Done in #109

@kristoffSC kristoffSC closed this Aug 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement Cache in Lookup Http Source
5 participants