2828import com .facebook .presto .sql .planner .NodePartitioningManager ;
2929import com .facebook .presto .sql .planner .sanity .PlanCheckerProviderManager ;
3030import com .facebook .presto .testing .MaterializedResult ;
31+ import com .facebook .presto .testing .MaterializedRow ;
3132import com .facebook .presto .testing .QueryRunner ;
3233import com .facebook .presto .testing .TestingAccessControlManager ;
3334import com .facebook .presto .transaction .TransactionManager ;
3839
3940import java .io .IOException ;
4041import java .sql .Connection ;
42+ import java .sql .DriverManager ;
4143import java .sql .ResultSet ;
4244import java .sql .SQLException ;
4345import java .sql .Statement ;
5052import java .util .logging .Logger ;
5153
5254import static com .facebook .presto .testing .TestingSession .testSessionBuilder ;
53- import static java .sql .DriverManager .getConnection ;
5455
5556public class ContainerQueryRunner
5657 implements QueryRunner
5758{
58- private static final Network network = Network .newNetwork ();
59- private static final String PRESTO_COORDINATOR_IMAGE = System .getProperty ("coordinatorImage" , "presto-coordinator:latest" );
60- private static final String PRESTO_WORKER_IMAGE = System .getProperty ("workerImage" , "presto-worker:latest" );
61- private static final String CONTAINER_TIMEOUT = System .getProperty ("containerTimeout" , "120" );
62- private static final String CLUSTER_SHUTDOWN_TIMEOUT = System .getProperty ("clusterShutDownTimeout" , "10" );
63- private static final String BASE_DIR = System .getProperty ("user.dir" );
64- private static final int DEFAULT_COORDINATOR_PORT = 8080 ;
65- private static final String TPCH_CATALOG = "tpch" ;
66- private static final String TINY_SCHEMA = "tiny" ;
67- private static final int DEFAULT_NUMBER_OF_WORKERS = 4 ;
68- private static final Logger logger = Logger .getLogger (ContainerQueryRunner .class .getName ());
69- private final GenericContainer <?> coordinator ;
70- private final List <GenericContainer <?>> workers = new ArrayList <>();
71- private final int coordinatorPort ;
72- private final String catalog ;
73- private final String schema ;
74- private final int numberOfWorkers ;
75- private Connection connection ;
59+ protected static final Network network = Network .newNetwork ();
60+ protected static final String PRESTO_COORDINATOR_IMAGE = System .getProperty ("coordinatorImage" , "presto-coordinator:latest" );
61+ protected static final String PRESTO_WORKER_IMAGE = System .getProperty ("workerImage" , "presto-worker:latest" );
62+ protected static final String CONTAINER_TIMEOUT = System .getProperty ("containerTimeout" , "120" );
63+ protected static final String CLUSTER_SHUTDOWN_TIMEOUT = System .getProperty ("clusterShutDownTimeout" , "10" );
64+ protected static final String BASE_DIR = System .getProperty ("user.dir" );
65+ protected static final int DEFAULT_COORDINATOR_PORT = 8080 ;
66+ protected static final int DEFAULT_FUNCTION_SERVER_PORT = 1122 ;
67+ protected static final String TPCH_CATALOG = "tpch" ;
68+ protected static final String TINY_SCHEMA = "tiny" ;
69+ protected static final int DEFAULT_NUMBER_OF_WORKERS = 4 ;
70+
71+ protected static final Logger logger = Logger .getLogger (ContainerQueryRunner .class .getName ());
72+
73+ protected final GenericContainer <?> coordinator ;
74+ protected final List <GenericContainer <?>> workers = new ArrayList <>();
75+ protected final int coordinatorPort ;
76+ protected final String catalog ;
77+ protected final String schema ;
78+ protected GenericContainer <?> functionServer ;
79+ protected int functionServerPort ;
80+ protected boolean enableFunctionServer ;
81+ protected Connection connection ;
7682
7783 public ContainerQueryRunner ()
7884 throws InterruptedException , IOException
7985 {
80- this (DEFAULT_COORDINATOR_PORT , TPCH_CATALOG , TINY_SCHEMA , DEFAULT_NUMBER_OF_WORKERS );
86+ this (DEFAULT_COORDINATOR_PORT , TPCH_CATALOG , TINY_SCHEMA , DEFAULT_NUMBER_OF_WORKERS , DEFAULT_FUNCTION_SERVER_PORT , false );
8187 }
8288
83- public ContainerQueryRunner (int coordinatorPort , String catalog , String schema , int numberOfWorkers )
89+ public ContainerQueryRunner (int coordinatorPort , String catalog , String schema , int numberOfWorkers , int functionServerPort , boolean enableFunctionServer )
8490 throws InterruptedException , IOException
8591 {
8692 this .coordinatorPort = coordinatorPort ;
8793 this .catalog = catalog ;
8894 this .schema = schema ;
89- this .numberOfWorkers = numberOfWorkers ;
95+ this .functionServerPort = functionServerPort ;
96+ this .enableFunctionServer = enableFunctionServer ;
97+
98+ // Start function server first if enabled
99+ if (enableFunctionServer ) {
100+ this .functionServer = createFunctionServer ();
101+ this .functionServer .start ();
102+ logger .info ("Presto function server is deployed at http://" + functionServer .getHost () + ":" + functionServer .getMappedPort (functionServerPort ));
103+ }
90104
91- // The container details can be added as properties in VM options for testing in IntelliJ.
92- coordinator = createCoordinator ();
105+ this .coordinator = createCoordinator ();
93106 for (int i = 0 ; i < numberOfWorkers ; i ++) {
94107 workers .add (createNativeWorker (7777 + i , "native-worker-" + i ));
95108 }
@@ -107,23 +120,32 @@ public ContainerQueryRunner(int coordinatorPort, String catalog, String schema,
107120 coordinator .getMappedPort (coordinatorPort ),
108121 catalog ,
109122 schema ,
110- "timeZoneId=UTC" );
123+ enableFunctionServer ? "timeZoneId=UTC&sessionProperties=remote_functions_enabled:true" : "timeZoneId=UTC" );
124+
125+ postStartContainers (url );
126+ }
111127
128+ protected void postStartContainers (String url )
129+ {
112130 try {
113- connection = getConnection (url , "test" , null );
131+ this . connection = DriverManager . getConnection (url , "test" , null );
114132 }
115133 catch (SQLException e ) {
116134 throw new RuntimeException (e );
117135 }
118136
119137 // Delete the temporary files once the containers are started.
120138 ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/coordinator" );
121- for (int i = 0 ; i < numberOfWorkers ; i ++) {
122- ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/native-worker-" + i );
139+ for (GenericContainer <?> worker : workers ) {
140+ String alias = worker .getNetworkAliases ().get (1 );
141+ ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/" + alias );
142+ }
143+ if (enableFunctionServer ) {
144+ ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/function-server" );
123145 }
124146 }
125147
126- private GenericContainer <?> createCoordinator ()
148+ protected GenericContainer <?> createCoordinator ()
127149 throws IOException
128150 {
129151 ContainerQueryRunnerUtils .createCoordinatorTpchProperties ();
@@ -132,22 +154,25 @@ private GenericContainer<?> createCoordinator()
132154 ContainerQueryRunnerUtils .createCoordinatorJvmConfig ();
133155 ContainerQueryRunnerUtils .createCoordinatorLogProperties ();
134156 ContainerQueryRunnerUtils .createCoordinatorNodeProperties ();
135- ContainerQueryRunnerUtils .createCoordinatorEntryPointScript ();
157+ ContainerQueryRunnerUtils .createCoordinatorEntryPointScript (); // Never run function server in coordinator
158+ if (enableFunctionServer ) {
159+ ContainerQueryRunnerUtils .createRestRemoteProperties (functionServerPort );
160+ }
136161
137162 return new GenericContainer <>(PRESTO_COORDINATOR_IMAGE )
138- .withExposedPorts (coordinatorPort )
139163 .withNetwork (network )
140164 .withNetworkAliases ("presto-coordinator" )
141165 .withCopyFileToContainer (MountableFile .forHostPath (BASE_DIR + "/testcontainers/coordinator/etc" ), "/opt/presto-server/etc" )
142166 .withCopyFileToContainer (MountableFile .forHostPath (BASE_DIR + "/testcontainers/coordinator/entrypoint.sh" ), "/opt/entrypoint.sh" )
143167 .waitingFor (Wait .forLogMessage (".*======== SERVER STARTED ========.*" , 1 ))
144- .withStartupTimeout (Duration .ofSeconds (Long .parseLong (CONTAINER_TIMEOUT )));
168+ .withStartupTimeout (Duration .ofSeconds (Long .parseLong (CONTAINER_TIMEOUT )))
169+ .withExposedPorts (coordinatorPort );
145170 }
146171
147- private GenericContainer <?> createNativeWorker (int port , String nodeId )
172+ protected GenericContainer <?> createNativeWorker (int port , String nodeId )
148173 throws IOException
149174 {
150- ContainerQueryRunnerUtils .createNativeWorkerConfigProperties (coordinatorPort , nodeId );
175+ ContainerQueryRunnerUtils .createNativeWorkerConfigPropertiesWithFunctionServer (coordinatorPort , functionServerPort , nodeId );
151176 ContainerQueryRunnerUtils .createNativeWorkerTpchProperties (nodeId );
152177 ContainerQueryRunnerUtils .createNativeWorkerEntryPointScript (nodeId );
153178 ContainerQueryRunnerUtils .createNativeWorkerNodeProperties (nodeId );
@@ -160,6 +185,23 @@ private GenericContainer<?> createNativeWorker(int port, String nodeId)
160185 .waitingFor (Wait .forLogMessage (".*Announcement succeeded: HTTP 202.*" , 1 ));
161186 }
162187
188+ protected GenericContainer <?> createFunctionServer ()
189+ throws IOException
190+ {
191+ ContainerQueryRunnerUtils .createFunctionServerConfigProperties (functionServerPort );
192+ ContainerQueryRunnerUtils .createFunctionServerEntryPointScript ();
193+
194+ // Reuse the coordinator image since it already contains the function server jar
195+ return new GenericContainer <>(PRESTO_COORDINATOR_IMAGE )
196+ .withNetwork (network )
197+ .withNetworkAliases ("presto-function-server" )
198+ .withCopyFileToContainer (MountableFile .forHostPath (BASE_DIR + "/testcontainers/function-server/etc" ), "/opt/function-server/etc" )
199+ .withCopyFileToContainer (MountableFile .forHostPath (BASE_DIR + "/testcontainers/function-server/entrypoint.sh" ), "/opt/entrypoint.sh" )
200+ .waitingFor (Wait .forLogMessage (".* STARTED .*" , 1 ))
201+ .withStartupTimeout (Duration .ofSeconds (Long .parseLong (CONTAINER_TIMEOUT )))
202+ .withExposedPorts (functionServerPort );
203+ }
204+
163205 @ Override
164206 public void close ()
165207 {
@@ -171,6 +213,9 @@ public void close()
171213 }
172214 coordinator .stop ();
173215 workers .forEach (GenericContainer ::stop );
216+ if (functionServer != null ) {
217+ functionServer .stop ();
218+ }
174219 }
175220
176221 @ Override
@@ -248,7 +293,27 @@ public MaterializedResult execute(String sql)
248293 @ Override
249294 public MaterializedResult execute (Session session , String sql , List <? extends Type > resultTypes )
250295 {
251- throw new UnsupportedOperationException ();
296+ // Added logic similar to H2QueryRunner.
297+ try {
298+ Statement statement = connection .createStatement ();
299+ ResultSet resultSet = statement .executeQuery (sql );
300+ MaterializedResult rawResult = ContainerQueryRunnerUtils .toMaterializedResult (resultSet );
301+
302+ // Coerce the raw result to the requested resultTypes
303+ List <MaterializedRow > coercedRows = new ArrayList <>();
304+ for (MaterializedRow row : rawResult .getMaterializedRows ()) {
305+ List <Object > coercedValues = new ArrayList <>();
306+ for (int i = 0 ; i < resultTypes .size (); i ++) {
307+ Object value = row .getField (i );
308+ coercedValues .add (value );
309+ }
310+ coercedRows .add (new MaterializedRow (MaterializedResult .DEFAULT_PRECISION , coercedValues ));
311+ }
312+ return new MaterializedResult (coercedRows , resultTypes );
313+ }
314+ catch (SQLException e ) {
315+ throw new RuntimeException ("Error executing query: " + sql , e );
316+ }
252317 }
253318
254319 @ Override
0 commit comments