Skip to content

Conversation

@BryanCutler
Copy link
Contributor

Description

This adds a FlightShim project with an Apache Arrow Flight server that is able to load Presto Java JDBC-based connectors and process JDBC splits and return a stream of Arrow record batches. See related RFC prestodb/rfcs#46

Motivation and Context

Impact

Test Plan

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* ... 
* ... 

Hive Connector Changes
* ... 
* ... 

If release note is NOT required, use:

== NO RELEASE NOTE ==

@prestodb-ci prestodb-ci added the from:IBM PR from IBM label Oct 20, 2025
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Oct 20, 2025

Reviewer's Guide

This PR introduces a new "presto-flight-shim" Maven module that embeds an Apache Arrow Flight server into Presto workers, enabling federation over JDBC connectors. It implements a plugin manager to load and configure Presto JDBC-based connectors at runtime, a Flight producer to handle Flight gRPC calls and serialize/deserialize FlightShimRequests, and an ArrowBatchSource to stream JDBC splits as Arrow record batches. The module bootstraps via Guice, exposes configuration and JMX metrics, and is covered by end-to-end and unit tests across multiple connectors.

Sequence diagram for FlightShim FlightServer GetStream request handling

sequenceDiagram
  participant Client as "Flight Client"
  participant Server as "FlightServer (FlightShimServer)"
  participant Producer as "FlightShimProducer"
  participant PluginMgr as "FlightShimPluginManager"
  participant Connector as "JDBC Connector"
  participant ArrowBatch as "ArrowBatchSource"
  Client->>Server: Send GetStream request (Ticket)
  Server->>Producer: Delegate getStream(ticket)
  Producer->>PluginMgr: getConnector(connectorId)
  PluginMgr->>Connector: Load connector
  Producer->>Connector: getRecordSetProvider()
  Producer->>Connector: getRecordSet(split, columns)
  Producer->>ArrowBatch: Create ArrowBatchSource(recordSet)
  loop For each batch
    ArrowBatch->>Producer: nextBatch()
    Producer->>Client: Send Arrow record batch
  end
  Producer->>Client: completed()
Loading

Class diagram for new FlightShim core classes

classDiagram
  class FlightShimPluginManager {
    +loadPlugins()
    +loadCatalogs()
    +getConnector(String): ConnectorHolder
    -connectorFactories: Map
    -connectors: Map
    -catalogPropertiesMap: Map
    +stop()
  }
  class FlightShimPluginManager~ConnectorHolder~ {
    +getConnector(): Connector
    +getCodecSplit(): JsonCodec
    +getCodecColumnHandle(): JsonCodec
    +getColumnMetadata(ColumnHandle): ColumnMetadata
  }
  class FlightShimProducer {
    +getStream(CallContext, Ticket, ServerStreamListener)
    +close()
    -pluginManager: FlightShimPluginManager
    -allocator: BufferAllocator
    -config: FlightShimConfig
    -shimExecutor: ExecutorService
  }
  class ArrowBatchSource {
    +nextBatch(): boolean
    +getVectorSchemaRoot(): VectorSchemaRoot
    +close()
    -columns: List
    -cursor: RecordCursor
    -root: VectorSchemaRoot
    -writers: List
    -maxRowsPerBatch: int
  }
  class FlightShimConfig {
    +getServerName(): String
    +getServerPort(): Integer
    +getServerSslEnabled(): boolean
    +getMaxRowsPerBatch(): int
    +getReadSplitThreadPoolSize(): int
  }
  class FlightShimRequest {
    +getConnectorId(): String
    +getSplitBytes(): byte[]
    +getColumnHandlesBytes(): List<byte[]>
  }
  FlightShimPluginManager "1" o-- "*" FlightShimPluginManager~ConnectorHolder~
  FlightShimProducer "1" --> "1" FlightShimPluginManager
  FlightShimProducer "1" --> "1" FlightShimConfig
  FlightShimProducer "1" --> "1" ArrowBatchSource
  ArrowBatchSource "1" --> "1" VectorSchemaRoot
  FlightShimProducer "1" --> "1" BufferAllocator
  FlightShimProducer "1" --> "1" ExecutorService
  FlightShimRequest "1" --> "1" FlightShimProducer
  FlightShimConfig "1" --> "1" FlightShimProducer
Loading

File-Level Changes

Change Details Files
Register and configure a new presto-flight-shim Maven module
  • Added module entry in parent POM
  • Created presto-flight-shim/pom.xml with dependencies and build plugins
pom.xml
presto-flight-shim/pom.xml
Conversion of RecordCursor to Arrow record batches
  • Implemented ArrowBatchSource with vector schema creation, allocation, and batching
  • Built ArrowShimWriter hierarchy for all supported Arrow types
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/ArrowBatchSource.java
Dynamic loading and management of JDBC connectors
  • Developed FlightShimPluginManager to discover, load, and register connector factories
  • Managed catalog properties, connector instances, and classloader isolation
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimPluginManager.java
Flight producer implementation for gRPC Flight API
  • Added FlightShimProducer to handle getStream calls and backpressure
  • Serialized and deserialized FlightShimRequest and managed streaming lifecycles
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimProducer.java
Server bootstrap and dependency injection setup
  • Created FlightShimServer to initialize and start FlightServer
  • Defined FlightShimModule (Guice bindings) and FlightShimConfig
  • Exposed thread pool metrics via FlightShimServerExecutionMBean and qualifier
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimServer.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimModule.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimConfig.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimServerExecutionMBean.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/ForFlightShimServer.java
FlightShim request data model
  • Added FlightShimRequest with JSON annotations for ticket payload
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimRequest.java
Integration and unit tests for FlightShim
  • Built abstract test frameworks for connector queries
  • Implemented connector-specific tests (PostgreSQL, MySQL, Oracle, SingleStore)
  • Added tests for config mapping and request JSON round-trip
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/AbstractTestFlightShimBase.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/AbstractTestFlightShimQueries.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimProducer.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimRequest.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimConfig.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimMySql.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimOracle.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimPostgres.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimSingleStore.java
Configuration and logging setup
  • Added default log.properties
  • Added default flightshim.properties
presto-flight-shim/etc/log.properties
presto-flight-shim/etc/flightshim.properties

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New security issues found

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 2 times, most recently from 8591906 to 99225b1 Compare November 5, 2025 02:30
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New security issues found

Comment on lines +1 to +28
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC8DK/RdBF6I+k+
DMGrjhMMBCnpPNwJtzJUuXYcHFYEdBnHY/rpjk/fi+7jD8bppynCZPakrDX+5VIM
zS4HBU/CHY26eR2ItiWqDoDkPAlCdgeKIGNYYEvVSuUW5YQX6fuD8PfCpCP5zK7D
JC2xTTsyEjBzD+MnIB7Tja9/22Djo2Ib2l9BEBOD+k79caPFtSqDQVdS5JLJ/P7B
eqGuFS8bEgtLwwCzRxPKkG64rXb9F0IErGwjXi/70BA24EW0uGAzzeY5Pnlx5Mul
EIjuxII/dTuoo+/uirN0wxURKSzTMyzzoJqGX9Ng9+Z+VqWFrnqckcmFcD4T0sPo
HpWZsYIHAgMBAAECggEAViw6JXFa0O3D5HtUBJmGgOsniYoqCwm4NrsGNLuHb2ME
rSpTwNNGJtqpDcQdEtVXfY1muO9xjuznPJaJkQ4ODpYcbGcz8YIGoHck+XHJjHsp
2VIeNFFsbsFzWZqzfYHrj/rMjpVJJx90tlfN2IHbroZHTXLqVPOTLL6wvZZ6P9XF
zqpWABKOaEDbenhSFFZeF5KR3NG9HSTm4YLuekumkH+QgrveDfDwXG4hAHqg836o
OF3NPaij6VlSR18nuyW0wMs/Ceu13P+GALqHmz98pFyVgHWQFryL9IccvJQDyEnt
saeG4IAVlJbZDGTnRgANLhpwBr7XhMG1aK+wmOMRgQKBgQDkcatiATlr9L8gfnHb
6pmX//AZLdXuQLXfuTvu638Brhm770noLgfIC+HIp5kCHxT2Xj5Vn+MSnYD6R6Wh
chApRKJUdsuz1iOq23YJjvsSLWCGpl9IxR7WY27uGOPIjQcOd1PRbkCq9AgUJwyn
ryca3sbYh/XQOWGLbJNIQs/S/QKBgQDSu6PVeMaS3276KblvGIvvaSAQDQWxXcC+
sA4CBmvjzx3xx5GAox/w7tcKmK/KQxNhaYy6N7xLc1YUJ9FbnT2PZQJhtP2d2Gat
Zre/+Qa+u84cR5hj9EI+B8FjW7D/psEj16KjHCds/SET6ngPM+RdB4N9daVFCurt
p0f717yiUwKBgBTJDun06I+dDkLbnmp/FwiQff0cgYmTE7lOdliPzteNSsQhypy4
i3a1Ng72yOI7h8G+43cQ/C02bYTYPgbJhRTsLMT4piIvysECBORrwQZvYIf/3U2W
ue6Rz4cUdq1Jv6meS98TZAjp+U40G1+qfSlhub/75u7SOcDg2SnLAnPVAoGBAIOO
EmRE5qpwA+b2P0Ykq89E8Hg0uPYWEiq427XV7mqkNQxoSuRkcZ9Ga0a5NRzurN2m
N+1UuB7eHMGubdtkmTa4lzkJ9T4iB09/DX0x6E0QD0bGR1M2/FefHdJ6PlAK+Q34
Ixbyj4ZRq+G0AUl0Wr7c3vBmjktA2pKMWLrW3nLzAoGBAKTl7qX6CD42gAJuT5Hp
rrXqlppVIyRvuXzXtX/Xq81IUHlBgS/t9HPyqDzmTKfxD8540kI+15bWPDHSJxiQ
ccqPaKyXhBXstDwGmlPKVzJUxk0dz5NHs+8gItUDOg78pM3siXN7vW9XBCH7mCDA
4zet/C0YCAiFVT+ipMoXy8Nc
-----END PRIVATE KEY-----
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (private-key): Identified a Private Key, which may compromise cryptographic security and sensitive data encryption.

Source: gitleaks

Comment on lines +1 to +28
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCtkJ+r1F8+YOVu
wWLxbGVsJKw3BEShtCsU+IXHJeaNJdBr59B/4h5WM37wOnnecmyEZTh47FXXkb5h
0xVlHES7eTAD+NPlWHufGJ9PR1kvQyZ0fyNRFXLzUID/dl7atHBtlrqE5Bhg7xqA
yPZjUjhkAZPgrT1/8+gYmbWPbw3Ba3+XRupq3Kn+EVVJi7wk4cj8jf6g1aex6sMO
kSYNsanb+JdEryevgoju+EtHgCHL6cB0eJs8PfMWiibgWLE2pkI0bdbGjTNVDDyg
ZoO8Qr/YrGvXXYqt0D7IKSUiO8bnrvZh6ITPEcQ3ePQRGEpqh8ggKaVq3RVkC3t3
QMRWGpsvAgMBAAECggEAIK9C+lNAallJ62z8inU8tjxDuAqUOBVbJZRVcPbIr1zn
HmLlpyd4Sghhh7CjYYoPuHDtTQxIcBNwlDBxb3x+zwUXzy+tC5v5j7DN01qex2Ew
XTDSAEN3Ra2r1S+/1hSztVd0oXDozFxKk+UETRjfKKoJZH6LPcy7MOLFR5EEuJ8L
0kvGdEtuNCmZ1vPBwqR3IKQS9NsB1IdTtK0g2LdtVzM3U6F173CrAx51qNeAL30j
Np+I0rfm7vYVco6nDQXJB86hzwwBnLMzmZR2E0z+JStQCjQtEJN9wp+NBnViMb8C
mZl0K/PH3ZKNEs1Aw/TsRpPu6Fc+sN6iIs2oOGiKfQKBgQDa0Wpfj+SHflLfmrRU
PplGNjWdJiyuXROqX18iNE8nAD0eRqAFdzj9yU1IW49KCzuHInEl2pP9yrDZTWXB
Bht4C+Vk13mrBE3Sc1LDrks5EhDLaaolLgx1B+JN1X2DpfuzO8WHrXR11PCzFTAp
yDSVd451CFFXMseS1V9UxCy3lQKBgQDLDrLX/0hGhG+a5RUaAE+hZk+tU9RyjYm6
/5lIoDjDwA9Yst69JCTHDApkdZ6IrjPDZrxkAQR6QwsGo+zRGkHV2wCoqR/RxcT5
RBcbe/8xL86ZKwnhAheP6ssgZeK5zOG1iLol319kXXuo6NueN+YlocmsppRvAOq7
/qMnhzXGswKBgQCpke2wHo9HnNJWK8ohGt2mtm232ZR4jvKlbgEIPac1Hw89/hcW
BT0qFqyILUQOakP4Re2PGyLiYwfHbh4zhisVTYq4Ke9EYzJ3qxzxPYlXsbNIHxtW
cqf+rVxnWtFIiwFR9TjvGrEMezcIYJwRVO/DAIJqGUcHnvdfx3B3/Qp2PQKBgQCk
y7UR37kEog8BotHRXFdEIgigHtzYa05QWYhJjN8E3yaVUfW7g03lzTvR9DNJsjeI
aiSS9NBxeV/Fb9yOh8TOjwKl3zxXvy3xLvWh9KxTev0tCeTmnBALWP6puIadTE4S
Snjoq7R7e/MUToeOjMdX20oVuMvWmuPm1u4K8o0OSQKBgQCec+QLllYXk22A8/e/
f5HhSYr161lEFFmuzKhuuy+esyCQU/KZmxQH0UqnsL3Ww4ofq42lteqyUJnriHsx
QP5FTIMKH8W+Xels1i6jCC+MVXAXraAF27dOlmKxWMN7mnElZ/7lQKmBq64wil35
sfcJA4FDxVM2Amv4KRo/w1C/zQ==
-----END PRIVATE KEY-----
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (private-key): Identified a Private Key, which may compromise cryptographic security and sensitive data encryption.

Source: gitleaks

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 2 times, most recently from 1f6d1f4 to f65e31d Compare November 5, 2025 19:25
@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 2 times, most recently from a02de8f to b877acd Compare November 17, 2025 23:46
Works up to getting RecordSet due to classloader conflict

Test is now able to read from postgresql into recordset

Using FlightShimConfig for location

Added conversion with ArrowShimWriter

Added more type conversions and writers

Config with SSL working, test passes with row count

Fix test for tpch

Using executor service to run getStream

Adding cancel tests

Backpressure working

Fixed config to load from file

Added some basic unit tests

Using bound FunctionAndTypeManager for type deser and ConnectorContext

Made FlightShimQueryRunner, working but needs selecting columns

Able to make select query by column name and use query runner

Added test for MySql connector

Added type support for date and timestamp, with tests. decimal test not working yet

Fix floating point test

Added logging

Using plugins from plugin-bundles config

Added oracle and singlestore tests, oracle not working yet

Remove plugin dir

Fixed deser of TupleDomain, need to test against query results

Add test with additionalPredicate

Enable mTLS, fix config typo

Moved to test containers after rebase

Fix code style

Use SynchronousQueue in executor fix gRPC deadlock

Fix backpressure to use new instance per call
@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch from b877acd to 1757683 Compare November 20, 2025 22:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

from:IBM PR from IBM

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants