From 05c7db6cf41c2a3108361108bd67e8a3d32a17d2 Mon Sep 17 00:00:00 2001 From: Olivier Zembri Date: Thu, 28 Mar 2024 16:04:26 +0100 Subject: [PATCH] HTTP-83 Support SQL queries with project pushdown after a lookup join Signed-off-by: Olivier Zembri <> --- CHANGELOG.md | 5 ++ .../table/lookup/HttpLookupTableSource.java | 10 ++- .../HttpLookupTableSourceITCaseTest.java | 77 +++++++++++++++++++ 3 files changed, 90 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8b555b9..c425166e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## [Unreleased] +### Added + +- Added support for using the result of a lookup join operation in a subsequent select query that adds + or removes columns (project pushdown operation). + ### Fixed Moved junit support to junit 5, allowing junits to be run against flink 1.17 and 1.18. diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java index 9527f80f..5e688ed5 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java @@ -8,6 +8,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.AsyncTableFunctionProvider; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -38,7 +39,7 @@ public class HttpLookupTableSource implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown { - private final DataType physicalRowDataType; + private DataType physicalRowDataType; private final HttpLookupConfig lookupConfig; @@ -58,6 +59,11 @@ public HttpLookupTableSource( this.dynamicTableFactoryContext = dynamicTablecontext; } + @Override + public void applyProjection(int[][] projectedFields, DataType producedDataType) { + physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); + } + @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { @@ -127,7 +133,7 @@ public void applyLimit(long limit) { @Override public boolean supportsNestedProjection() { - return false; + return true; } private PollingClientFactory createPollingClientFactory( diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java index 2994fbd4..03a51420 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java @@ -243,6 +243,83 @@ public void testHttpsMTlsLookupJoin() throws Exception { assertEnrichedRows(rows); } + @Test + public void testLookupJoinProjectPushDown() throws Exception { + + // GIVEN + setUpServerBodyStub( + "POST", + wireMockServer, + List.of( + matchingJsonPath("$.row.aStringColumn"), + matchingJsonPath("$.row.anIntColumn"), + matchingJsonPath("$.row.aFloatColumn") + ) + ); + + String fields = + "`row` ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>\n"; + + String sourceTable = + "CREATE TABLE Orders (\n" + + " proc_time AS PROCTIME(),\n" + + " id STRING,\n" + + fields + + ") WITH (" + + "'connector' = 'datagen'," + + "'rows-per-second' = '1'," + + "'fields.id.kind' = 'sequence'," + + "'fields.id.start' = '1'," + + "'fields.id.end' = '5'" + + ")"; + + String lookupTable = + "CREATE TABLE Customers (\n" + + " `enrichedInt` INT,\n" + + " `enrichedString` STRING,\n" + + " \n" + + fields + + ") WITH (" + + "'format' = 'json'," + + "'lookup-request.format' = 'json'," + + "'lookup-request.format.json.fail-on-missing-field' = 'true'," + + "'connector' = 'rest-lookup'," + + "'lookup-method' = 'POST'," + + "'url' = 'http://localhost:9090/client'," + + "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json'," + + "'asyncPolling' = 'true'" + + ")"; + + tEnv.executeSql(sourceTable); + tEnv.executeSql(lookupTable); + + // WHEN + // SQL query that performs JOIN on both tables. + String joinQuery = + "CREATE TEMPORARY VIEW lookupResult AS " + + "SELECT o.id, o.`row`, c.enrichedInt, c.enrichedString FROM Orders AS o" + + " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c" + + " ON (\n" + + " o.`row` = c.`row`\n" + + ")"; + + tEnv.executeSql(joinQuery); + + // SQL query that performs a project pushdown to limit the number of columns + String lastQuery = + "SELECT r.id, r.enrichedInt FROM lookupResult r;"; + + TableResult result = tEnv.executeSql(lastQuery); + result.await(15, TimeUnit.SECONDS); + + // THEN + SortedSet collectedRows = getCollectedRows(result); + + collectedRows.stream().forEach(row -> assertThat(row.getArity()).isEqualTo(2)); + + assertThat(collectedRows.size()).isEqualTo(5); + } + @Test public void testLookupJoinOnRowType() throws Exception {