3737
3838import java .io .IOException ;
3939import java .sql .Connection ;
40- import java .sql .DriverManager ;
4140import java .sql .ResultSet ;
4241import java .sql .SQLException ;
4342import java .sql .Statement ;
5150import java .util .logging .Logger ;
5251
5352import static com .facebook .presto .testing .TestingSession .testSessionBuilder ;
53+ import static java .sql .DriverManager .getConnection ;
5454
5555public class ContainerQueryRunner
5656 implements QueryRunner
@@ -62,32 +62,31 @@ public class ContainerQueryRunner
6262 private static final String CLUSTER_SHUTDOWN_TIMEOUT = System .getProperty ("clusterShutDownTimeout" , "10" );
6363 private static final String BASE_DIR = System .getProperty ("user.dir" );
6464 private static final int DEFAULT_COORDINATOR_PORT = 8080 ;
65- private static final int DEFAULT_FUNCTION_SERVER_PORT = 1122 ;
6665 private static final String TPCH_CATALOG = "tpch" ;
6766 private static final String TINY_SCHEMA = "tiny" ;
6867 private static final int DEFAULT_NUMBER_OF_WORKERS = 4 ;
6968 private static final Logger logger = Logger .getLogger (ContainerQueryRunner .class .getName ());
7069 private final GenericContainer <?> coordinator ;
7170 private final List <GenericContainer <?>> workers = new ArrayList <>();
7271 private final int coordinatorPort ;
73- private final int functionServerPort ;
7472 private final String catalog ;
7573 private final String schema ;
76- private Statement statement ;
74+ private final int numberOfWorkers ;
75+ private Connection connection ;
7776
7877 public ContainerQueryRunner ()
7978 throws InterruptedException , IOException
8079 {
81- this (DEFAULT_COORDINATOR_PORT , DEFAULT_FUNCTION_SERVER_PORT , TPCH_CATALOG , TINY_SCHEMA , DEFAULT_NUMBER_OF_WORKERS );
80+ this (DEFAULT_COORDINATOR_PORT , TPCH_CATALOG , TINY_SCHEMA , DEFAULT_NUMBER_OF_WORKERS );
8281 }
8382
84- public ContainerQueryRunner (int coordinatorPort , int functionServerPort , String catalog , String schema , int numberOfWorkers )
83+ public ContainerQueryRunner (int coordinatorPort , String catalog , String schema , int numberOfWorkers )
8584 throws InterruptedException , IOException
8685 {
8786 this .coordinatorPort = coordinatorPort ;
88- this .functionServerPort = functionServerPort ;
8987 this .catalog = catalog ;
9088 this .schema = schema ;
89+ this .numberOfWorkers = numberOfWorkers ;
9190
9291 // The container details can be added as properties in VM options for testing in IntelliJ.
9392 coordinator = createCoordinator ();
@@ -109,9 +108,7 @@ public ContainerQueryRunner(int coordinatorPort, int functionServerPort, String
109108 "timeZoneId=UTC" );
110109
111110 try {
112- Connection connection = DriverManager .getConnection (url , "test" , null );
113- statement = connection .createStatement ();
114- statement .execute ("set session remote_functions_enabled=true" );
111+ connection = getConnection (url , "test" , null );
115112 }
116113 catch (SQLException e ) {
117114 throw new RuntimeException (e );
@@ -133,14 +130,11 @@ private GenericContainer<?> createCoordinator()
133130 ContainerQueryRunnerUtils .createCoordinatorLogProperties ();
134131 ContainerQueryRunnerUtils .createCoordinatorNodeProperties ();
135132 ContainerQueryRunnerUtils .createCoordinatorEntryPointScript ();
136- ContainerQueryRunnerUtils .createFunctionNamespaceRemoteProperties (functionServerPort );
137- ContainerQueryRunnerUtils .createFunctionServerConfigProperties (functionServerPort );
138133
139134 return new GenericContainer <>(PRESTO_COORDINATOR_IMAGE )
140135 .withExposedPorts (coordinatorPort )
141136 .withNetwork (network ).withNetworkAliases ("presto-coordinator" )
142137 .withFileSystemBind (BASE_DIR + "/testcontainers/coordinator/etc" , "/opt/presto-server/etc" , BindMode .READ_WRITE )
143- .withFileSystemBind (BASE_DIR + "/testcontainers/coordinator/etc/function-server" , "/opt/function-server/etc" , BindMode .READ_ONLY )
144138 .withFileSystemBind (BASE_DIR + "/testcontainers/coordinator/entrypoint.sh" , "/opt/entrypoint.sh" , BindMode .READ_ONLY )
145139 .waitingFor (Wait .forLogMessage (".*======== SERVER STARTED ========.*" , 1 ))
146140 .withStartupTimeout (Duration .ofSeconds (Long .parseLong (CONTAINER_TIMEOUT )));
@@ -149,7 +143,7 @@ private GenericContainer<?> createCoordinator()
149143 private GenericContainer <?> createNativeWorker (int port , String nodeId )
150144 throws IOException
151145 {
152- ContainerQueryRunnerUtils .createNativeWorkerConfigProperties (coordinatorPort , functionServerPort , nodeId );
146+ ContainerQueryRunnerUtils .createNativeWorkerConfigProperties (coordinatorPort , nodeId );
153147 ContainerQueryRunnerUtils .createNativeWorkerTpchProperties (nodeId );
154148 ContainerQueryRunnerUtils .createNativeWorkerEntryPointScript (nodeId );
155149 ContainerQueryRunnerUtils .createNativeWorkerNodeProperties (nodeId );
@@ -302,12 +296,12 @@ public Session getDefaultSession()
302296 public MaterializedResult execute (Session session , String sql )
303297 {
304298 try {
299+ Statement statement = connection .createStatement ();
305300 ResultSet resultSet = statement .executeQuery (sql );
306- return ContainerQueryRunnerUtils
307- .toMaterializedResult (resultSet );
301+ return ContainerQueryRunnerUtils .toMaterializedResult (resultSet );
308302 }
309303 catch (SQLException e ) {
310- throw new RuntimeException (e );
304+ throw new RuntimeException ("Error executing query: " + sql , e );
311305 }
312306 }
313307}
0 commit comments