diff --git a/etc/catalog/jmx.properties b/etc/catalog/jmx.properties new file mode 100644 index 00000000..b6e0372b --- /dev/null +++ b/etc/catalog/jmx.properties @@ -0,0 +1 @@ +connector.name=jmx diff --git a/etc/catalog/memory.properties b/etc/catalog/memory.properties new file mode 100644 index 00000000..833abd3f --- /dev/null +++ b/etc/catalog/memory.properties @@ -0,0 +1 @@ +connector.name=memory diff --git a/etc/catalog/tpcds.properties b/etc/catalog/tpcds.properties new file mode 100644 index 00000000..ba8147db --- /dev/null +++ b/etc/catalog/tpcds.properties @@ -0,0 +1 @@ +connector.name=tpcds diff --git a/etc/catalog/tpch.properties b/etc/catalog/tpch.properties new file mode 100644 index 00000000..599f5ec6 --- /dev/null +++ b/etc/catalog/tpch.properties @@ -0,0 +1,2 @@ +connector.name=tpch +tpch.splits-per-node=4 diff --git a/etc/config-pre-466.properties b/etc/config-pre-466.properties new file mode 100644 index 00000000..e28f2281 --- /dev/null +++ b/etc/config-pre-466.properties @@ -0,0 +1,11 @@ +node.id=coordinator +node.environment=test + +coordinator=true +node-scheduler.include-coordinator=true +http-server.http.port=8080 +query.max-memory=1GB +discovery.uri=http://localhost:8080 + +# Disable http request log +http-server.log.enabled=false diff --git a/etc/config.properties b/etc/config.properties new file mode 100644 index 00000000..10372938 --- /dev/null +++ b/etc/config.properties @@ -0,0 +1,17 @@ +node.id=coordinator +node.environment=test + +coordinator=true +experimental.concurrent-startup=true +node-scheduler.include-coordinator=true +http-server.http.port=8080 +query.max-memory=1GB +discovery.uri=http://localhost:8080 + +# spooling protocol settings +protocol.spooling.enabled=true +protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0= +protocol.spooling.retrieval-mode=coordinator_proxy + +# Disable http request log +http-server.log.enabled=false diff --git a/etc/jvm-pre-466.config b/etc/jvm-pre-466.config new file mode 100644 index 00000000..09753c04 --- /dev/null +++ b/etc/jvm-pre-466.config @@ -0,0 +1,16 @@ +-server +-Xmx2G +-XX:G1HeapRegionSize=32M +-XX:+ExplicitGCInvokesConcurrent +-XX:+ExitOnOutOfMemoryError +-XX:+HeapDumpOnOutOfMemoryError +-XX:-OmitStackTraceInFastThrow +-XX:ReservedCodeCacheSize=150M +-XX:PerMethodRecompilationCutoff=10000 +-XX:PerBytecodeRecompilationCutoff=10000 +-Djdk.attach.allowAttachSelf=true +# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache. +-Djdk.nio.maxCachedBufferSize=0 +# Allow loading dynamic agent used by JOL +-XX:+EnableDynamicAgentLoading +-XX:+UnlockDiagnosticVMOptions diff --git a/etc/jvm.config b/etc/jvm.config new file mode 100644 index 00000000..08e3285d --- /dev/null +++ b/etc/jvm.config @@ -0,0 +1,17 @@ +-server +-Xmx2G +-XX:G1HeapRegionSize=32M +-XX:+ExplicitGCInvokesConcurrent +-XX:+ExitOnOutOfMemoryError +-XX:+HeapDumpOnOutOfMemoryError +-XX:-OmitStackTraceInFastThrow +-XX:ReservedCodeCacheSize=150M +-XX:PerMethodRecompilationCutoff=10000 +-XX:PerBytecodeRecompilationCutoff=10000 +-Djdk.attach.allowAttachSelf=true +# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache. +-Djdk.nio.maxCachedBufferSize=0 +# Allow loading dynamic agent used by JOL +-XX:+EnableDynamicAgentLoading +-XX:+UnlockDiagnosticVMOptions +--enable-native-access=ALL-UNNAMED diff --git a/etc/spooling-manager.properties b/etc/spooling-manager.properties new file mode 100644 index 00000000..72d8e396 --- /dev/null +++ b/etc/spooling-manager.properties @@ -0,0 +1,8 @@ +spooling-manager.name=filesystem +fs.s3.enabled=true +fs.location=s3://spooling/ +s3.endpoint=http://localstack:4566/ +s3.region=us-east-1 +s3.aws-access-key=test +s3.aws-secret-key=test +s3.path-style-access=true diff --git a/setup.py b/setup.py index 0a512e6e..b8b83b1d 100755 --- a/setup.py +++ b/setup.py @@ -46,7 +46,9 @@ "pre-commit", "black", "isort", - "keyring" + "keyring", + "testcontainers", + "boto3" ] setup( diff --git a/tests/development_server.py b/tests/development_server.py new file mode 100644 index 00000000..422f4bcd --- /dev/null +++ b/tests/development_server.py @@ -0,0 +1,127 @@ +import os +import time +from contextlib import contextmanager +from pathlib import Path + +from testcontainers.core.container import DockerContainer +from testcontainers.core.network import Network +from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.localstack import LocalStackContainer + +from trino.constants import DEFAULT_PORT + +MINIO_ROOT_USER = "minio-access-key" +MINIO_ROOT_PASSWORD = "minio-secret-key" + +TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest" +TRINO_HOST = "localhost" + + +def create_bucket(s3_client): + bucket_name = "spooling" + try: + print("Checking for bucket existence...") + response = s3_client.list_buckets() + buckets = [bucket["Name"] for bucket in response["Buckets"]] + if bucket_name in buckets: + print("Bucket exists!") + return + except s3_client.exceptions.ClientError as e: + if not e.response['Error']['Code'] == '404': + print("An error occurred:", e) + return + + try: + print("Creating bucket...") + s3_client.create_bucket( + Bucket=bucket_name, + ) + print("Bucket created!") + except s3_client.exceptions.ClientError as e: + print("An error occurred:", e) + + +@contextmanager +def start_development_server(port=None, trino_version=TRINO_VERSION): + network = None + localstack = None + trino = None + + try: + network = Network().create() + supports_spooling_protocol = TRINO_VERSION == "latest" or int(TRINO_VERSION) >= 466 + if supports_spooling_protocol: + localstack = LocalStackContainer(image="localstack/localstack:latest", region_name="us-east-1") \ + .with_name("localstack") \ + .with_network(network) \ + .with_bind_ports(4566, 4566) \ + .with_bind_ports(4571, 4571) \ + .with_env("SERVICES", "s3") + + # Start the container + print("Starting LocalStack container...") + localstack.start() + + # Wait for logs indicating MinIO has started + wait_for_logs(localstack, "Ready.", timeout=30) + + # create spooling bucket + create_bucket(localstack.get_client("s3")) + + trino = DockerContainer(f"trinodb/trino:{trino_version}") \ + .with_name("trino") \ + .with_network(network) \ + .with_env("TRINO_CONFIG_DIR", "/etc/trino") \ + .with_bind_ports(DEFAULT_PORT, port) + + root = Path(__file__).parent.parent + + trino = trino \ + .with_volume_mapping(str(root / "etc/catalog"), "/etc/trino/catalog") + + # Enable spooling config + if supports_spooling_protocol: + trino \ + .with_volume_mapping( + str(root / "etc/spooling-manager.properties"), + "/etc/trino/spooling-manager.properties", "rw") \ + .with_volume_mapping(str(root / "etc/jvm.config"), "/etc/trino/jvm.config") \ + .with_volume_mapping(str(root / "etc/config.properties"), "/etc/trino/config.properties") + else: + trino \ + .with_volume_mapping(str(root / "etc/jvm-pre-466.config"), "/etc/trino/jvm.config") \ + .with_volume_mapping(str(root / "etc/config-pre-466.properties"), "/etc/trino/config.properties") + + print("Starting Trino container...") + trino.start() + + # Wait for logs indicating the service has started + wait_for_logs(trino, "SERVER STARTED", timeout=60) + + # Otherwise some tests fail with No nodes available + time.sleep(2) + + yield localstack, trino, network + finally: + # Stop containers when exiting the context + if trino: + print("Stopping Trino container...") + trino.stop() + if localstack: + print("Stopping LocalStack container...") + localstack.stop() + if network: + network.remove() + + +def main(): + """Run Trino setup independently from pytest.""" + with start_development_server(port=DEFAULT_PORT): + print(f"Trino started at {TRINO_HOST}:{DEFAULT_PORT}") + + # Keep the process running so that the containers stay up + input("Press Enter to stop containers...") + + +if __name__ == "__main__": + main() diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 93ecf0fe..3184de6e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -11,34 +11,27 @@ # limitations under the License. import os import socket -import subprocess import sys -import time from contextlib import closing -from uuid import uuid4 import pytest import trino.logging -from trino.client import ClientSession -from trino.client import TrinoQuery -from trino.client import TrinoRequest +from tests.development_server import start_development_server +from tests.development_server import TRINO_HOST +from tests.development_server import TRINO_VERSION from trino.constants import DEFAULT_PORT logger = trino.logging.get_logger(__name__) -TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest" -TRINO_HOST = "127.0.0.1" -TRINO_PORT = 8080 - - -def is_trino_available(): +def is_trino_available(host, port): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: sock.settimeout(2) - result = sock.connect_ex((TRINO_HOST, DEFAULT_PORT)) + result = sock.connect_ex((host, port)) if result == 0: return True + return False def get_local_port(): @@ -47,115 +40,21 @@ def get_local_port(): return s.getsockname()[1] -def get_default_trino_image_tag(): - return "trinodb/trino:" + TRINO_VERSION - - -def start_trino(image_tag=None): - if not image_tag: - image_tag = get_default_trino_image_tag() - - container_id = "trino-python-client-tests-" + uuid4().hex[:7] - local_port = get_local_port() - logger.info("starting Docker container") - docker_run = [ - "docker", - "run", - "--rm", - "-p", - "{host_port}:{cont_port}".format(host_port=local_port, cont_port=TRINO_PORT), - "--name", - container_id, - image_tag, - ] - run = subprocess.Popen(docker_run, universal_newlines=True, stderr=subprocess.PIPE) - return (container_id, run, "localhost", local_port) - - -def wait_for_trino_workers(host, port, timeout=180): - request = TrinoRequest( - host=host, - port=port, - client_session=ClientSession( - user="test_fixture" - ) - ) - sql = "SELECT state FROM system.runtime.nodes" - t0 = time.time() - while True: - query = TrinoQuery(request, sql) - rows = list(query.execute()) - if any(row[0] == "active" for row in rows): - break - if time.time() - t0 > timeout: - raise TimeoutError - time.sleep(1) - - -def wait_for_trino_coordinator(stream, timeout=180): - started_tag = "======== SERVER STARTED ========" - t0 = time.time() - for line in iter(stream.readline, b""): - if line: - print(line) - if started_tag in line: - time.sleep(5) - return True - if time.time() - t0 > timeout: - logger.error("coordinator took longer than %s to start", timeout) - raise TimeoutError - return False - - -def start_local_trino_server(image_tag): - container_id, proc, host, port = start_trino(image_tag) - print("trino.server.state starting") - trino_ready = wait_for_trino_coordinator(proc.stderr) - if not trino_ready: - raise Exception("Trino server did not start") - wait_for_trino_workers(host, port) - print("trino.server.state ready") - return container_id, proc, host, port - - -def start_trino_and_wait(image_tag=None): - container_id = None - proc = None - host = os.environ.get("TRINO_RUNNING_HOST", None) - if host: - port = os.environ.get("TRINO_RUNNING_PORT", DEFAULT_PORT) - else: - container_id, proc, host, port = start_local_trino_server( - image_tag - ) - - print("trino.server.hostname {}".format(host)) - print("trino.server.port {}".format(port)) - if proc: - print("trino.server.pid {}".format(proc.pid)) - if container_id: - print("trino.server.contained_id {}".format(container_id)) - return container_id, proc, host, port - - -def stop_trino(container_id, proc): - subprocess.check_call(["docker", "kill", container_id]) - - -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def run_trino(): - if is_trino_available(): - yield None, TRINO_HOST, DEFAULT_PORT - return + host = os.environ.get("TRINO_RUNNING_HOST", TRINO_HOST) + port = os.environ.get("TRINO_RUNNING_PORT", DEFAULT_PORT) - image_tag = os.environ.get("TRINO_IMAGE") - if not image_tag: - image_tag = get_default_trino_image_tag() + # Is there any local Trino available + if is_trino_available(host, port): + yield host, port + return - container_id, proc, host, port = start_trino_and_wait(image_tag) - yield proc, host, port - if container_id or proc: - stop_trino(container_id, proc) + # Start Trino and MinIO server + print(f"Could not connect to Trino at {host}:{port}, starting server...") + local_port = get_local_port() + with start_development_server(port=local_port): + yield TRINO_HOST, local_port def trino_version() -> int: diff --git a/tests/integration/test_dbapi_integration.py b/tests/integration/test_dbapi_integration.py index d2e46236..01921474 100644 --- a/tests/integration/test_dbapi_integration.py +++ b/tests/integration/test_dbapi_integration.py @@ -40,7 +40,7 @@ @pytest.fixture def trino_connection(run_trino): - _, host, port = run_trino + host, port = run_trino yield trino.dbapi.Connection( host=host, port=port, user="test", source="test", max_attempts=1 @@ -49,7 +49,7 @@ def trino_connection(run_trino): @pytest.fixture def trino_connection_with_transaction(run_trino): - _, host, port = run_trino + host, port = run_trino yield trino.dbapi.Connection( host=host, @@ -63,7 +63,7 @@ def trino_connection_with_transaction(run_trino): @pytest.fixture def trino_connection_in_autocommit(run_trino): - _, host, port = run_trino + host, port = run_trino yield trino.dbapi.Connection( host=host, @@ -269,7 +269,7 @@ def test_legacy_primitive_types_with_connection_and_cursor( cursor_legacy_primitive_types, run_trino ): - _, host, port = run_trino + host, port = run_trino connection = trino.dbapi.Connection( host=host, @@ -1365,7 +1365,7 @@ def test_close_cursor(trino_connection): def test_session_properties(run_trino): - _, host, port = run_trino + host, port = run_trino connection = trino.dbapi.Connection( host=host, @@ -1513,7 +1513,7 @@ def test_client_tags_special_characters(run_trino): def retrieve_client_tags_from_query(run_trino, client_tags): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, @@ -1562,7 +1562,7 @@ def test_use_catalog_schema(trino_connection): @pytest.mark.skipif(trino_version() == 351, reason="current_catalog not supported in older Trino versions") def test_use_schema(run_trino): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, port=port, user="test", source="test", catalog="tpch", max_attempts=1 @@ -1589,7 +1589,7 @@ def test_use_schema(run_trino): def test_set_role(run_trino): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, port=port, user="test", catalog="tpch" @@ -1609,7 +1609,7 @@ def test_set_role(run_trino): def test_set_role_in_connection(run_trino): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, port=port, user="test", catalog="tpch", roles={"system": "ALL"} @@ -1621,7 +1621,7 @@ def test_set_role_in_connection(run_trino): def test_set_system_role_in_connection(run_trino): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, port=port, user="test", catalog="tpch", roles="ALL" @@ -1676,7 +1676,7 @@ def test_prepared_statements(legacy_prepared_statements, run_trino): def test_set_timezone_in_connection(run_trino): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, port=port, user="test", catalog="tpch", timezone="Europe/Brussels" @@ -1688,7 +1688,7 @@ def test_set_timezone_in_connection(run_trino): def test_connection_without_timezone(run_trino): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, port=port, user="test", catalog="tpch" @@ -1704,7 +1704,7 @@ def test_connection_without_timezone(run_trino): def test_describe(run_trino): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, port=port, user="test", catalog="tpch", @@ -1720,7 +1720,7 @@ def test_describe(run_trino): def test_describe_table_query(run_trino): - _, host, port = run_trino + host, port = run_trino trino_connection = trino.dbapi.Connection( host=host, port=port, user="test", catalog="tpch", @@ -1810,7 +1810,7 @@ def test_prepared_statement_capability_autodetection(legacy_prepared_statements, trino.dbapi.must_use_legacy_prepared_statements = TimeBoundLRUCache(1024, 3600) user_name = f"user_{t.monotonic_ns()}" - _, host, port = run_trino + host, port = run_trino connection = trino.dbapi.Connection( host=host, port=port, @@ -1830,8 +1830,24 @@ def test_prepared_statement_capability_autodetection(legacy_prepared_statements, assert statements.count("EXECUTE IMMEDIATE 'SELECT 1'") == (1 if legacy_prepared_statements is None else 0) +@pytest.mark.skipif( + trino_version() <= '464', + reason="spooled protocol was introduced in version 464" +) +def test_select_query_spooled_segments(trino_connection): + cur = trino_connection.cursor() + cur.execute("""SELECT l.* + FROM tpch.tiny.lineitem l, TABLE(sequence( + start => 1, + stop => 5, + step => 1)) n""") + rows = cur.fetchall() + # TODO: improve test + assert len(rows) > 0 + + def get_cursor(legacy_prepared_statements, run_trino): - _, host, port = run_trino + host, port = run_trino connection = trino.dbapi.Connection( host=host, diff --git a/tests/integration/test_sqlalchemy_integration.py b/tests/integration/test_sqlalchemy_integration.py index 730e20c0..896d0d91 100644 --- a/tests/integration/test_sqlalchemy_integration.py +++ b/tests/integration/test_sqlalchemy_integration.py @@ -29,7 +29,7 @@ @pytest.fixture def trino_connection(run_trino, request): - _, host, port = run_trino + host, port = run_trino connect_args = {"source": "test", "max_attempts": 1} if trino_version() <= 417: connect_args["legacy_prepared_statements"] = True diff --git a/tests/integration/test_types_integration.py b/tests/integration/test_types_integration.py index d38d4770..4e595c78 100644 --- a/tests/integration/test_types_integration.py +++ b/tests/integration/test_types_integration.py @@ -19,7 +19,7 @@ @pytest.fixture def trino_connection(run_trino): - _, host, port = run_trino + host, port = run_trino yield trino.dbapi.Connection( host=host, port=port, user="test", source="test", max_attempts=1