Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -97,16 +98,19 @@ public Page getNextPage()

// Create blocks from the loaded Arrow record batch
List<Block> blocks = new ArrayList<>();
List<FieldVector> vectors = flightStreamAndClient.getRoot().getFieldVectors();
for (int columnIndex = 0; columnIndex < columnHandles.size(); columnIndex++) {
FieldVector vector = vectors.get(columnIndex);
Type type = columnHandles.get(columnIndex).getColumnType();
VectorSchemaRoot vectorSchemaRoot = flightStreamAndClient.getRoot();
for (ArrowColumnHandle columnHandle : columnHandles) {
// In scenarios where the user query contains a Table Valued Function, the output columns could be in a
// different order or could be a subset of the columns in the flight stream. So we are fetching the requested
// field vector by matching the column name instead of fetching by column index.
FieldVector vector = requireNonNull(vectorSchemaRoot.getVector(columnHandle.getColumnName()), "No field named " + columnHandle.getColumnName() + " in the list of vectors from flight stream");
Type type = columnHandle.getColumnType();
Block block = arrowBlockBuilder.buildBlockFromFieldVector(vector, type, flightStreamAndClient.getDictionaryProvider());
blocks.add(block);
}

if (logger.isDebugEnabled()) {
logger.debug("Read Arrow record batch with rows: %s, columns: %s", flightStreamAndClient.getRoot().getRowCount(), vectors.size());
logger.debug("Read Arrow record batch with rows: %s, columns: %s", flightStreamAndClient.getRoot().getRowCount(), vectorSchemaRoot.getFieldVectors().size());
}

return new Page(flightStreamAndClient.getRoot().getRowCount(), blocks.toArray(new Block[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ protected FeaturesConfig createFeaturesConfig()
return new FeaturesConfig().setNativeExecutionEnabled(true);
}

@Test
public void testQueryFunctionWithRestrictedColumns()
{
assertQuery("SELECT NAME FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NAME FROM nation WHERE NATIONKEY = 4");
}

@Test
public void testQueryFunctionWithoutRestrictedColumns() throws InterruptedException
{
assertQuery("SELECT NAME, NATIONKEY FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NAME, NATIONKEY FROM nation WHERE NATIONKEY = 4");
}

@Test
public void testFiltersAndProjections1()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,30 @@ public void testDescribeUnknownTable()
assertEquals(actualRows, expectedRows);
}

@Test
public void testQueryFunctionWithRestrictedColumns()
{
assertQuery("SELECT NAME FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NAME FROM nation WHERE NATIONKEY = 4");
}

@Test
public void testQueryFunctionWithoutRestrictedColumns()
{
assertQuery("SELECT NATIONKEY, NAME FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NATIONKEY, NAME FROM nation WHERE NATIONKEY = 4");
}

@Test
public void testQueryFunctionWithDifferentColumnOrder()
{
assertQuery("SELECT NAME, NATIONKEY FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NAME, NATIONKEY FROM nation WHERE NATIONKEY = 4");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test that would reverse the order of the columns? e.g. SELECT NATIONKEY, NAME ...

Also a negative test where the output column is not present in the TVF? like SELECT FOO ..
This would fail during analysis right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added negative test

}

@Test
public void testQueryFunctionWithInvalidColumn()
{
assertQueryFails("SELECT NAME, NATIONKEY, INVALID_COLUMN FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "Column 'invalid_column' cannot be resolved", true);
}

private LocalDate getDate(String dateString)
{
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.plugin.arrow.testingConnector;

import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.DateType;
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.RealType;
import com.facebook.presto.common.type.SmallintType;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.PrestoException;

import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;

public final class PrimitiveToPrestoTypeMappings
{
private PrimitiveToPrestoTypeMappings()
{
throw new UnsupportedOperationException();
}

public static Type fromPrimitiveToPrestoType(String dataType)
{
switch (dataType) {
case "INTEGER":
return IntegerType.INTEGER;
case "VARCHAR":
return createUnboundedVarcharType();
case "DOUBLE":
return DoubleType.DOUBLE;
case "SMALLINT":
return SmallintType.SMALLINT;
case "BOOLEAN":
return BooleanType.BOOLEAN;
case "TIMESTAMP":
return TimestampType.TIMESTAMP;
case "TIME":
return TimeType.TIME;
case "REAL":
return RealType.REAL;
case "DATE":
return DateType.DATE;
case "BIGINT":
return BigintType.BIGINT;
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported datatype '" + dataType + "' in the selected table.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.plugin.arrow.testingConnector;

import com.facebook.plugin.arrow.ArrowConnector;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import org.apache.arrow.memory.BufferAllocator;

import java.util.Set;

import static java.util.Objects.requireNonNull;

public class TestingArrowConnector
extends ArrowConnector
{
private final Set<ConnectorTableFunction> connectorTableFunctions;

@Inject
public TestingArrowConnector(ConnectorMetadata metadata, ConnectorSplitManager splitManager, ConnectorPageSourceProvider pageSourceProvider, Set<ConnectorTableFunction> connectorTableFunctions, BufferAllocator allocator)
{
super(metadata, splitManager, pageSourceProvider, allocator);
this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "connectorTableFunctions is null"));
}

@Override
public Set<ConnectorTableFunction> getTableFunctions()
{
return connectorTableFunctions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,25 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
public FlightDescriptor getFlightDescriptorForTableScan(ConnectorSession session, ArrowTableLayoutHandle tableLayoutHandle)
{
ArrowTableHandle tableHandle = tableLayoutHandle.getTable();
String query = new TestingArrowQueryBuilder().buildSql(
tableHandle.getSchema(),
tableHandle.getTable(),
tableLayoutHandle.getColumnHandles(), ImmutableMap.of(),
tableLayoutHandle.getTupleDomain());
TestingArrowFlightRequest request = TestingArrowFlightRequest.createQueryRequest(tableHandle.getSchema(), tableHandle.getTable(), query);

String query;
String table;

if (tableHandle instanceof TestingQueryArrowTableHandle) {
TestingQueryArrowTableHandle testingQueryArrowTableHandle = (TestingQueryArrowTableHandle) tableHandle;
query = testingQueryArrowTableHandle.getQuery();
table = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't you just use tableHandle.getTable() in this case too and remove String table;?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The request to Flight server shouldn't include a table, if the request contains a query. Table name is irrelevant in case of sending a query defined in the TVF

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a big deal, but the case below has a query and table set too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table should be set null for the case below as well. But TestingEchoFlightProducer fails with this change. This should be fixed but probably in a different PR.

}
else {
query = new TestingArrowQueryBuilder().buildSql(
tableHandle.getSchema(),
tableHandle.getTable(),
tableLayoutHandle.getColumnHandles(), ImmutableMap.of(),
tableLayoutHandle.getTupleDomain());
table = tableHandle.getTable();
}

TestingArrowFlightRequest request = TestingArrowFlightRequest.createQueryRequest(tableHandle.getSchema(), table, query);
return FlightDescriptor.command(requestCodec.toBytes(request));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.plugin.arrow.testingConnector;

import com.facebook.plugin.arrow.ArrowBlockBuilder;
import com.facebook.plugin.arrow.ArrowColumnHandle;
import com.facebook.plugin.arrow.ArrowFlightConfig;
import com.facebook.plugin.arrow.ArrowMetadata;
import com.facebook.plugin.arrow.BaseArrowFlightClientHandler;
import com.facebook.plugin.arrow.testingConnector.tvf.QueryFunctionProvider;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.TableFunctionApplicationResult;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import jakarta.inject.Inject;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class TestingArrowMetadata
extends ArrowMetadata
{
@Inject
public TestingArrowMetadata(BaseArrowFlightClientHandler clientHandler, ArrowBlockBuilder arrowBlockBuilder, ArrowFlightConfig config)
{
super(clientHandler, arrowBlockBuilder, config);
}

@Override
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
if (handle instanceof QueryFunctionProvider.QueryFunctionHandle) {
QueryFunctionProvider.QueryFunctionHandle functionHandle = (QueryFunctionProvider.QueryFunctionHandle) handle;
return Optional.of(new TableFunctionApplicationResult<>(functionHandle.getTableHandle(), new ArrayList<>(functionHandle.getTableHandle().getColumns())));
}
return Optional.empty();
}

@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
if (tableHandle instanceof TestingQueryArrowTableHandle) {
TestingQueryArrowTableHandle queryArrowTableHandle = (TestingQueryArrowTableHandle) tableHandle;
return queryArrowTableHandle.getColumns().stream().collect(Collectors.toMap(c -> normalizeIdentifier(session, c.getColumnName()), c -> c));
}
else {
return super.getColumnHandles(session, tableHandle);
}
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
{
if (tableHandle instanceof TestingQueryArrowTableHandle) {
TestingQueryArrowTableHandle queryArrowTableHandle = (TestingQueryArrowTableHandle) tableHandle;

List<ColumnMetadata> meta = new ArrayList<>();
for (ArrowColumnHandle columnHandle : queryArrowTableHandle.getColumns()) {
meta.add(ColumnMetadata.builder().setName(normalizeIdentifier(session, columnHandle.getColumnName())).setType(columnHandle.getColumnType()).build());
}
return new ConnectorTableMetadata(new SchemaTableName(queryArrowTableHandle.getSchema(), queryArrowTableHandle.getTable()), meta);
}
else {
return super.getTableMetadata(session, tableHandle);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
package com.facebook.plugin.arrow.testingConnector;

import com.facebook.plugin.arrow.ArrowBlockBuilder;
import com.facebook.plugin.arrow.ArrowConnector;
import com.facebook.plugin.arrow.BaseArrowFlightClientHandler;
import com.facebook.plugin.arrow.testingConnector.tvf.QueryFunctionProvider;
import com.facebook.plugin.arrow.testingServer.TestingArrowFlightRequest;
import com.facebook.plugin.arrow.testingServer.TestingArrowFlightResponse;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static com.google.inject.multibindings.Multibinder.newSetBinder;

public class TestingArrowModule
implements Module
Expand All @@ -36,6 +41,9 @@ public TestingArrowModule(boolean nativeExecution)
@Override
public void configure(Binder binder)
{
binder.bind(ConnectorMetadata.class).to(TestingArrowMetadata.class).in(Scopes.SINGLETON);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(QueryFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(ArrowConnector.class).to(TestingArrowConnector.class).in(Scopes.SINGLETON);
// Concrete implementation of the BaseFlightClientHandler
binder.bind(BaseArrowFlightClientHandler.class).to(TestingArrowFlightClientHandler.class).in(Scopes.SINGLETON);
// Override the ArrowBlockBuilder with an implementation that handles h2 types, skip for native
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.plugin.arrow.testingConnector;

import com.facebook.plugin.arrow.ArrowColumnHandle;
import com.facebook.plugin.arrow.ArrowTableHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static java.util.Objects.requireNonNull;
public class TestingQueryArrowTableHandle
extends ArrowTableHandle
{
private final String query;
private final List<ArrowColumnHandle> columns;

@JsonCreator
public TestingQueryArrowTableHandle(String query, List<ArrowColumnHandle> columns)
{
super("schema-" + UUID.randomUUID(), "table-" + UUID.randomUUID());
this.columns = Collections.unmodifiableList(requireNonNull(columns));
this.query = requireNonNull(query);
}

@JsonProperty
public String getQuery()
{
return query;
}

@JsonProperty
public List<ArrowColumnHandle> getColumns()
{
return columns;
}
}
Loading
Loading