-
Notifications
You must be signed in to change notification settings - Fork 5.5k
fix(plugin-arrow): Handle restricted output columns in Arrow Page Source #26175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -166,6 +166,30 @@ public void testDescribeUnknownTable() | |
| assertEquals(actualRows, expectedRows); | ||
| } | ||
|
|
||
| @Test | ||
| public void testQueryFunctionWithRestrictedColumns() | ||
| { | ||
| assertQuery("SELECT NAME FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NAME FROM nation WHERE NATIONKEY = 4"); | ||
| } | ||
|
|
||
| @Test | ||
| public void testQueryFunctionWithoutRestrictedColumns() | ||
| { | ||
| assertQuery("SELECT NATIONKEY, NAME FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NATIONKEY, NAME FROM nation WHERE NATIONKEY = 4"); | ||
| } | ||
|
|
||
| @Test | ||
| public void testQueryFunctionWithDifferentColumnOrder() | ||
| { | ||
| assertQuery("SELECT NAME, NATIONKEY FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NAME, NATIONKEY FROM nation WHERE NATIONKEY = 4"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a test that would reverse the order of the columns? e.g. Also a negative test where the output column is not present in the TVF? like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added negative test |
||
| } | ||
|
|
||
| @Test | ||
| public void testQueryFunctionWithInvalidColumn() | ||
| { | ||
| assertQueryFails("SELECT NAME, NATIONKEY, INVALID_COLUMN FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "Column 'invalid_column' cannot be resolved", true); | ||
| } | ||
|
|
||
| private LocalDate getDate(String dateString) | ||
| { | ||
| DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.plugin.arrow.testingConnector; | ||
|
|
||
| import com.facebook.presto.common.type.BigintType; | ||
| import com.facebook.presto.common.type.BooleanType; | ||
| import com.facebook.presto.common.type.DateType; | ||
| import com.facebook.presto.common.type.DoubleType; | ||
| import com.facebook.presto.common.type.IntegerType; | ||
| import com.facebook.presto.common.type.RealType; | ||
| import com.facebook.presto.common.type.SmallintType; | ||
| import com.facebook.presto.common.type.TimeType; | ||
| import com.facebook.presto.common.type.TimestampType; | ||
| import com.facebook.presto.common.type.Type; | ||
| import com.facebook.presto.spi.PrestoException; | ||
|
|
||
| import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; | ||
| import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; | ||
|
|
||
| public final class PrimitiveToPrestoTypeMappings | ||
| { | ||
| private PrimitiveToPrestoTypeMappings() | ||
| { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
|
|
||
| public static Type fromPrimitiveToPrestoType(String dataType) | ||
| { | ||
| switch (dataType) { | ||
elbinpallimalilibm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| case "INTEGER": | ||
| return IntegerType.INTEGER; | ||
| case "VARCHAR": | ||
| return createUnboundedVarcharType(); | ||
| case "DOUBLE": | ||
| return DoubleType.DOUBLE; | ||
| case "SMALLINT": | ||
| return SmallintType.SMALLINT; | ||
| case "BOOLEAN": | ||
| return BooleanType.BOOLEAN; | ||
| case "TIMESTAMP": | ||
| return TimestampType.TIMESTAMP; | ||
| case "TIME": | ||
| return TimeType.TIME; | ||
| case "REAL": | ||
| return RealType.REAL; | ||
| case "DATE": | ||
| return DateType.DATE; | ||
| case "BIGINT": | ||
| return BigintType.BIGINT; | ||
| } | ||
| throw new PrestoException(NOT_SUPPORTED, "Unsupported datatype '" + dataType + "' in the selected table."); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.plugin.arrow.testingConnector; | ||
|
|
||
| import com.facebook.plugin.arrow.ArrowConnector; | ||
| import com.facebook.presto.spi.connector.ConnectorMetadata; | ||
| import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; | ||
| import com.facebook.presto.spi.connector.ConnectorSplitManager; | ||
| import com.facebook.presto.spi.function.table.ConnectorTableFunction; | ||
| import com.google.common.collect.ImmutableSet; | ||
| import com.google.inject.Inject; | ||
| import org.apache.arrow.memory.BufferAllocator; | ||
|
|
||
| import java.util.Set; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class TestingArrowConnector | ||
| extends ArrowConnector | ||
| { | ||
| private final Set<ConnectorTableFunction> connectorTableFunctions; | ||
|
|
||
| @Inject | ||
| public TestingArrowConnector(ConnectorMetadata metadata, ConnectorSplitManager splitManager, ConnectorPageSourceProvider pageSourceProvider, Set<ConnectorTableFunction> connectorTableFunctions, BufferAllocator allocator) | ||
| { | ||
| super(metadata, splitManager, pageSourceProvider, allocator); | ||
| this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "connectorTableFunctions is null")); | ||
| } | ||
|
|
||
| @Override | ||
| public Set<ConnectorTableFunction> getTableFunctions() | ||
| { | ||
| return connectorTableFunctions; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -143,12 +143,25 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin | |
| public FlightDescriptor getFlightDescriptorForTableScan(ConnectorSession session, ArrowTableLayoutHandle tableLayoutHandle) | ||
| { | ||
| ArrowTableHandle tableHandle = tableLayoutHandle.getTable(); | ||
| String query = new TestingArrowQueryBuilder().buildSql( | ||
| tableHandle.getSchema(), | ||
| tableHandle.getTable(), | ||
| tableLayoutHandle.getColumnHandles(), ImmutableMap.of(), | ||
| tableLayoutHandle.getTupleDomain()); | ||
| TestingArrowFlightRequest request = TestingArrowFlightRequest.createQueryRequest(tableHandle.getSchema(), tableHandle.getTable(), query); | ||
|
|
||
| String query; | ||
| String table; | ||
|
|
||
| if (tableHandle instanceof TestingQueryArrowTableHandle) { | ||
| TestingQueryArrowTableHandle testingQueryArrowTableHandle = (TestingQueryArrowTableHandle) tableHandle; | ||
| query = testingQueryArrowTableHandle.getQuery(); | ||
| table = null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't you just use tableHandle.getTable() in this case too and remove
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The request to Flight server shouldn't include a table, if the request contains a query. Table name is irrelevant in case of sending a query defined in the TVF
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not a big deal, but the case below has a query and table set too
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. table should be set null for the case below as well. But |
||
| } | ||
| else { | ||
| query = new TestingArrowQueryBuilder().buildSql( | ||
| tableHandle.getSchema(), | ||
| tableHandle.getTable(), | ||
| tableLayoutHandle.getColumnHandles(), ImmutableMap.of(), | ||
| tableLayoutHandle.getTupleDomain()); | ||
| table = tableHandle.getTable(); | ||
| } | ||
|
|
||
| TestingArrowFlightRequest request = TestingArrowFlightRequest.createQueryRequest(tableHandle.getSchema(), table, query); | ||
| return FlightDescriptor.command(requestCodec.toBytes(request)); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.plugin.arrow.testingConnector; | ||
|
|
||
| import com.facebook.plugin.arrow.ArrowBlockBuilder; | ||
| import com.facebook.plugin.arrow.ArrowColumnHandle; | ||
| import com.facebook.plugin.arrow.ArrowFlightConfig; | ||
| import com.facebook.plugin.arrow.ArrowMetadata; | ||
| import com.facebook.plugin.arrow.BaseArrowFlightClientHandler; | ||
| import com.facebook.plugin.arrow.testingConnector.tvf.QueryFunctionProvider; | ||
| import com.facebook.presto.spi.ColumnHandle; | ||
| import com.facebook.presto.spi.ColumnMetadata; | ||
| import com.facebook.presto.spi.ConnectorSession; | ||
| import com.facebook.presto.spi.ConnectorTableHandle; | ||
| import com.facebook.presto.spi.ConnectorTableMetadata; | ||
| import com.facebook.presto.spi.SchemaTableName; | ||
| import com.facebook.presto.spi.connector.TableFunctionApplicationResult; | ||
| import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle; | ||
| import jakarta.inject.Inject; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| public class TestingArrowMetadata | ||
| extends ArrowMetadata | ||
| { | ||
| @Inject | ||
| public TestingArrowMetadata(BaseArrowFlightClientHandler clientHandler, ArrowBlockBuilder arrowBlockBuilder, ArrowFlightConfig config) | ||
| { | ||
| super(clientHandler, arrowBlockBuilder, config); | ||
| } | ||
|
|
||
| @Override | ||
| public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle) | ||
| { | ||
| if (handle instanceof QueryFunctionProvider.QueryFunctionHandle) { | ||
| QueryFunctionProvider.QueryFunctionHandle functionHandle = (QueryFunctionProvider.QueryFunctionHandle) handle; | ||
| return Optional.of(new TableFunctionApplicationResult<>(functionHandle.getTableHandle(), new ArrayList<>(functionHandle.getTableHandle().getColumns()))); | ||
| } | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) | ||
| { | ||
| if (tableHandle instanceof TestingQueryArrowTableHandle) { | ||
| TestingQueryArrowTableHandle queryArrowTableHandle = (TestingQueryArrowTableHandle) tableHandle; | ||
| return queryArrowTableHandle.getColumns().stream().collect(Collectors.toMap(c -> normalizeIdentifier(session, c.getColumnName()), c -> c)); | ||
| } | ||
| else { | ||
| return super.getColumnHandles(session, tableHandle); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle) | ||
| { | ||
| if (tableHandle instanceof TestingQueryArrowTableHandle) { | ||
| TestingQueryArrowTableHandle queryArrowTableHandle = (TestingQueryArrowTableHandle) tableHandle; | ||
|
|
||
| List<ColumnMetadata> meta = new ArrayList<>(); | ||
| for (ArrowColumnHandle columnHandle : queryArrowTableHandle.getColumns()) { | ||
| meta.add(ColumnMetadata.builder().setName(normalizeIdentifier(session, columnHandle.getColumnName())).setType(columnHandle.getColumnType()).build()); | ||
| } | ||
| return new ConnectorTableMetadata(new SchemaTableName(queryArrowTableHandle.getSchema(), queryArrowTableHandle.getTable()), meta); | ||
| } | ||
| else { | ||
| return super.getTableMetadata(session, tableHandle); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.plugin.arrow.testingConnector; | ||
|
|
||
| import com.facebook.plugin.arrow.ArrowColumnHandle; | ||
| import com.facebook.plugin.arrow.ArrowTableHandle; | ||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.UUID; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
| public class TestingQueryArrowTableHandle | ||
| extends ArrowTableHandle | ||
| { | ||
| private final String query; | ||
| private final List<ArrowColumnHandle> columns; | ||
|
|
||
| @JsonCreator | ||
| public TestingQueryArrowTableHandle(String query, List<ArrowColumnHandle> columns) | ||
| { | ||
| super("schema-" + UUID.randomUUID(), "table-" + UUID.randomUUID()); | ||
| this.columns = Collections.unmodifiableList(requireNonNull(columns)); | ||
| this.query = requireNonNull(query); | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public String getQuery() | ||
| { | ||
| return query; | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public List<ArrowColumnHandle> getColumns() | ||
| { | ||
| return columns; | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.