Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from osbenchmark.workload_generator import workload_generator
from osbenchmark.utils import io, convert, process, console, net, opts, versions
from osbenchmark import aggregator
from osbenchmark.database.registry import DatabaseType

def create_arg_parser():
def positive_number(v):
Expand Down Expand Up @@ -598,6 +599,13 @@ def add_workload_source(subparser):
help=f"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch "
f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).",
default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS)
test_run_parser.add_argument(
"--database-type",
help="Target database backend. Selects the DatabaseClient adapter used to run "
"the workload (default: opensearch). Choices are populated from the "
"registered DatabaseType enum.",
choices=[d.value for d in DatabaseType],
default=DatabaseType.OPENSEARCH.value)
test_run_parser.add_argument("--on-error",
choices=["continue", "abort"],
help="Controls how OSB behaves on response errors (default: continue).",
Expand Down Expand Up @@ -1079,6 +1087,11 @@ def configure_connection_params(arg_parser, args, cfg):
# Configure gRPC target hosts
grpc_target_hosts = opts.TargetHosts(args.grpc_target_hosts) if hasattr(args, "grpc_target_hosts") and args.grpc_target_hosts else None
cfg.add(config.Scope.applicationOverride, "client", "grpc_hosts", grpc_target_hosts)

# Configure database backend; worker_coordinator reads cfg.opts("database", "type")
# to pick the DatabaseClient factory via the database/ registry.
database_type = getattr(args, "database_type", "opensearch")
cfg.add(config.Scope.applicationOverride, "database", "type", database_type)
if "timeout" not in client_options.default:
console.info("You did not provide an explicit timeout in the client options. Assuming default of 10 seconds.")
if list(target_hosts.all_hosts) != list(client_options.all_client_options):
Expand Down
8 changes: 7 additions & 1 deletion osbenchmark/test_run_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,13 @@ def setup(self, sources=False):
# but there are rare cases (external pipeline and user did not specify the distribution version) where we need
# to derive it ourselves. For source builds we always assume "master"
oss_distribution_version = "2.11.0"
if not sources and not self.cfg.exists("builder", "distribution.version"):
# Distribution-version auto-detection probes the target with a raw opensearchpy
# client and the legacy wait_for_rest_layer. For non-OpenSearch backends the
# OS-flavored health/info routes either don't exist or differ enough to make
# this probe meaningless. Skip the whole branch for non-OS databases; the
# OS-min-version check below is also OS-specific.
database_type = self.cfg.opts("database", "type", default_value="opensearch", mandatory=False)
if not sources and not self.cfg.exists("builder", "distribution.version") and database_type.lower() == "opensearch":
distribution_version = builder.cluster_distribution_version(self.cfg)
if distribution_version == 'oss':
self.logger.info("Automatically derived serverless collection, setting distribution version to 2.11.0")
Expand Down
26 changes: 25 additions & 1 deletion osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,13 +975,23 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor

def create_os_clients(self):
all_hosts = self.config.opts("client", "hosts").all_hosts
database_type = self.config.opts("database", "type", default_value="opensearch", mandatory=False)
opensearch = {}
for cluster_name, cluster_hosts in all_hosts.items():
all_client_options = self.config.opts("client", "options").all_client_options
cluster_client_options = dict(all_client_options[cluster_name])
# Use retries to avoid aborts on long living connections for telemetry devices
cluster_client_options["retry-on-timeout"] = True
opensearch[cluster_name] = self.os_client_factory(cluster_hosts, cluster_client_options).create()
if database_type.lower() == "opensearch":
opensearch[cluster_name] = self.os_client_factory(cluster_hosts, cluster_client_options).create()
else:
# Non-OpenSearch backends route through the registry so the right
# url_prefix / transport configuration is applied. This path
# mirrors the async create path at WorkerCoordinator.os_clients.
db_factory = DatabaseClientFactory.create_client_factory(
database_type, cluster_hosts, cluster_client_options,
)
opensearch[cluster_name] = db_factory.create()
return opensearch

def prepare_telemetry(self, opensearch, enable):
Expand Down Expand Up @@ -1021,6 +1031,20 @@ def prepare_telemetry(self, opensearch, enable):

def wait_for_rest_api(self, opensearch):
os_default = opensearch["default"]
database_type = self.config.opts("database", "type", default_value="opensearch", mandatory=False)
# The legacy wait_for_rest_layer probes /_cluster/health then falls back
# to /_cat/indices — both OS-API-shape-specific. For non-OS backends,
# delegate to a factory-provided probe.
if database_type.lower() != "opensearch":
hosts = self.config.opts("client", "hosts").all_hosts["default"]
client_options = dict(self.config.opts("client", "options").all_client_options["default"])
db_factory = DatabaseClientFactory.create_client_factory(database_type, hosts, client_options)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, one nit I have is that DatabaseClientFactory seems to get instantiated twice: once here and once up above on line 991. Maybe we can just clean that up?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rad - thanks @OVI3D0

Good catch - I'll do that real quick.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@OVI3D0 Any interest in a Quickwit implementation? I can clean it up a bit and submit with a new issue if so.

Caveat: there is a lot of surface area that doesn't correlate, so we would need to plan out a consistent way of handling this. That said, it might be a good forcing function for doing this right before other impls start to arrive?

Copy link
Copy Markdown
Member

@OVI3D0 OVI3D0 May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any interest in a Quickwit implementation?

yeah for sure @zznate ! the hope with this project was to have OSB branch out from just being a benchmarking tool for OpenSearch and have it be used for any compatible search engine :) feel free to open up an issue or even contribute the changes and we can get those in

we would need to plan out a consistent way of handling this

There was a larger plan for this written out here: #998

Here you can read about the proposed architecture for future search engines we had in mind

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect - thanks! I'll give that a read. Missed that in my initial pass.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@OVI3D0 - do I need to do anything else to get this merged? I see it's still blocked; please let me know if you need anything from my end.

Copy link
Copy Markdown
Member

@OVI3D0 OVI3D0 May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zznate unfortunately code changes across the org are locked down until this is resolved: https://www.stepsecurity.io/blog/mini-shai-hulud-is-back-a-self-spreading-supply-chain-attack-hits-the-npm-ecosystem

:p

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh gross! Understood tho - thanks!

self.logger.info("Checking if non-OS REST layer is available (database_type=%s).", database_type)
if hasattr(db_factory, "wait_for_rest_layer") and db_factory.wait_for_rest_layer(max_attempts=40):
self.logger.info("REST layer is available.")
return
self.logger.error("Non-OS REST layer is not yet available. Stopping benchmark.")
raise exceptions.SystemSetupError(f"{database_type} REST layer is not available.")
self.logger.info("Checking if REST API is available.")
if client.wait_for_rest_layer(os_default, max_attempts=40):
self.logger.info("REST API is available.")
Expand Down