-
Notifications
You must be signed in to change notification settings - Fork 5.5k
WIP feat(flight-shim): Adding Apache Arrow FlightShim for federation in C++ workers #26369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Reviewer's GuideThis PR introduces a new "presto-flight-shim" Maven module that embeds an Apache Arrow Flight server into Presto workers, enabling federation over JDBC connectors. It implements a plugin manager to load and configure Presto JDBC-based connectors at runtime, a Flight producer to handle Flight gRPC calls and serialize/deserialize FlightShimRequests, and an ArrowBatchSource to stream JDBC splits as Arrow record batches. The module bootstraps via Guice, exposes configuration and JMX metrics, and is covered by end-to-end and unit tests across multiple connectors. Sequence diagram for FlightShim FlightServer GetStream request handlingsequenceDiagram
participant Client as "Flight Client"
participant Server as "FlightServer (FlightShimServer)"
participant Producer as "FlightShimProducer"
participant PluginMgr as "FlightShimPluginManager"
participant Connector as "JDBC Connector"
participant ArrowBatch as "ArrowBatchSource"
Client->>Server: Send GetStream request (Ticket)
Server->>Producer: Delegate getStream(ticket)
Producer->>PluginMgr: getConnector(connectorId)
PluginMgr->>Connector: Load connector
Producer->>Connector: getRecordSetProvider()
Producer->>Connector: getRecordSet(split, columns)
Producer->>ArrowBatch: Create ArrowBatchSource(recordSet)
loop For each batch
ArrowBatch->>Producer: nextBatch()
Producer->>Client: Send Arrow record batch
end
Producer->>Client: completed()
Class diagram for new FlightShim core classesclassDiagram
class FlightShimPluginManager {
+loadPlugins()
+loadCatalogs()
+getConnector(String): ConnectorHolder
-connectorFactories: Map
-connectors: Map
-catalogPropertiesMap: Map
+stop()
}
class FlightShimPluginManager~ConnectorHolder~ {
+getConnector(): Connector
+getCodecSplit(): JsonCodec
+getCodecColumnHandle(): JsonCodec
+getColumnMetadata(ColumnHandle): ColumnMetadata
}
class FlightShimProducer {
+getStream(CallContext, Ticket, ServerStreamListener)
+close()
-pluginManager: FlightShimPluginManager
-allocator: BufferAllocator
-config: FlightShimConfig
-shimExecutor: ExecutorService
}
class ArrowBatchSource {
+nextBatch(): boolean
+getVectorSchemaRoot(): VectorSchemaRoot
+close()
-columns: List
-cursor: RecordCursor
-root: VectorSchemaRoot
-writers: List
-maxRowsPerBatch: int
}
class FlightShimConfig {
+getServerName(): String
+getServerPort(): Integer
+getServerSslEnabled(): boolean
+getMaxRowsPerBatch(): int
+getReadSplitThreadPoolSize(): int
}
class FlightShimRequest {
+getConnectorId(): String
+getSplitBytes(): byte[]
+getColumnHandlesBytes(): List<byte[]>
}
FlightShimPluginManager "1" o-- "*" FlightShimPluginManager~ConnectorHolder~
FlightShimProducer "1" --> "1" FlightShimPluginManager
FlightShimProducer "1" --> "1" FlightShimConfig
FlightShimProducer "1" --> "1" ArrowBatchSource
ArrowBatchSource "1" --> "1" VectorSchemaRoot
FlightShimProducer "1" --> "1" BufferAllocator
FlightShimProducer "1" --> "1" ExecutorService
FlightShimRequest "1" --> "1" FlightShimProducer
FlightShimConfig "1" --> "1" FlightShimProducer
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New security issues found
8591906 to
99225b1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New security issues found
| -----BEGIN PRIVATE KEY----- | ||
| MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC8DK/RdBF6I+k+ | ||
| DMGrjhMMBCnpPNwJtzJUuXYcHFYEdBnHY/rpjk/fi+7jD8bppynCZPakrDX+5VIM | ||
| zS4HBU/CHY26eR2ItiWqDoDkPAlCdgeKIGNYYEvVSuUW5YQX6fuD8PfCpCP5zK7D | ||
| JC2xTTsyEjBzD+MnIB7Tja9/22Djo2Ib2l9BEBOD+k79caPFtSqDQVdS5JLJ/P7B | ||
| eqGuFS8bEgtLwwCzRxPKkG64rXb9F0IErGwjXi/70BA24EW0uGAzzeY5Pnlx5Mul | ||
| EIjuxII/dTuoo+/uirN0wxURKSzTMyzzoJqGX9Ng9+Z+VqWFrnqckcmFcD4T0sPo | ||
| HpWZsYIHAgMBAAECggEAViw6JXFa0O3D5HtUBJmGgOsniYoqCwm4NrsGNLuHb2ME | ||
| rSpTwNNGJtqpDcQdEtVXfY1muO9xjuznPJaJkQ4ODpYcbGcz8YIGoHck+XHJjHsp | ||
| 2VIeNFFsbsFzWZqzfYHrj/rMjpVJJx90tlfN2IHbroZHTXLqVPOTLL6wvZZ6P9XF | ||
| zqpWABKOaEDbenhSFFZeF5KR3NG9HSTm4YLuekumkH+QgrveDfDwXG4hAHqg836o | ||
| OF3NPaij6VlSR18nuyW0wMs/Ceu13P+GALqHmz98pFyVgHWQFryL9IccvJQDyEnt | ||
| saeG4IAVlJbZDGTnRgANLhpwBr7XhMG1aK+wmOMRgQKBgQDkcatiATlr9L8gfnHb | ||
| 6pmX//AZLdXuQLXfuTvu638Brhm770noLgfIC+HIp5kCHxT2Xj5Vn+MSnYD6R6Wh | ||
| chApRKJUdsuz1iOq23YJjvsSLWCGpl9IxR7WY27uGOPIjQcOd1PRbkCq9AgUJwyn | ||
| ryca3sbYh/XQOWGLbJNIQs/S/QKBgQDSu6PVeMaS3276KblvGIvvaSAQDQWxXcC+ | ||
| sA4CBmvjzx3xx5GAox/w7tcKmK/KQxNhaYy6N7xLc1YUJ9FbnT2PZQJhtP2d2Gat | ||
| Zre/+Qa+u84cR5hj9EI+B8FjW7D/psEj16KjHCds/SET6ngPM+RdB4N9daVFCurt | ||
| p0f717yiUwKBgBTJDun06I+dDkLbnmp/FwiQff0cgYmTE7lOdliPzteNSsQhypy4 | ||
| i3a1Ng72yOI7h8G+43cQ/C02bYTYPgbJhRTsLMT4piIvysECBORrwQZvYIf/3U2W | ||
| ue6Rz4cUdq1Jv6meS98TZAjp+U40G1+qfSlhub/75u7SOcDg2SnLAnPVAoGBAIOO | ||
| EmRE5qpwA+b2P0Ykq89E8Hg0uPYWEiq427XV7mqkNQxoSuRkcZ9Ga0a5NRzurN2m | ||
| N+1UuB7eHMGubdtkmTa4lzkJ9T4iB09/DX0x6E0QD0bGR1M2/FefHdJ6PlAK+Q34 | ||
| Ixbyj4ZRq+G0AUl0Wr7c3vBmjktA2pKMWLrW3nLzAoGBAKTl7qX6CD42gAJuT5Hp | ||
| rrXqlppVIyRvuXzXtX/Xq81IUHlBgS/t9HPyqDzmTKfxD8540kI+15bWPDHSJxiQ | ||
| ccqPaKyXhBXstDwGmlPKVzJUxk0dz5NHs+8gItUDOg78pM3siXN7vW9XBCH7mCDA | ||
| 4zet/C0YCAiFVT+ipMoXy8Nc | ||
| -----END PRIVATE KEY----- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security (private-key): Identified a Private Key, which may compromise cryptographic security and sensitive data encryption.
Source: gitleaks
| -----BEGIN PRIVATE KEY----- | ||
| MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCtkJ+r1F8+YOVu | ||
| wWLxbGVsJKw3BEShtCsU+IXHJeaNJdBr59B/4h5WM37wOnnecmyEZTh47FXXkb5h | ||
| 0xVlHES7eTAD+NPlWHufGJ9PR1kvQyZ0fyNRFXLzUID/dl7atHBtlrqE5Bhg7xqA | ||
| yPZjUjhkAZPgrT1/8+gYmbWPbw3Ba3+XRupq3Kn+EVVJi7wk4cj8jf6g1aex6sMO | ||
| kSYNsanb+JdEryevgoju+EtHgCHL6cB0eJs8PfMWiibgWLE2pkI0bdbGjTNVDDyg | ||
| ZoO8Qr/YrGvXXYqt0D7IKSUiO8bnrvZh6ITPEcQ3ePQRGEpqh8ggKaVq3RVkC3t3 | ||
| QMRWGpsvAgMBAAECggEAIK9C+lNAallJ62z8inU8tjxDuAqUOBVbJZRVcPbIr1zn | ||
| HmLlpyd4Sghhh7CjYYoPuHDtTQxIcBNwlDBxb3x+zwUXzy+tC5v5j7DN01qex2Ew | ||
| XTDSAEN3Ra2r1S+/1hSztVd0oXDozFxKk+UETRjfKKoJZH6LPcy7MOLFR5EEuJ8L | ||
| 0kvGdEtuNCmZ1vPBwqR3IKQS9NsB1IdTtK0g2LdtVzM3U6F173CrAx51qNeAL30j | ||
| Np+I0rfm7vYVco6nDQXJB86hzwwBnLMzmZR2E0z+JStQCjQtEJN9wp+NBnViMb8C | ||
| mZl0K/PH3ZKNEs1Aw/TsRpPu6Fc+sN6iIs2oOGiKfQKBgQDa0Wpfj+SHflLfmrRU | ||
| PplGNjWdJiyuXROqX18iNE8nAD0eRqAFdzj9yU1IW49KCzuHInEl2pP9yrDZTWXB | ||
| Bht4C+Vk13mrBE3Sc1LDrks5EhDLaaolLgx1B+JN1X2DpfuzO8WHrXR11PCzFTAp | ||
| yDSVd451CFFXMseS1V9UxCy3lQKBgQDLDrLX/0hGhG+a5RUaAE+hZk+tU9RyjYm6 | ||
| /5lIoDjDwA9Yst69JCTHDApkdZ6IrjPDZrxkAQR6QwsGo+zRGkHV2wCoqR/RxcT5 | ||
| RBcbe/8xL86ZKwnhAheP6ssgZeK5zOG1iLol319kXXuo6NueN+YlocmsppRvAOq7 | ||
| /qMnhzXGswKBgQCpke2wHo9HnNJWK8ohGt2mtm232ZR4jvKlbgEIPac1Hw89/hcW | ||
| BT0qFqyILUQOakP4Re2PGyLiYwfHbh4zhisVTYq4Ke9EYzJ3qxzxPYlXsbNIHxtW | ||
| cqf+rVxnWtFIiwFR9TjvGrEMezcIYJwRVO/DAIJqGUcHnvdfx3B3/Qp2PQKBgQCk | ||
| y7UR37kEog8BotHRXFdEIgigHtzYa05QWYhJjN8E3yaVUfW7g03lzTvR9DNJsjeI | ||
| aiSS9NBxeV/Fb9yOh8TOjwKl3zxXvy3xLvWh9KxTev0tCeTmnBALWP6puIadTE4S | ||
| Snjoq7R7e/MUToeOjMdX20oVuMvWmuPm1u4K8o0OSQKBgQCec+QLllYXk22A8/e/ | ||
| f5HhSYr161lEFFmuzKhuuy+esyCQU/KZmxQH0UqnsL3Ww4ofq42lteqyUJnriHsx | ||
| QP5FTIMKH8W+Xels1i6jCC+MVXAXraAF27dOlmKxWMN7mnElZ/7lQKmBq64wil35 | ||
| sfcJA4FDxVM2Amv4KRo/w1C/zQ== | ||
| -----END PRIVATE KEY----- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security (private-key): Identified a Private Key, which may compromise cryptographic security and sensitive data encryption.
Source: gitleaks
1f6d1f4 to
f65e31d
Compare
a02de8f to
b877acd
Compare
Works up to getting RecordSet due to classloader conflict Test is now able to read from postgresql into recordset Using FlightShimConfig for location Added conversion with ArrowShimWriter Added more type conversions and writers Config with SSL working, test passes with row count Fix test for tpch Using executor service to run getStream Adding cancel tests Backpressure working Fixed config to load from file Added some basic unit tests Using bound FunctionAndTypeManager for type deser and ConnectorContext Made FlightShimQueryRunner, working but needs selecting columns Able to make select query by column name and use query runner Added test for MySql connector Added type support for date and timestamp, with tests. decimal test not working yet Fix floating point test Added logging Using plugins from plugin-bundles config Added oracle and singlestore tests, oracle not working yet Remove plugin dir Fixed deser of TupleDomain, need to test against query results Add test with additionalPredicate Enable mTLS, fix config typo Moved to test containers after rebase Fix code style Use SynchronousQueue in executor fix gRPC deadlock Fix backpressure to use new instance per call
b877acd to
1757683
Compare
Description
This adds a FlightShim project with an Apache Arrow Flight server that is able to load Presto Java JDBC-based connectors and process JDBC splits and return a stream of Arrow record batches. See related RFC prestodb/rfcs#46
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.
If release note is NOT required, use: