Skip to content

Commit

Permalink
Add development server
Browse files Browse the repository at this point in the history
  • Loading branch information
mdesmet committed Dec 23, 2024
1 parent c517055 commit e1dabdd
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 138 deletions.
1 change: 1 addition & 0 deletions etc/catalog/jmx.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=jmx
1 change: 1 addition & 0 deletions etc/catalog/memory.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=memory
1 change: 1 addition & 0 deletions etc/catalog/tpcds.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=tpcds
2 changes: 2 additions & 0 deletions etc/catalog/tpch.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
connector.name=tpch
tpch.splits-per-node=4
11 changes: 11 additions & 0 deletions etc/config-pre-466.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
node.id=coordinator
node.environment=test

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=1GB
discovery.uri=http://localhost:8080

# Disable http request log
http-server.log.enabled=false
17 changes: 17 additions & 0 deletions etc/config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
node.id=coordinator
node.environment=test

coordinator=true
experimental.concurrent-startup=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=1GB
discovery.uri=http://localhost:8080

# spooling protocol settings
protocol.spooling.enabled=true
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=
protocol.spooling.retrieval-mode=coordinator_proxy

# Disable http request log
http-server.log.enabled=false
16 changes: 16 additions & 0 deletions etc/jvm-pre-466.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-server
-Xmx2G
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=150M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache.
-Djdk.nio.maxCachedBufferSize=0
# Allow loading dynamic agent used by JOL
-XX:+EnableDynamicAgentLoading
-XX:+UnlockDiagnosticVMOptions
17 changes: 17 additions & 0 deletions etc/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-server
-Xmx2G
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=150M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache.
-Djdk.nio.maxCachedBufferSize=0
# Allow loading dynamic agent used by JOL
-XX:+EnableDynamicAgentLoading
-XX:+UnlockDiagnosticVMOptions
--enable-native-access=ALL-UNNAMED
8 changes: 8 additions & 0 deletions etc/spooling-manager.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
spooling-manager.name=filesystem
fs.s3.enabled=true
fs.location=s3://spooling/
s3.endpoint=http://localstack:4566/
s3.region=us-east-1
s3.aws-access-key=test
s3.aws-secret-key=test
s3.path-style-access=true
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
"pre-commit",
"black",
"isort",
"keyring"
"keyring",
"testcontainers",
"boto3"
]

setup(
Expand Down
127 changes: 127 additions & 0 deletions tests/development_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import os
import time
from contextlib import contextmanager
from pathlib import Path

from testcontainers.core.container import DockerContainer
from testcontainers.core.network import Network
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.localstack import LocalStackContainer

from trino.constants import DEFAULT_PORT

# NOTE(review): the MINIO_* credentials are not referenced anywhere in the
# visible code (a LocalStack container is started instead) — confirm whether
# they are still needed.
MINIO_ROOT_USER = "minio-access-key"
MINIO_ROOT_PASSWORD = "minio-secret-key"

# Trino image tag to run: taken from the TRINO_VERSION environment variable,
# falling back to "latest" when the variable is unset or empty.
TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
# Host name reported by main() for the started server; also imported by the
# integration-test conftest.
TRINO_HOST = "localhost"


def create_bucket(s3_client):
    """Make sure the ``spooling`` bucket exists on the given S3 client.

    The existing buckets are listed first; the bucket is only created when it
    is not already present. This is best effort: client errors are printed and
    never propagated to the caller.

    :param s3_client: object exposing ``list_buckets``, ``create_bucket`` and
        ``exceptions.ClientError`` (presumably a boto3 S3 client — the visible
        caller passes ``localstack.get_client("s3")``).
    :return: ``None`` in every case.
    """
    bucket_name = "spooling"

    try:
        print("Checking for bucket existence...")
        listing = s3_client.list_buckets()
        existing_names = {entry["Name"] for entry in listing["Buckets"]}
    except s3_client.exceptions.ClientError as err:
        # Anything other than a 404 is reported and aborts the attempt;
        # a 404 falls through to the creation step below.
        if err.response["Error"]["Code"] != "404":
            print("An error occurred:", err)
            return
    else:
        if bucket_name in existing_names:
            print("Bucket exists!")
            return

    try:
        print("Creating bucket...")
        s3_client.create_bucket(Bucket=bucket_name)
        print("Bucket created!")
    except s3_client.exceptions.ClientError as err:
        print("An error occurred:", err)


@contextmanager
def start_development_server(port=None, trino_version=TRINO_VERSION):
    """Run a Trino container (and LocalStack S3 when spooling is supported).

    On entry a Docker network is created, the containers are started and the
    function waits for their start-up log lines. On exit everything that was
    started is stopped and the network is removed, even if the body raised.

    :param port: host port bound to the container's ``DEFAULT_PORT``. ``None``
        is handed to ``with_bind_ports`` unchanged (presumably testcontainers
        then picks a free port — confirm).
    :param trino_version: tag of the ``trinodb/trino`` image. ``"latest"`` or a
        numeric version >= 466 enables the spooling protocol setup; any other
        numeric version uses the ``*-pre-466`` config files.
    :raises ValueError: if ``trino_version`` is neither ``"latest"`` nor
        convertible with ``int()``.
    :yields: ``(localstack, trino, network)``; ``localstack`` is ``None`` when
        the spooling protocol is not set up.
    """
    network = None
    localstack = None
    trino = None

    try:
        network = Network().create()
        # Decide from the version actually requested by the caller, so the
        # mounted config files always match the image that gets started.
        supports_spooling_protocol = trino_version == "latest" or int(trino_version) >= 466
        if supports_spooling_protocol:
            localstack = (
                LocalStackContainer(image="localstack/localstack:latest", region_name="us-east-1")
                .with_name("localstack")
                .with_network(network)
                .with_bind_ports(4566, 4566)
                .with_bind_ports(4571, 4571)
                .with_env("SERVICES", "s3")
            )

            # Start the container
            print("Starting LocalStack container...")
            localstack.start()

            # Wait for logs indicating LocalStack has started
            wait_for_logs(localstack, "Ready.", timeout=30)

            # create spooling bucket
            create_bucket(localstack.get_client("s3"))

        trino = (
            DockerContainer(f"trinodb/trino:{trino_version}")
            .with_name("trino")
            .with_network(network)
            .with_env("TRINO_CONFIG_DIR", "/etc/trino")
            .with_bind_ports(DEFAULT_PORT, port)
        )

        root = Path(__file__).parent.parent

        trino = trino.with_volume_mapping(str(root / "etc/catalog"), "/etc/trino/catalog")

        if supports_spooling_protocol:
            # Enable spooling config
            trino = (
                trino
                .with_volume_mapping(
                    str(root / "etc/spooling-manager.properties"),
                    "/etc/trino/spooling-manager.properties", "rw")
                .with_volume_mapping(str(root / "etc/jvm.config"), "/etc/trino/jvm.config")
                .with_volume_mapping(str(root / "etc/config.properties"), "/etc/trino/config.properties")
            )
        else:
            trino = (
                trino
                .with_volume_mapping(str(root / "etc/jvm-pre-466.config"), "/etc/trino/jvm.config")
                .with_volume_mapping(str(root / "etc/config-pre-466.properties"), "/etc/trino/config.properties")
            )

        print("Starting Trino container...")
        trino.start()

        # Wait for logs indicating the service has started
        wait_for_logs(trino, "SERVER STARTED", timeout=60)

        # Otherwise some tests fail with No nodes available
        time.sleep(2)

        yield localstack, trino, network
    finally:
        # Stop containers when exiting the context. The nesting guarantees
        # that a failure while stopping one resource does not leak the others.
        try:
            if trino is not None:
                print("Stopping Trino container...")
                trino.stop()
        finally:
            try:
                if localstack is not None:
                    print("Stopping LocalStack container...")
                    localstack.stop()
            finally:
                if network is not None:
                    network.remove()


def main():
    """Start the development server standalone, without pytest.

    The containers stay up until the user presses Enter; leaving the context
    manager then tears them down.
    """
    server = start_development_server(port=DEFAULT_PORT)
    with server:
        print(f"Trino started at {TRINO_HOST}:{DEFAULT_PORT}")

        # Block on stdin so the containers keep running until the user is done
        input("Press Enter to stop containers...")


# Only start the server when executed as a script, not when imported.
if __name__ == "__main__":
    main()
137 changes: 18 additions & 119 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,27 @@
# limitations under the License.
import os
import socket
import subprocess
import sys
import time
from contextlib import closing
from uuid import uuid4

import pytest

import trino.logging
from trino.client import ClientSession
from trino.client import TrinoQuery
from trino.client import TrinoRequest
from tests.development_server import start_development_server
from tests.development_server import TRINO_HOST
from tests.development_server import TRINO_VERSION
from trino.constants import DEFAULT_PORT

logger = trino.logging.get_logger(__name__)


TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
TRINO_HOST = "127.0.0.1"
TRINO_PORT = 8080


def is_trino_available():
def is_trino_available(host, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex((TRINO_HOST, DEFAULT_PORT))
result = sock.connect_ex((host, port))
if result == 0:
return True
return False


def get_local_port():
Expand All @@ -47,115 +40,21 @@ def get_local_port():
return s.getsockname()[1]


def get_default_trino_image_tag():
return "trinodb/trino:" + TRINO_VERSION


def start_trino(image_tag=None):
if not image_tag:
image_tag = get_default_trino_image_tag()

container_id = "trino-python-client-tests-" + uuid4().hex[:7]
local_port = get_local_port()
logger.info("starting Docker container")
docker_run = [
"docker",
"run",
"--rm",
"-p",
"{host_port}:{cont_port}".format(host_port=local_port, cont_port=TRINO_PORT),
"--name",
container_id,
image_tag,
]
run = subprocess.Popen(docker_run, universal_newlines=True, stderr=subprocess.PIPE)
return (container_id, run, "localhost", local_port)


def wait_for_trino_workers(host, port, timeout=180):
request = TrinoRequest(
host=host,
port=port,
client_session=ClientSession(
user="test_fixture"
)
)
sql = "SELECT state FROM system.runtime.nodes"
t0 = time.time()
while True:
query = TrinoQuery(request, sql)
rows = list(query.execute())
if any(row[0] == "active" for row in rows):
break
if time.time() - t0 > timeout:
raise TimeoutError
time.sleep(1)


def wait_for_trino_coordinator(stream, timeout=180):
started_tag = "======== SERVER STARTED ========"
t0 = time.time()
for line in iter(stream.readline, b""):
if line:
print(line)
if started_tag in line:
time.sleep(5)
return True
if time.time() - t0 > timeout:
logger.error("coordinator took longer than %s to start", timeout)
raise TimeoutError
return False


def start_local_trino_server(image_tag):
container_id, proc, host, port = start_trino(image_tag)
print("trino.server.state starting")
trino_ready = wait_for_trino_coordinator(proc.stderr)
if not trino_ready:
raise Exception("Trino server did not start")
wait_for_trino_workers(host, port)
print("trino.server.state ready")
return container_id, proc, host, port


def start_trino_and_wait(image_tag=None):
container_id = None
proc = None
host = os.environ.get("TRINO_RUNNING_HOST", None)
if host:
port = os.environ.get("TRINO_RUNNING_PORT", DEFAULT_PORT)
else:
container_id, proc, host, port = start_local_trino_server(
image_tag
)

print("trino.server.hostname {}".format(host))
print("trino.server.port {}".format(port))
if proc:
print("trino.server.pid {}".format(proc.pid))
if container_id:
print("trino.server.contained_id {}".format(container_id))
return container_id, proc, host, port


def stop_trino(container_id, proc):
subprocess.check_call(["docker", "kill", container_id])


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def run_trino():
if is_trino_available():
yield None, TRINO_HOST, DEFAULT_PORT
return
host = os.environ.get("TRINO_RUNNING_HOST", TRINO_HOST)
port = os.environ.get("TRINO_RUNNING_PORT", DEFAULT_PORT)

image_tag = os.environ.get("TRINO_IMAGE")
if not image_tag:
image_tag = get_default_trino_image_tag()
# Is there any local Trino available
if is_trino_available(host, port):
yield host, port
return

container_id, proc, host, port = start_trino_and_wait(image_tag)
yield proc, host, port
if container_id or proc:
stop_trino(container_id, proc)
# Start Trino and LocalStack server
print(f"Could not connect to Trino at {host}:{port}, starting server...")
local_port = get_local_port()
with start_development_server(port=local_port):
yield TRINO_HOST, local_port


def trino_version() -> int:
Expand Down
Loading

0 comments on commit e1dabdd

Please sign in to comment.