Skip to content

Commit adb08b3

Browse files
committed
feat(functions) : Add e2e test for remote functions
1 parent 05c66cf commit adb08b3

File tree

8 files changed

+250
-47
lines changed

8 files changed

+250
-47
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,4 @@ presto-native-execution/deps-install
6666
# Compiled executables used for docker build
6767
/docker/presto-cli-*-executable.jar
6868
/docker/presto-server-*.tar.gz
69+
/docker/presto-function-server-executable.jar

Jenkinsfile

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ pipeline {
133133
returnStdout: true).trim()
134134
env.PRESTO_PKG = "presto-server-${PRESTO_VERSION}.tar.gz"
135135
env.PRESTO_CLI_JAR = "presto-cli-${PRESTO_VERSION}-executable.jar"
136+
env.PRESTO_REMOTE_SERVER_JAR = "presto-function-server-executable.jar"
136137
env.PRESTO_BUILD_VERSION = env.PRESTO_VERSION + '-' +
137138
sh(script: "git show -s --format=%cd --date=format:'%Y%m%d%H%M%S'", returnStdout: true).trim() + "-" +
138139
env.PRESTO_COMMIT_SHA.substring(0, 7)
@@ -159,9 +160,10 @@ pipeline {
159160
sh '''
160161
echo "${PRESTO_BUILD_VERSION}" > index.txt
161162
git log -n 10 >> index.txt
162-
aws s3 cp index.txt ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/ --no-progress
163-
aws s3 cp presto-server/target/${PRESTO_PKG} ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/ --no-progress
164-
aws s3 cp presto-cli/target/${PRESTO_CLI_JAR} ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/ --no-progress
163+
aws s3 cp index.txt ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/ --no-progress
164+
aws s3 cp presto-server/target/${PRESTO_PKG} ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/ --no-progress
165+
aws s3 cp presto-cli/target/${PRESTO_CLI_JAR} ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/ --no-progress
166+
aws s3 cp presto-function-server/target/${PRESTO_REMOTE_SERVER_JAR} ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/ --no-progress
165167
'''
166168
}
167169
}
@@ -203,8 +205,9 @@ pipeline {
203205
secretKeyVariable: 'AWS_SECRET_ACCESS_KEY']]) {
204206
sh '''#!/bin/bash -ex
205207
cd docker/
206-
aws s3 cp ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/${PRESTO_PKG} . --no-progress
207-
aws s3 cp ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/${PRESTO_CLI_JAR} . --no-progress
208+
aws s3 cp ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/${PRESTO_PKG} . --no-progress
209+
aws s3 cp ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/${PRESTO_CLI_JAR} . --no-progress
210+
aws s3 cp ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/${PRESTO_REMOTE_SERVER_JAR} . --no-progress
208211
209212
echo "Building ${DOCKER_IMAGE}"
210213
REG_ORG=${AWS_ECR} IMAGE_NAME=${IMG_NAME} TAG=${PRESTO_BUILD_VERSION} ./build.sh ${PRESTO_VERSION}
@@ -265,7 +268,7 @@ pipeline {
265268
-t "${NATIVE_DOCKER_IMAGE}" \
266269
--build-arg BUILD_TYPE=Release \
267270
--build-arg DEPENDENCY_IMAGE=${AWS_ECR}/presto-native-dependency:latest \
268-
--build-arg "EXTRA_CMAKE_FLAGS=-DPRESTO_ENABLE_TESTING=OFF -DPRESTO_ENABLE_PARQUET=ON -DPRESTO_ENABLE_S3=ON" \
271+
--build-arg "EXTRA_CMAKE_FLAGS=-DPRESTO_ENABLE_TESTING=OFF -DPRESTO_ENABLE_PARQUET=ON -DPRESTO_ENABLE_S3=ON -DPRESTO_ENABLE_REMOTE_FUNCTIONS=ON" \
269272
-f scripts/dockerfiles/prestissimo-runtime.dockerfile \
270273
.
271274
docker image ls

docker/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ FROM quay.io/centos/centos:stream9
33
ARG PRESTO_VERSION
44
ARG PRESTO_PKG=presto-server-$PRESTO_VERSION.tar.gz
55
ARG PRESTO_CLI_JAR=presto-cli-$PRESTO_VERSION-executable.jar
6+
ARG PRESTO_REMOTE_SERVER_JAR=presto-function-server-executable.jar
67
ARG JMX_PROMETHEUS_JAVAAGENT_VERSION=0.20.0
78

89
ENV PRESTO_HOME="/opt/presto-server"
@@ -13,7 +14,8 @@ RUN --mount=type=cache,target=/var/cache/dnf,sharing=locked \
1314
# clean cache jobs
1415
&& mv /etc/yum/protected.d/systemd.conf /etc/yum/protected.d/systemd.conf.bak
1516

16-
COPY --chmod=755 $PRESTO_CLI_JAR /opt/presto-cli
17+
COPY --chmod=755 $PRESTO_CLI_JAR /opt/presto-cli
18+
COPY --chmod=755 $PRESTO_REMOTE_SERVER_JAR /opt/presto-function-server
1719

1820
RUN --mount=type=bind,source=$PRESTO_PKG,target=/$PRESTO_PKG \
1921
# Download Presto and move \
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Ignore build directories
2+
_build/
3+
cmake-build-debug/
4+
cmake-build-release/

presto-native-execution/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,12 @@
503503
<include>presto-cli-*-executable.jar</include>
504504
</includes>
505505
</resource>
506+
<resource>
507+
<directory>${project.parent.basedir}/presto-function-server/target</directory>
508+
<includes>
509+
<include>presto-function-server-executable.jar</include>
510+
</includes>
511+
</resource>
506512
<resource>
507513
<directory>${project.parent.basedir}/presto-server/target</directory>
508514
<includes>
@@ -574,7 +580,7 @@
574580
<args>
575581
<BUILD_TYPE>Release</BUILD_TYPE>
576582
<DEPENDENCY_IMAGE>presto-native-dependency:latest</DEPENDENCY_IMAGE>
577-
<EXTRA_CMAKE_FLAGS>-DPRESTO_ENABLE_TESTING=OFF</EXTRA_CMAKE_FLAGS>
583+
<EXTRA_CMAKE_FLAGS>"-DPRESTO_ENABLE_TESTING=OFF -DPRESTO_ENABLE_REMOTE_FUNCTIONS=ON"</EXTRA_CMAKE_FLAGS>
578584
<NUM_THREADS>2</NUM_THREADS>
579585
<BASE_IMAGE>ubuntu:22.04</BASE_IMAGE>
580586
<OSNAME>ubuntu</OSNAME>

presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java

Lines changed: 95 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.facebook.presto.sql.planner.NodePartitioningManager;
2929
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
3030
import com.facebook.presto.testing.MaterializedResult;
31+
import com.facebook.presto.testing.MaterializedRow;
3132
import com.facebook.presto.testing.QueryRunner;
3233
import com.facebook.presto.testing.TestingAccessControlManager;
3334
import com.facebook.presto.transaction.TransactionManager;
@@ -38,6 +39,7 @@
3839

3940
import java.io.IOException;
4041
import java.sql.Connection;
42+
import java.sql.DriverManager;
4143
import java.sql.ResultSet;
4244
import java.sql.SQLException;
4345
import java.sql.Statement;
@@ -50,46 +52,57 @@
5052
import java.util.logging.Logger;
5153

5254
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
53-
import static java.sql.DriverManager.getConnection;
5455

5556
public class ContainerQueryRunner
5657
implements QueryRunner
5758
{
58-
private static final Network network = Network.newNetwork();
59-
private static final String PRESTO_COORDINATOR_IMAGE = System.getProperty("coordinatorImage", "presto-coordinator:latest");
60-
private static final String PRESTO_WORKER_IMAGE = System.getProperty("workerImage", "presto-worker:latest");
61-
private static final String CONTAINER_TIMEOUT = System.getProperty("containerTimeout", "120");
62-
private static final String CLUSTER_SHUTDOWN_TIMEOUT = System.getProperty("clusterShutDownTimeout", "10");
63-
private static final String BASE_DIR = System.getProperty("user.dir");
64-
private static final int DEFAULT_COORDINATOR_PORT = 8080;
65-
private static final String TPCH_CATALOG = "tpch";
66-
private static final String TINY_SCHEMA = "tiny";
67-
private static final int DEFAULT_NUMBER_OF_WORKERS = 4;
68-
private static final Logger logger = Logger.getLogger(ContainerQueryRunner.class.getName());
69-
private final GenericContainer<?> coordinator;
70-
private final List<GenericContainer<?>> workers = new ArrayList<>();
71-
private final int coordinatorPort;
72-
private final String catalog;
73-
private final String schema;
74-
private final int numberOfWorkers;
75-
private Connection connection;
59+
protected static final Network network = Network.newNetwork();
60+
protected static final String PRESTO_COORDINATOR_IMAGE = System.getProperty("coordinatorImage", "presto-coordinator:latest");
61+
protected static final String PRESTO_WORKER_IMAGE = System.getProperty("workerImage", "presto-worker:latest");
62+
protected static final String CONTAINER_TIMEOUT = System.getProperty("containerTimeout", "120");
63+
protected static final String CLUSTER_SHUTDOWN_TIMEOUT = System.getProperty("clusterShutDownTimeout", "10");
64+
protected static final String BASE_DIR = System.getProperty("user.dir");
65+
protected static final int DEFAULT_COORDINATOR_PORT = 8080;
66+
protected static final int DEFAULT_FUNCTION_SERVER_PORT = 1122;
67+
protected static final String TPCH_CATALOG = "tpch";
68+
protected static final String TINY_SCHEMA = "tiny";
69+
protected static final int DEFAULT_NUMBER_OF_WORKERS = 4;
70+
71+
protected static final Logger logger = Logger.getLogger(ContainerQueryRunner.class.getName());
72+
73+
protected final GenericContainer<?> coordinator;
74+
protected final List<GenericContainer<?>> workers = new ArrayList<>();
75+
protected final int coordinatorPort;
76+
protected final String catalog;
77+
protected final String schema;
78+
protected GenericContainer<?> functionServer;
79+
protected int functionServerPort;
80+
protected boolean enableFunctionServer;
81+
protected Connection connection;
7682

7783
public ContainerQueryRunner()
7884
throws InterruptedException, IOException
7985
{
80-
this(DEFAULT_COORDINATOR_PORT, TPCH_CATALOG, TINY_SCHEMA, DEFAULT_NUMBER_OF_WORKERS);
86+
this(DEFAULT_COORDINATOR_PORT, TPCH_CATALOG, TINY_SCHEMA, DEFAULT_NUMBER_OF_WORKERS, DEFAULT_FUNCTION_SERVER_PORT, false);
8187
}
8288

83-
public ContainerQueryRunner(int coordinatorPort, String catalog, String schema, int numberOfWorkers)
89+
public ContainerQueryRunner(int coordinatorPort, String catalog, String schema, int numberOfWorkers, int functionServerPort, boolean enableFunctionServer)
8490
throws InterruptedException, IOException
8591
{
8692
this.coordinatorPort = coordinatorPort;
8793
this.catalog = catalog;
8894
this.schema = schema;
89-
this.numberOfWorkers = numberOfWorkers;
95+
this.functionServerPort = functionServerPort;
96+
this.enableFunctionServer = enableFunctionServer;
97+
98+
// Start function server first if enabled
99+
if (enableFunctionServer) {
100+
this.functionServer = createFunctionServer();
101+
this.functionServer.start();
102+
logger.info("Presto function server is deployed at http://" + functionServer.getHost() + ":" + functionServer.getMappedPort(functionServerPort));
103+
}
90104

91-
// The container details can be added as properties in VM options for testing in IntelliJ.
92-
coordinator = createCoordinator();
105+
this.coordinator = createCoordinator();
93106
for (int i = 0; i < numberOfWorkers; i++) {
94107
workers.add(createNativeWorker(7777 + i, "native-worker-" + i));
95108
}
@@ -107,23 +120,27 @@ public ContainerQueryRunner(int coordinatorPort, String catalog, String schema,
107120
coordinator.getMappedPort(coordinatorPort),
108121
catalog,
109122
schema,
110-
"timeZoneId=UTC");
123+
enableFunctionServer ? "timeZoneId=UTC&sessionProperties=remote_functions_enabled:true" : "timeZoneId=UTC");
111124

112125
try {
113-
connection = getConnection(url, "test", null);
126+
this.connection = DriverManager.getConnection(url, "test", null);
114127
}
115128
catch (SQLException e) {
116129
throw new RuntimeException(e);
117130
}
118131

119132
// Delete the temporary files once the containers are started.
120133
ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/coordinator");
121-
for (int i = 0; i < numberOfWorkers; i++) {
122-
ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/native-worker-" + i);
134+
for (GenericContainer<?> worker : workers) {
135+
String alias = worker.getNetworkAliases().get(1);
136+
ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/" + alias);
137+
}
138+
if (enableFunctionServer) {
139+
ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/function-server");
123140
}
124141
}
125142

126-
private GenericContainer<?> createCoordinator()
143+
protected GenericContainer<?> createCoordinator()
127144
throws IOException
128145
{
129146
ContainerQueryRunnerUtils.createCoordinatorTpchProperties();
@@ -132,22 +149,25 @@ private GenericContainer<?> createCoordinator()
132149
ContainerQueryRunnerUtils.createCoordinatorJvmConfig();
133150
ContainerQueryRunnerUtils.createCoordinatorLogProperties();
134151
ContainerQueryRunnerUtils.createCoordinatorNodeProperties();
135-
ContainerQueryRunnerUtils.createCoordinatorEntryPointScript();
152+
ContainerQueryRunnerUtils.createCoordinatorEntryPointScript(); // Never run function server in coordinator
153+
if (enableFunctionServer) {
154+
ContainerQueryRunnerUtils.createRestRemoteProperties(functionServerPort);
155+
}
136156

137157
return new GenericContainer<>(PRESTO_COORDINATOR_IMAGE)
138-
.withExposedPorts(coordinatorPort)
139158
.withNetwork(network)
140159
.withNetworkAliases("presto-coordinator")
141160
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/coordinator/etc"), "/opt/presto-server/etc")
142161
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/coordinator/entrypoint.sh"), "/opt/entrypoint.sh")
143162
.waitingFor(Wait.forLogMessage(".*======== SERVER STARTED ========.*", 1))
144-
.withStartupTimeout(Duration.ofSeconds(Long.parseLong(CONTAINER_TIMEOUT)));
163+
.withStartupTimeout(Duration.ofSeconds(Long.parseLong(CONTAINER_TIMEOUT)))
164+
.withExposedPorts(coordinatorPort);
145165
}
146166

147-
private GenericContainer<?> createNativeWorker(int port, String nodeId)
167+
protected GenericContainer<?> createNativeWorker(int port, String nodeId)
148168
throws IOException
149169
{
150-
ContainerQueryRunnerUtils.createNativeWorkerConfigProperties(coordinatorPort, nodeId);
170+
ContainerQueryRunnerUtils.createNativeWorkerConfigPropertiesWithFunctionServer(coordinatorPort, functionServerPort, nodeId);
151171
ContainerQueryRunnerUtils.createNativeWorkerTpchProperties(nodeId);
152172
ContainerQueryRunnerUtils.createNativeWorkerEntryPointScript(nodeId);
153173
ContainerQueryRunnerUtils.createNativeWorkerNodeProperties(nodeId);
@@ -160,6 +180,23 @@ private GenericContainer<?> createNativeWorker(int port, String nodeId)
160180
.waitingFor(Wait.forLogMessage(".*Announcement succeeded: HTTP 202.*", 1));
161181
}
162182

183+
protected GenericContainer<?> createFunctionServer()
184+
throws IOException
185+
{
186+
ContainerQueryRunnerUtils.createFunctionServerConfigProperties(functionServerPort);
187+
ContainerQueryRunnerUtils.createFunctionServerEntryPointScript();
188+
189+
// Reuse the coordinator image since it already contains the function server jar
190+
return new GenericContainer<>(PRESTO_COORDINATOR_IMAGE)
191+
.withNetwork(network)
192+
.withNetworkAliases("presto-function-server")
193+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/function-server/etc"), "/opt/function-server/etc")
194+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/function-server/entrypoint.sh"), "/opt/entrypoint.sh")
195+
.waitingFor(Wait.forLogMessage(".*======== REMOTE FUNCTION SERVER STARTED at: .*", 1))
196+
.withStartupTimeout(Duration.ofSeconds(Long.parseLong(CONTAINER_TIMEOUT)))
197+
.withExposedPorts(functionServerPort);
198+
}
199+
163200
@Override
164201
public void close()
165202
{
@@ -171,6 +208,9 @@ public void close()
171208
}
172209
coordinator.stop();
173210
workers.forEach(GenericContainer::stop);
211+
if (functionServer != null) {
212+
functionServer.stop();
213+
}
174214
}
175215

176216
@Override
@@ -248,7 +288,27 @@ public MaterializedResult execute(String sql)
248288
@Override
249289
public MaterializedResult execute(Session session, String sql, List<? extends Type> resultTypes)
250290
{
251-
throw new UnsupportedOperationException();
291+
// Added logic similar to H2QueryRunner.
292+
try {
293+
Statement statement = connection.createStatement();
294+
ResultSet resultSet = statement.executeQuery(sql);
295+
MaterializedResult rawResult = ContainerQueryRunnerUtils.toMaterializedResult(resultSet);
296+
297+
// Coerce the raw result to the requested resultTypes
298+
List<MaterializedRow> coercedRows = new ArrayList<>();
299+
for (MaterializedRow row : rawResult.getMaterializedRows()) {
300+
List<Object> coercedValues = new ArrayList<>();
301+
for (int i = 0; i < resultTypes.size(); i++) {
302+
Object value = row.getField(i);
303+
coercedValues.add(value);
304+
}
305+
coercedRows.add(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, coercedValues));
306+
}
307+
return new MaterializedResult(coercedRows, resultTypes);
308+
}
309+
catch (SQLException e) {
310+
throw new RuntimeException("Error executing query: " + sql, e);
311+
}
252312
}
253313

254314
@Override

0 commit comments

Comments
 (0)