Skip to content

Commit 7eca0e2

Browse files
pdabre12Pratik Joseph Dabre
authored andcommitted
Add serialization logic to ArrowFederationColumnHandle
1 parent f3b2180 commit 7eca0e2

File tree

15 files changed

+358
-20
lines changed

15 files changed

+358
-20
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
connector.name=postgresql
2+
connection-url=jdbc:postgresql://localhost:5432/testdb
3+
connection-user=presto
4+
connection-password=presto
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
flight-shim.server=localhost
22
flight-shim.server.port=9999
3-
flight-shim.server-ssl-certificate-file=src/test/resources/server.crt
4-
flight-shim.server-ssl-key-file=src/test/resources/server.key
3+
#flight-shim.server-ssl-certificate-file=src/test/resources/server.crt
4+
flight-shim.server-ssl-enabled=false
55

66
plugin.bundles=\
7-
../presto-oracle/pom, \
7+
../presto-oracle/pom.xml, \
88
../presto-postgresql/pom.xml, \
99
../presto-mysql/pom.xml

presto-main/etc/catalog/postgresql.properties

Lines changed: 0 additions & 4 deletions
This file was deleted.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
connector.name=postgresql
2+
connection-url=jdbc:postgresql://localhost:5432/testdb
3+
connection-user=presto
4+
connection-password=presto
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
connector.name=tpcds
22
tpcds.splits-per-node=4
3+
tpcds.use-varchar-type=true

presto-main/etc/config.properties

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
# sample nodeId to provide consistency across test runs
99
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
10-
node.environment=test
10+
node.environment=testing
1111
http-server.http.port=8080
1212

1313
discovery-server.enabled=true
@@ -55,4 +55,5 @@ plugin.bundles=\
5555
../presto-sql-helpers/presto-sql-invoked-functions-plugin/pom.xml
5656

5757
presto.version=testversion
58-
node-scheduler.include-coordinator=true
58+
node-scheduler.include-coordinator=false
59+
native-execution-enabled=true

presto-native-execution/presto_cpp/main/connectors/arrow_flight/arrow_federation/ArrowFederationConnector.cpp

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,25 @@ velox::connector::ColumnHandleMap toArrowFlightColumnHandleMap(
3535
for (const auto& [name, handle] : columnHandles) {
3636
arrowFlightColumnHandles[name] =
3737
std::make_shared<facebook::presto::ArrowFlightColumnHandle>(
38-
"arrow-flight");
38+
handle->name());
3939
}
4040
return arrowFlightColumnHandles;
4141
}
42+
43+
// Json conversion helpers for ColumnHandleMap
44+
void to_json(nlohmann::json& j, const ColumnHandleMap& map) {
45+
j = nlohmann::json::array();
46+
for (const auto& [name, handle] : map) {
47+
if (!handle) {
48+
continue; // skip null handles
49+
}
50+
auto handleStr = folly::toJson(handle->serialize());
51+
52+
// Base64 encode
53+
auto encoded = folly::base64Encode(handleStr);
54+
j.push_back(encoded);
55+
}
56+
}
4257
} // namespace
4358

4459
ArrowFederationDataSource::ArrowFederationDataSource(
@@ -54,7 +69,8 @@ ArrowFederationDataSource::ArrowFederationDataSource(
5469
authenticator,
5570
connectorQueryCtx,
5671
flightConfig,
57-
clientOpts) {}
72+
clientOpts),
73+
columnHandles_(columnHandles) {}
5874

5975
void ArrowFederationDataSource::addSplit(
6076
std::shared_ptr<ConnectorSplit> split) {
@@ -64,16 +80,17 @@ void ArrowFederationDataSource::addSplit(
6480
"ArrowFederationDataSource received wrong type of split");
6581

6682
nlohmann::json request;
67-
request["split"] = federationSplit->splitBytes_;
68-
request["columns"] = columnMapping_;
83+
request["connectorId"] = federationSplit->connectorId;
84+
request["splitBytes"] = federationSplit->splitBytes_;
85+
to_json(request["columnHandlesBytes"], columnHandles_);
6986

7087
arrow::flight::FlightEndpoint flightEndpoint{request.dump()};
7188

7289
std::string flightEndpointBytes;
7390
AFC_ASSIGN_OR_RAISE(flightEndpointBytes, flightEndpoint.SerializeToString());
7491

7592
auto flightSplit = std::make_shared<ArrowFlightSplit>(
76-
federationSplit->connectorId, flightEndpointBytes);
93+
federationSplit->connectorId, folly::base64Encode(flightEndpointBytes));
7794

7895
ArrowFlightDataSource::addSplit(flightSplit);
7996
}

presto-native-execution/presto_cpp/main/connectors/arrow_flight/arrow_federation/ArrowFederationConnector.h

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ class Location;
2626

2727
namespace facebook::presto {
2828

29+
struct ArrowFederationTypeHandle {
30+
int jdbcType;
31+
std::string jdbcTypeName;
32+
int columnSize;
33+
int decimalDigits;
34+
};
35+
2936
struct ArrowFederationSplit : public velox::connector::ConnectorSplit {
3037
/// @param connectorId
3138
/// @param splitBytes Base64 Serialized Split for Arrow Federation Flight
@@ -40,15 +47,65 @@ struct ArrowFederationSplit : public velox::connector::ConnectorSplit {
4047

4148
class ArrowFederationColumnHandle : public velox::connector::ColumnHandle {
4249
public:
43-
explicit ArrowFederationColumnHandle(const std::string& columnName)
44-
: columnName_(columnName) {}
50+
explicit ArrowFederationColumnHandle(
51+
const std::string& columnName,
52+
const std::string& connectorId,
53+
const std::string& columnType,
54+
bool nullable,
55+
const ArrowFederationTypeHandle& arrowFederationTypeHandle)
56+
: columnName_(columnName),
57+
connectorId_(connectorId),
58+
columnType_(columnType),
59+
nullable_(nullable),
60+
arrowFederationTypeHandle_(arrowFederationTypeHandle) {}
4561

4662
const std::string& name() const {
4763
return columnName_;
4864
}
4965

66+
const std::string& connectorId() const {
67+
return connectorId_;
68+
}
69+
70+
const std::string& columnType() const {
71+
return columnType_;
72+
}
73+
74+
bool nullable() const {
75+
return nullable_;
76+
}
77+
78+
const ArrowFederationTypeHandle& arrowFederationTypeHandle() const {
79+
return arrowFederationTypeHandle_;
80+
}
81+
82+
folly::dynamic serializeArrowFederationTypeHandle(
83+
const ArrowFederationTypeHandle& handle) const {
84+
folly::dynamic obj = folly::dynamic::object;
85+
obj["jdbcType"] = handle.jdbcType;
86+
obj["jdbcTypeName"] = handle.jdbcTypeName;
87+
obj["columnSize"] = handle.columnSize;
88+
obj["decimalDigits"] = handle.decimalDigits;
89+
return obj;
90+
}
91+
92+
folly::dynamic serialize() const {
93+
folly::dynamic obj = folly::dynamic::object;
94+
obj["connectorId"] = connectorId();
95+
obj["columnName"] = name();
96+
obj["columnType"] = columnType();
97+
obj["nullable"] = nullable();
98+
obj["jdbcTypeHandle"] =
99+
serializeArrowFederationTypeHandle(arrowFederationTypeHandle());
100+
return obj;
101+
}
102+
50103
private:
51104
std::string columnName_;
105+
std::string connectorId_;
106+
std::string columnType_;
107+
bool nullable_;
108+
const ArrowFederationTypeHandle arrowFederationTypeHandle_;
52109
};
53110

54111
class ArrowFederationTableHandle
@@ -81,6 +138,9 @@ class ArrowFederationDataSource : public ArrowFlightDataSource {
81138
std::optional<velox::RowVectorPtr> next(
82139
uint64_t size,
83140
velox::ContinueFuture& /* unused */) override;
141+
142+
private:
143+
const velox::connector::ColumnHandleMap columnHandles_;
84144
};
85145

86146
class ArrowFederationConnector : public ArrowFlightConnector {

presto-native-execution/presto_cpp/main/connectors/arrow_flight/arrow_federation/ArrowFederationPrestoToVeloxConnector.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,18 @@ ArrowFederationPrestoToVeloxConnector::toVeloxColumnHandle(
3939
const protocol::arrow_federation::ArrowFederationColumnHandle*>(column);
4040
VELOX_CHECK_NOT_NULL(
4141
arrowColumn, "Unexpected column handle type {}", column->_type);
42+
43+
auto arrowJdbcTypeHandle = presto::ArrowFederationTypeHandle{
44+
arrowColumn->jdbcTypeHandle.jdbcType,
45+
arrowColumn->jdbcTypeHandle.jdbcTypeName,
46+
arrowColumn->jdbcTypeHandle.columnSize,
47+
arrowColumn->jdbcTypeHandle.decimalDigits};
4248
return std::make_unique<presto::ArrowFederationColumnHandle>(
43-
arrowColumn->columnName);
49+
arrowColumn->columnName,
50+
arrowColumn->connectorId,
51+
arrowColumn->columnType,
52+
arrowColumn->nullable,
53+
arrowJdbcTypeHandle);
4454
}
4555

4656
std::unique_ptr<velox::connector::ConnectorTableHandle>

presto-native-execution/presto_cpp/presto_protocol/connector/arrow_federation/presto_protocol-json-hpp.mustache

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,21 +89,90 @@ void from_json(const json& j, ArrowFederationTableHandle& p) {
8989
namespace facebook::presto::protocol::arrow_federation {
9090
9191
struct ArrowFederationColumnHandle : public ColumnHandle {
92+
String connectorId = {};
9293
String columnName = {};
94+
String columnType = {};
95+
boolean nullable = {};
96+
JdbcTypeHandle jdbcTypeHandle = {};
9397
};
9498

9599
void to_json(json& j, const ArrowFederationColumnHandle& p) {
96100
j = json::object();
97101
j["@type"] = "arrow-federation";
98102
to_json_key(
99-
j, "columnName", p.columnName, "ArrowFederationColumnHandle", "ColumnName", "columnName");
100-
103+
j,
104+
"connectorId",
105+
p.connectorId,
106+
"ArrowFederationColumnHandle",
107+
"ConnectorId",
108+
"connectorId");
109+
to_json_key(
110+
j,
111+
"columnName",
112+
p.columnName,
113+
"ArrowFederationColumnHandle",
114+
"ColumnName",
115+
"columnName");
116+
to_json_key(
117+
j,
118+
"columnType",
119+
p.columnType,
120+
"ArrowFederationColumnHandle",
121+
"ColumnType",
122+
"columnType");
123+
to_json_key(
124+
j,
125+
"nullable",
126+
p.nullable,
127+
"ArrowFederationColumnHandle",
128+
"Nullable",
129+
"nullable");
130+
to_json_key(
131+
j,
132+
"jdbcTypeHandle",
133+
p.jdbcTypeHandle,
134+
"ArrowFederationColumnHandle",
135+
"JdbcTypeHandle",
136+
"jdbcTypeHandle");
101137
}
102138

103139
void from_json(const json& j, ArrowFederationColumnHandle& p) {
104140
p._type = j["@type"];
105141
from_json_key(
106-
j, "columnName", p.columnName, "ArrowFederationColumnHandle", "ColumnName", "columnName");
142+
j,
143+
"connectorId",
144+
p.connectorId,
145+
"ArrowFederationColumnHandle",
146+
"ConnectorId",
147+
"connectorId");
148+
from_json_key(
149+
j,
150+
"columnName",
151+
p.columnName,
152+
"ArrowFederationColumnHandle",
153+
"ColumnName",
154+
"columnName");
155+
from_json_key(
156+
j,
157+
"columnType",
158+
p.columnType,
159+
"ArrowFederationColumnHandle",
160+
"ColumnType",
161+
"columnType");
162+
from_json_key(
163+
j,
164+
"nullable",
165+
p.nullable,
166+
"ArrowFederationColumnHandle",
167+
"Nullable",
168+
"nullable");
169+
from_json_key(
170+
j,
171+
"jdbcTypeHandle",
172+
p.jdbcTypeHandle,
173+
"ArrowFederationColumnHandle",
174+
"JdbcTypeHandle",
175+
"jdbcTypeHandle");
107176
}
108177
} // namespace facebook::presto::protocol::arrow_federation
109178

0 commit comments

Comments
 (0)