Skip to content

Commit

Permalink
HTTP-83 Support SQL queries with project pushdown after a lookup join
Browse files Browse the repository at this point in the history
Signed-off-by: Olivier Zembri <<[email protected]>>
  • Loading branch information
OlivierZembri committed Mar 28, 2024
1 parent dacf108 commit 05c7db6
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## [Unreleased]

### Added

- Added support for using the result of a lookup join operation in a subsequent select query that adds
or removes columns (project pushdown operation).

### Fixed

Moved JUnit support to JUnit 5, allowing the unit tests to be run against Flink 1.17 and 1.18.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.DataTypes.Field;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
Expand Down Expand Up @@ -38,7 +39,7 @@
public class HttpLookupTableSource
implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown {

private final DataType physicalRowDataType;
private DataType physicalRowDataType;

private final HttpLookupConfig lookupConfig;

Expand All @@ -58,6 +59,11 @@ public HttpLookupTableSource(
this.dynamicTableFactoryContext = dynamicTablecontext;
}

/**
 * Narrows the physical row type of this source to the projected fields.
 *
 * <p>The new row type is derived by applying {@code projectedFields} to the
 * current {@code physicalRowDataType}; the result replaces the field value.
 *
 * @param projectedFields index paths of the fields to keep
 * @param producedDataType not read by this implementation
 */
@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
    final Projection projection = Projection.of(projectedFields);
    this.physicalRowDataType = projection.project(this.physicalRowDataType);
}

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {

Expand Down Expand Up @@ -127,7 +133,7 @@ public void applyLimit(long limit) {

/**
 * Reports whether this source accepts projections that address nested fields.
 *
 * <p>Returns {@code true}: {@code applyProjection} passes the {@code int[][]}
 * index paths to {@code Projection.of(int[][])}, so nested paths are accepted.
 * The earlier {@code return false;} that preceded this statement made it
 * unreachable and has been dropped.
 *
 * @return always {@code true}
 */
@Override
public boolean supportsNestedProjection() {
    return true;
}

private PollingClientFactory<RowData> createPollingClientFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,83 @@ public void testHttpsMTlsLookupJoin() throws Exception {
assertEnrichedRows(rows);
}

/**
 * Runs a lookup join exposed as a temporary view and then selects a subset of
 * the view's columns, checking that five two-column rows are collected.
 */
@Test
public void testLookupJoinProjectPushDown() throws Exception {

    // GIVEN: a POST stub that requires all three nested "row" attributes in the request body.
    setUpServerBodyStub(
        "POST",
        wireMockServer,
        List.of(
            matchingJsonPath("$.row.aStringColumn"),
            matchingJsonPath("$.row.anIntColumn"),
            matchingJsonPath("$.row.aFloatColumn")
        )
    );

    // Column definition shared by the source table and the lookup table.
    final String rowColumnDdl =
        "`row` ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>\n";

    final String ordersOptions = String.join(
        ",",
        "'connector' = 'datagen'",
        "'rows-per-second' = '1'",
        "'fields.id.kind' = 'sequence'",
        "'fields.id.start' = '1'",
        "'fields.id.end' = '5'"
    );

    final String ordersDdl =
        "CREATE TABLE Orders (\n"
            + " proc_time AS PROCTIME(),\n"
            + " id STRING,\n"
            + rowColumnDdl
            + ") WITH ("
            + ordersOptions
            + ")";

    final String customersOptions = String.join(
        ",",
        "'format' = 'json'",
        "'lookup-request.format' = 'json'",
        "'lookup-request.format.json.fail-on-missing-field' = 'true'",
        "'connector' = 'rest-lookup'",
        "'lookup-method' = 'POST'",
        "'url' = 'http://localhost:9090/client'",
        "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json'",
        "'asyncPolling' = 'true'"
    );

    final String customersDdl =
        "CREATE TABLE Customers (\n"
            + " `enrichedInt` INT,\n"
            + " `enrichedString` STRING,\n"
            + " \n"
            + rowColumnDdl
            + ") WITH ("
            + customersOptions
            + ")";

    tEnv.executeSql(ordersDdl);
    tEnv.executeSql(customersDdl);

    // WHEN: the lookup join is registered as a temporary view ...
    final String joinViewDdl =
        "CREATE TEMPORARY VIEW lookupResult AS "
            + "SELECT o.id, o.`row`, c.enrichedInt, c.enrichedString FROM Orders AS o"
            + " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c"
            + " ON (\n"
            + " o.`row` = c.`row`\n"
            + ")";

    tEnv.executeSql(joinViewDdl);

    // ... and a follow-up query keeps only two of the view's four columns.
    final String projectingQuery = "SELECT r.id, r.enrichedInt FROM lookupResult r;";

    TableResult result = tEnv.executeSql(projectingQuery);
    result.await(15, TimeUnit.SECONDS);

    // THEN: every collected row has exactly the two selected columns, and there are five rows.
    SortedSet<Row> collectedRows = getCollectedRows(result);

    for (Row collected : collectedRows) {
        assertThat(collected.getArity()).isEqualTo(2);
    }

    assertThat(collectedRows.size()).isEqualTo(5);
}

@Test
public void testLookupJoinOnRowType() throws Exception {

Expand Down

0 comments on commit 05c7db6

Please sign in to comment.