-
Notifications
You must be signed in to change notification settings - Fork 5.5k
fix(plugin-arrow): Handle restricted output columns in Arrow Page Source #26175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(plugin-arrow): Handle restricted output columns in Arrow Page Source #26175
Conversation
Reviewer's GuideThis PR enhances ArrowPageSource to correctly map columns when using TVFs with restricted output by switching to name-based vector lookup, and introduces a testing framework for the new query_function TVF—including handler, provider, metadata, connector classes, and unit tests. ER diagram for TVF output column mapping in ArrowPageSourceerDiagram
FLIGHT_RESULT {
string name
int id
}
TVF_OUTPUT {
int id
}
FLIGHT_RESULT ||--o| TVF_OUTPUT : restricts
TVF_OUTPUT {
int id
}
Class diagram for ArrowPageSource column mapping updateclassDiagram
class ArrowPageSource {
+getNextPage()
-getVectorByColumnName(vectors: List<FieldVector>, name: String): FieldVector
+close()
}
ArrowPageSource --> ArrowBlockBuilder
ArrowPageSource --> FlightStreamAndClient
ArrowPageSource --> FieldVector
ArrowPageSource --> ArrowException
Class diagram for new TVF testing framework classesclassDiagram
class QueryFunctionProvider {
}
class TestingArrowConnector {
}
class TestingArrowMetadata {
}
class TestingQueryArrowTableHandle {
}
class PrimitiveToPrestoTypeMappings {
}
QueryFunctionProvider --> TestingArrowConnector
TestingArrowConnector --> TestingArrowMetadata
TestingArrowMetadata --> TestingQueryArrowTableHandle
TestingArrowConnector --> PrimitiveToPrestoTypeMappings
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
f6ce553 to
f5f55bc
Compare
0efd5c1 to
9ada46d
Compare
|
@sourcery-ai dismiss |
presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
Outdated
Show resolved
Hide resolved
9ada46d to
2b4de89
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- In ArrowPageSource#getVectorIndexForColumnHandleIndex, consider building and caching a mapping from column handle to vector index once per split instead of scanning the list on every page to simplify the logic and improve performance.
- The logger.debug call in the ArrowPageSource constructor uses a
%splaceholder but the Airlift logger expects{}syntax; either correct the placeholder or remove this debug statement if it’s no longer needed. - The regex-based type parsing in QueryFunctionProvider.extractColumnParameters is brittle for more complex SQL types; consider leveraging Presto’s built-in SQL parser or a more robust parsing approach to support a wider range of data types.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In ArrowPageSource#getVectorIndexForColumnHandleIndex, consider building and caching a mapping from column handle to vector index once per split instead of scanning the list on every page to simplify the logic and improve performance.
- The logger.debug call in the ArrowPageSource constructor uses a `%s` placeholder but the Airlift logger expects `{}` syntax; either correct the placeholder or remove this debug statement if it’s no longer needed.
- The regex-based type parsing in QueryFunctionProvider.extractColumnParameters is brittle for more complex SQL types; consider leveraging Presto’s built-in SQL parser or a more robust parsing approach to support a wider range of data types.
## Individual Comments
### Comment 1
<location> `presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java:57-58` </location>
<code_context>
this.arrowBlockBuilder = requireNonNull(arrowBlockBuilder, "arrowBlockBuilder is null");
this.flightStreamAndClient = clientHandler.getFlightStream(connectorSession, split);
+ this.columnHandlesInSplit = split.getColumns();
+ logger.debug("columnHandlesInSplit %s", columnHandlesInSplit);
}
</code_context>
<issue_to_address>
**suggestion:** Consider removing or adjusting debug logging for columnHandlesInSplit.
Debug logging in production may cause excessive logs or expose sensitive data. Use a less verbose log level or remove before merging if not needed.
```suggestion
this.columnHandlesInSplit = split.getColumns();
```
</issue_to_address>
### Comment 2
<location> `presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java:147` </location>
<code_context>
+ vectorIndex = index.getAsInt();
+ }
+ else {
+ throw new ArrowException(ARROW_INTERNAL_ERROR, "Unable to find column " + columnHandles.get(columnHandleIndex).getColumnName() + " in the column handles given in split");
+ }
+ }
</code_context>
<issue_to_address>
**suggestion:** Consider including more context in the error message for missing column.
Including the available columns in the error message will make it easier to identify the issue during debugging.
```suggestion
String availableColumns = columnHandlesInSplit.get().stream()
.map(handle -> handle.getColumnName())
.collect(Collectors.joining(", "));
throw new ArrowException(
ARROW_INTERNAL_ERROR,
"Unable to find column " + columnHandles.get(columnHandleIndex).getColumnName() +
" in the column handles given in split. Available columns: [" + availableColumns + "]"
);
```
</issue_to_address>
### Comment 3
<location> `presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java:34` </location>
<code_context>
private final String schemaName;
private final String tableName;
private final byte[] flightEndpointBytes;
+ private final Optional<List<ArrowColumnHandle>> columns;
@JsonCreator
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider validating that columns Optional is not null in ArrowSplit constructor.
Passing null for columns may cause a NullPointerException. Add a requireNonNull check in the constructor to ensure columns is not null.
Suggested implementation:
```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Objects;
```
```java
{
this.schemaName = schemaName;
this.tableName = tableName;
this.flightEndpointBytes = flightEndpointBytes;
this.columns = Objects.requireNonNull(columns, "columns is null");
```
</issue_to_address>
### Comment 4
<location> `presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightQueriesWithTVF.java:38` </location>
<code_context>
+ @Test
+ public void testQueryFunction()
+ {
+ MaterializedResult actualRow = computeActual("SELECT id from TABLE(system.query_function('SELECT name, id FROM tpch.member WHERE id = 1', 'name VARCHAR, id INTEGER'))");
+ MaterializedResult expectedRow = resultBuilder(getSession(), INTEGER)
+ .row(1)
</code_context>
<issue_to_address>
**suggestion (testing):** Test does not cover error handling for mismatched column names or types.
Add tests for cases where the TVF signature's column names or types do not match the query result, such as missing columns or incorrect types, to verify proper error handling.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
Outdated
Show resolved
Hide resolved
presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java
Outdated
Show resolved
Hide resolved
2b4de89 to
c9efdfc
Compare
|
@sourcery-ai dismiss |
|
@sourcery-ai dismiss |
c9efdfc to
20e2a14
Compare
|
@sourcery-ai review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Consider replacing Optional<List> in ArrowSplit with an empty-list default to simplify the vector‐index mapping logic and avoid pervasive Optional checks.
- The regex-based parsing of the DATATYPES string in QueryFunctionProvider is brittle; it would be more robust to leverage Presto’s SQL parser or a dedicated type‐declaration parser.
- In ArrowPageSource, add an explicit sanity check that the split’s column list size matches the incoming FieldVector count so any mismatches are caught with a clear error early on.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider replacing Optional<List<ArrowColumnHandle>> in ArrowSplit with an empty-list default to simplify the vector‐index mapping logic and avoid pervasive Optional checks.
- The regex-based parsing of the DATATYPES string in QueryFunctionProvider is brittle; it would be more robust to leverage Presto’s SQL parser or a dedicated type‐declaration parser.
- In ArrowPageSource, add an explicit sanity check that the split’s column list size matches the incoming FieldVector count so any mismatches are caught with a clear error early on.
## Individual Comments
### Comment 1
<location> `presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightQueriesWithTVF.java:35-36` </location>
<code_context>
+ return ArrowFlightQueryRunner.createQueryRunner(serverPort, true);
+ }
+
+ @Test
+ public void testQueryFunction()
+ {
+ MaterializedResult actualRow = computeActual("SELECT id from TABLE(system.query_function('SELECT name, id FROM tpch.member WHERE id = 1', 'name VARCHAR, id INTEGER'))");
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding negative and edge case tests for the query_function TVF.
Please add tests for scenarios like non-existent columns, unsupported data types, malformed SQL, and restricting all columns to improve error handling and edge case coverage.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
20e2a14 to
48a35cb
Compare
|
@sourcery-ai dismiss |
|
@sourcery-ai review |
|
@sourcery-ai guide |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Consider replacing Optional<List> with a non‐null empty list to simplify the split‐column logic and avoid Optional checks everywhere.
- The check that vectors.size() == columnHandlesInSplit.size() may be too strict if the flight stream returns extra metadata or dictionary vectors—consider relaxing this to a >= check or filtering only the data vectors.
- You can simplify the index mapping by building a name→index map from columnHandlesInSplit once (e.g. in the constructor) instead of scanning the list for each column in getVectorIndexForColumnHandleIndex.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider replacing Optional<List<ArrowColumnHandle>> with a non‐null empty list to simplify the split‐column logic and avoid Optional checks everywhere.
- The check that vectors.size() == columnHandlesInSplit.size() may be too strict if the flight stream returns extra metadata or dictionary vectors—consider relaxing this to a >= check or filtering only the data vectors.
- You can simplify the index mapping by building a name→index map from columnHandlesInSplit once (e.g. in the constructor) instead of scanning the list for each column in getVectorIndexForColumnHandleIndex.
## Individual Comments
### Comment 1
<location> `presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java:34` </location>
<code_context>
private final String schemaName;
private final String tableName;
private final byte[] flightEndpointBytes;
+ private final Optional<List<ArrowColumnHandle>> columns;
@JsonCreator
</code_context>
<issue_to_address>
**suggestion:** Consider using an immutable list for columns to prevent accidental modification.
Wrapping columns in an immutable collection will enforce read-only access and enhance thread safety.
Suggested implementation:
```java
// columns is always an immutable (unmodifiable) list if present
private final Optional<List<ArrowColumnHandle>> columns;
```
```java
this.schemaName = schemaName;
this.tableName = tableName;
this.flightEndpointBytes = flightEndpointBytes;
this.columns = columns.map(list -> Collections.unmodifiableList(list));
```
</issue_to_address>
### Comment 2
<location> `presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowSplit.java:53` </location>
<code_context>
flightEndpoint = new FlightEndpoint(ticket, location);
// Instantiate ArrowSplit with mock data
- arrowSplit = new ArrowSplit(schemaName, tableName, flightEndpoint.serialize().array());
+ arrowSplit = new ArrowSplit(schemaName, tableName, flightEndpoint.serialize().array(), Optional.empty());
}
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for ArrowSplit with non-empty columns.
Please add a test with populated columns to verify ArrowSplit handles restricted columns correctly during serialization and deserialization.
```suggestion
}
@Test
public void testArrowSplitWithRestrictedColumns() {
String schemaName = "test_schema";
String tableName = "test_table";
Location location = new Location("http://localhost:8080");
Ticket ticket = new Ticket("test_ticket".getBytes());
FlightEndpoint flightEndpoint = new FlightEndpoint(ticket, location);
List<String> restrictedColumns = List.of("col1", "col2", "col3");
ArrowSplit arrowSplit = new ArrowSplit(
schemaName,
tableName,
flightEndpoint.serialize().array(),
Optional.of(restrictedColumns)
);
// Serialize and deserialize
ByteBuffer serialized = arrowSplit.serialize();
ArrowSplit deserialized = ArrowSplit.deserialize(serialized);
// Assert restricted columns are preserved
assertNotNull(deserialized.getRestrictedColumns());
assertEquals(deserialized.getRestrictedColumns().get(), restrictedColumns);
assertEquals(deserialized.getSchemaName(), schemaName);
assertEquals(deserialized.getTableName(), tableName);
}
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
daec4f0 to
96a9bf7
Compare
028f6fc to
efc25da
Compare
elbinpallimalilibm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored the tests to compare with results from expected query instead of comparing with explicit values.
| struct ArrowTableHandle : public ConnectorTableHandle { | ||
| String schema = {}; | ||
| String table = {}; | ||
| std::shared_ptr<List<ArrowColumnHandle>> columns = {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added tests in TestArrowFlightNativeQueries to test scenarios with restricted columns. Seems the native connector doesn't even need to read these values, since the tests pass successfully.
efc25da to
a6bb015
Compare
elbinpallimalilibm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redesigned the whole PR since only a simple fix was needed in ArrowPageSource. I have kept the sample implementation of TVF so that TVF with Arrow Flight connector can be tested including scenarios where output columns are restricted.
|
|
||
| private FieldVector getVectorByColumnName(List<FieldVector> vectors, String name) | ||
| { | ||
| return vectors.stream().filter(v -> v.getName().equals(name)).findFirst().orElseThrow(() -> new ArrowException(ARROW_INTERNAL_ERROR, "Unable to find field for column " + name + " in the fields returned from Arrow Flight stream")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revisited the pull request and identified that only this fix was needed to handle restricted output columns when using TVF.
|
@SourceryAI guide |
|
@SourceryAI review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java:116-119` </location>
<code_context>
+ private FieldVector getVectorByColumnName(List<FieldVector> vectors, String name)
+ {
+ return vectors.stream().filter(v -> v.getName().equals(name)).findFirst().orElseThrow(() -> new ArrowException(ARROW_INTERNAL_ERROR, "Unable to find field for column " + name + " in the fields returned from Arrow Flight stream"));
+ }
+
</code_context>
<issue_to_address>
**suggestion:** ArrowException message could include available field names for easier debugging.
Consider adding the available field names to the ArrowException message to help identify missing or misnamed columns.
```suggestion
private FieldVector getVectorByColumnName(List<FieldVector> vectors, String name)
{
String availableFields = vectors.stream()
.map(FieldVector::getName)
.reduce((a, b) -> a + ", " + b)
.orElse("(none)");
return vectors.stream()
.filter(v -> v.getName().equals(name))
.findFirst()
.orElseThrow(() -> new ArrowException(
ARROW_INTERNAL_ERROR,
"Unable to find field for column " + name +
" in the fields returned from Arrow Flight stream. " +
"Available fields: " + availableFields
));
}
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| List<FieldVector> vectors = flightStreamAndClient.getRoot().getFieldVectors(); | ||
| for (int columnIndex = 0; columnIndex < columnHandles.size(); columnIndex++) { | ||
| FieldVector vector = vectors.get(columnIndex); | ||
| FieldVector vector = getVectorByColumnName(vectors, columnHandles.get(columnIndex).getColumnName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only concern with doing it this way is that it's a full search over the fields for each column, so if there is a very wide record batch, it's not very efficient. Also this would affect all cases, not just when using TVFs.
Since this does simplify quite a bit, I think it's ok for now. Could you add a note to explain the reason the columns are matched by name and not just by index?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comments explaining why fields are looked up by name. BTW, the native arrow flight connector, also looks up the field by column name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I think what you have now is close to what is in the native connector
|
|
||
| private FieldVector getVectorByColumnName(List<FieldVector> vectors, String name) | ||
| { | ||
| return vectors.stream().filter(v -> v.getName().equals(name)).findFirst().orElseThrow(() -> new ArrowException(ARROW_INTERNAL_ERROR, "Unable to find field for column " + name + " in the fields returned from Arrow Flight stream")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could make this a little more efficient. I think stream().filter() will continue iterating even if a match is found. The Arrow VectorSchemaRoot also has a method to lookup a vector by name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to use the method in VectorSchemaRoot to get the field by name.
...-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightNativeQueries.java
Show resolved
Hide resolved
a6bb015 to
9bea9e7
Compare
elbinpallimalilibm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed suggestions.
...-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightNativeQueries.java
Show resolved
Hide resolved
|
|
||
| private FieldVector getVectorByColumnName(List<FieldVector> vectors, String name) | ||
| { | ||
| return vectors.stream().filter(v -> v.getName().equals(name)).findFirst().orElseThrow(() -> new ArrowException(ARROW_INTERNAL_ERROR, "Unable to find field for column " + name + " in the fields returned from Arrow Flight stream")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to use the method in VectorSchemaRoot to get the field by name.
| List<FieldVector> vectors = flightStreamAndClient.getRoot().getFieldVectors(); | ||
| for (int columnIndex = 0; columnIndex < columnHandles.size(); columnIndex++) { | ||
| FieldVector vector = vectors.get(columnIndex); | ||
| FieldVector vector = getVectorByColumnName(vectors, columnHandles.get(columnIndex).getColumnName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comments explaining why fields are looked up by name. BTW, the native arrow flight connector, also looks up the field by column name.
6fd82cf to
1b01b32
Compare
| // In scenarios where the user query contains a Table Valued Function, the output columns could be in a | ||
| // different order or could be a subset of the columns in the flight stream. So we are fetching the requested | ||
| // field vector by matching the column name instead of fetching by column index. | ||
| FieldVector vector = vectorSchemaRoot.getVector(columnHandle.getColumnName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getVector will return a null if not found, need to check for that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added check
| @Test | ||
| public void testQueryFunctionWithoutRestrictedColumns() throws InterruptedException | ||
| { | ||
| assertQuery("SELECT NAME, NATIONKEY FROM TABLE(system.query_function('SELECT NATIONKEY, NAME FROM tpch.nation WHERE NATIONKEY = 4','NATIONKEY BIGINT, NAME VARCHAR'))", "SELECT NAME, NATIONKEY FROM nation WHERE NATIONKEY = 4"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a test that would reverse the order of the columns? e.g. SELECT NATIONKEY, NAME ...
Also a negative test where the output column is not present in the TVF? like SELECT FOO ..
This would fail during analysis right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added negative test
1b01b32 to
46ef954
Compare
46ef954 to
934eac8
Compare
BryanCutler
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
Thanks a lot for carrying this through @elbinpallimalilibm. Thanks also @BryanCutler for the detailed and thorough reviews. |
Restriced columns need to be handled when using table valued functions in query.
Description
When using table valued functions (TVF) in queries like given below
ArrowPageSourcecan fail to map the correctFieldVectorfor the column. This happens because a TVF like above executes the query natively in Flight server and gives a result with two columns butArrowPageSourceis expected to only return results for 1 column, ieidfrom above query. The columnnameis restricted from the TVF resultMotivation and Context
This change is required when using TVF against a catalog based on
presto-base-arrow-flightmodule.Impact
This fixes problems when using TVF in a way given above against Arrow Flight based catalogs.
Test Plan
New unit test added that will test this change. This PR also includes an implementation of TVF called
query_functionthat will test this change.This change is backward compatible, so existing test cases will also pass with this change.
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.