
Add 'fire and forget' mode to OSB#964

Open
OVI3D0 wants to merge 9 commits into
opensearch-project:main from OVI3D0:fire-and-forget

Conversation

Member

@OVI3D0 OVI3D0 commented Sep 24, 2025

Description

Adds new 'fire and forget' mode to OSB.

Rather than OSB's traditional client model of 'fire request -> await response -> record metrics -> move on', this mode allows each client to fire requests without awaiting responses or recording metrics. This lets OSB easily sustain high throughput when load testing a cluster, and is intended for users who don't necessarily want precise measurements on each request, but would rather test their clusters against very high sustained throughput levels.

The drawback, of course, is that no information is returned to the user on how their cluster is performing, aside from external forms of polling/measurement, such as the performance charts that can be viewed in the AWS console for their cluster.

The PR introduces a new flag, --fire-and-forget, which tells OSB to choose the DeterministicScheduler. Unlike the UnitAwareScheduler used for most benchmarks, this scheduler ignores request metadata; it only calculates a throughput for each client and tells it to send requests at that rate.
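For illustration, a metadata-agnostic scheduler of this kind can be sketched in a few lines. This is a hypothetical simplification for this write-up, not OSB's actual class; the name `DeterministicSchedulerSketch` and its interface are assumptions:

```python
import itertools

class DeterministicSchedulerSketch:
    """Hypothetical sketch: given a per-client target throughput, emit the
    time offset (seconds from start) at which each successive request is due,
    ignoring request metadata entirely."""

    def __init__(self, target_throughput: float):
        if target_throughput <= 0:
            raise ValueError("target throughput must be positive")
        self.wait_time = 1.0 / target_throughput

    def schedule(self):
        # Request i is due at i * wait_time, regardless of how long earlier
        # requests take or whether they succeed.
        for i in itertools.count():
            yield i * self.wait_time
```

At 2 requests per second per client, the schedule yields 0.0, 0.5, 1.0, and so on.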

The flag also tells OSB to use a new request executor, the UnhingedExecutor. Unlike the AsyncExecutor, this executor creates separate asynchronous tasks to send requests without awaiting them, ensuring requests are sent at the specified rate regardless of latency or failure rates. To sum up: if an executor needs to send 2 RPS, this mode tells it to create 2 async tasks per second, each of which sends one request, rather than trying to send 1 request every 0.5 seconds on its own.
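The dispatch pattern described above can be sketched with plain asyncio. This is a simplified, hypothetical stand-in for illustration, not the PR's actual executor; `send_request` is any coroutine function supplied by the caller:

```python
import asyncio

async def fire_and_forget(send_request, target_rps: float, duration_s: float) -> int:
    """Hypothetical sketch: dispatch tasks at a fixed rate without awaiting
    individual responses, trading observability for sustained throughput."""
    interval = 1.0 / target_rps
    tasks = []
    loop = asyncio.get_running_loop()
    start = loop.time()
    i = 0
    while loop.time() - start < duration_s:
        # Create the task and move on immediately; never await the response.
        tasks.append(asyncio.create_task(send_request()))
        i += 1
        # Sleep until the next request's absolute scheduled slot, so that
        # scheduling drift does not accumulate across iterations.
        await asyncio.sleep(max(0.0, start + i * interval - loop.time()))
    # Drain in-flight tasks on shutdown so the event loop can exit cleanly.
    await asyncio.gather(*tasks, return_exceptions=True)
    return len(tasks)
```

The send rate depends only on the wall clock, so slow or failing requests never throttle dispatch.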

This mode can consume resources very quickly, and users should ensure their hardware is able to handle the thousands of async tasks that are created when using this flag. They are also likely to run into an OSError: too many open files with all of the connections being established.

The maximum number of open file descriptors (which limits network sockets) can be checked with ulimit -n and raised with the same command, like: ulimit -n 2048
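For example (the value 2048 is illustrative; the soft limit can only be raised up to the hard limit shown by `ulimit -Hn`):

```shell
# Show the current soft limit on open file descriptors
ulimit -n

# Show the hard limit (the ceiling up to which the soft limit can be raised)
ulimit -Hn

# Raise the soft limit for the current shell session
ulimit -n 2048
```

The change applies only to the current shell and its children; persistent limits are configured elsewhere (e.g. limits.conf on Linux).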

Issues Resolved

#958

Testing

  • New functionality includes testing

New unit tests + running tests in 'fire and forget' mode against OS cluster at 500 TPS:

[Screenshots: test results at 500 TPS, captured 2025-09-24 at 11:48 AM and 11:47 AM]

Unit tests produce this warning since we don't await the async tasks:

sys:1: RuntimeWarning: coroutine 'UnhingedExecutor._fire_and_forget_request.<locals>.fire_and_forget_runner' was never awaited
Coroutine created at (most recent call last)
  File "/Users/mikeovi/.pyenv/versions/3.8.12/lib/python3.8/unittest/case.py", line 633, in _callTestMethod
    method()
  File "/Users/mikeovi/.pyenv/versions/3.8.12/lib/python3.8/unittest/mock.py", line 1325, in patched
    return func(*newargs, **newkeywargs)
  File "/Users/mikeovi/workplace/opensearch-benchmark/tests/__init__.py", line 35, in async_wrapper
    asyncio.run(t(*args, **kwargs), debug=True)
  File "/Users/mikeovi/.pyenv/versions/3.8.12/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/mikeovi/.pyenv/versions/3.8.12/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/Users/mikeovi/.pyenv/versions/3.8.12/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/Users/mikeovi/.pyenv/versions/3.8.12/lib/python3.8/asyncio/base_events.py", line 1851, in _run_once
    handle._run()
  File "/Users/mikeovi/.pyenv/versions/3.8.12/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/mikeovi/workplace/opensearch-benchmark/tests/worker_coordinator/worker_coordinator_test.py", line 2454, in test_fire_and_forget_request_no_throttling_needed
    await executor._fire_and_forget_request({}, 1.0, 0.0)  # expected_scheduled_time = 1.0
  File "/Users/mikeovi/workplace/opensearch-benchmark/osbenchmark/worker_coordinator/worker_coordinator.py", line 2603, in _fire_and_forget_request
    task = asyncio.create_task(fire_and_forget_runner())
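Warnings like the one above typically appear when the event loop shuts down before a fire-and-forget task gets a chance to run. One common way to avoid them in tests (a hypothetical sketch, not part of this PR) is to drain pending tasks before the test coroutine returns:

```python
import asyncio

async def fire():
    # Stand-in for the fire-and-forget request body.
    return "sent"

async def test_body():
    # Fire-and-forget: schedule the coroutine without awaiting it.
    asyncio.create_task(fire())
    # If the test returned here, the loop could shut down before the task
    # ever ran, which is what triggers warnings like the one above.
    # Draining pending tasks before returning avoids that:
    pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    return await asyncio.gather(*pending, return_exceptions=True)
```

This keeps the fire-and-forget behavior under test while letting the test's event loop exit cleanly.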

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Comment thread osbenchmark/benchmark.py Outdated
default=None
)
test_run_parser.add_argument(
"--fire-and-forget",
Collaborator


nit: can we come up with some other name, like --no-await or --sustain etc?

self.complete.set()
await self._cleanup()

class UnhingedExecutor:
Collaborator


Nit: Same here, can we change this to AsyncNoAwaitExecutor or something similar to maintain naming convention?

self.logger.info("Client id [%s] is running now.", self.client_id)


async def _fire_and_forget_request(self, params: dict, expected_scheduled_time: float, total_start: float) -> None:
Collaborator


nit: same here for the naming.

@rishabh6788
Collaborator

LGTM apart from some naming conventions. Can you run a test in timed mode, maybe for 15-30 mins, and share some more results? It would be a good idea to test it with ramp-up mode as well.

@OVI3D0
Member Author

OVI3D0 commented Oct 15, 2025

LGTM apart from some naming conventions. Can you run a test in timed mode, maybe for 15-30 mins, and share some more results? It would be a good idea to test it with ramp-up mode as well.

Here's a screenshot from a run combined with the ramp-up test-procedure property:
[Screenshot: ramp-up test results, captured 2025-10-15 at 3:25 PM]

I used this test procedure:

{
  "name": "ramp-up-test-procedure",
  "schedule": [
    {
       "operation": "range",
       "warmup-time-period": {{ warmup_time | default(900) | tojson }},
       "ramp-up-time-period": {{ ramp_up_time | default(600) | tojson }},
       "time-period": {{ time_period | default(1500) | tojson }},
       "target-throughput": {{ target_throughput | default(2000) | tojson }},
       "clients": {{ search_clients | default(2000) }}
    }
  ]
}

@OVI3D0 OVI3D0 requested a review from rishabh6788 October 20, 2025 17:37
@OVI3D0 OVI3D0 force-pushed the fire-and-forget branch from ad68977 to d2bbb27 on April 8, 2026 17:28
@github-actions

github-actions Bot commented Apr 8, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit d7eb567.

Path:Line | Severity | Description

  • osbenchmark/worker_coordinator/worker_coordinator.py:2656 (medium) — Duplicate bare `except Exception` blocks in `fire_and_forget_runner` — one unreachable after the first — silently suppress all errors with no logging. While this is labeled 'fire-and-forget', the unreachable second handler suggests copy-paste confusion that could mask unexpected behavior, and the silent suppression of all exceptions makes it impossible to detect if the runner is being misused or exploited.
  • osbenchmark/worker_coordinator/worker_coordinator.py:2739 (medium) — Reference to `self._inflight_tasks` in the `finally` block of `AsyncNoAwaitExecutor.__call__`, but this attribute is never initialized in `__init__`. This will raise an `AttributeError` at runtime during cleanup, potentially preventing proper drain of in-flight tasks and leaving open connections. While likely a bug rather than malicious intent, the inconsistency between the documented behavior and the implementation warrants review.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 2 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions

github-actions Bot commented Apr 8, 2026

PR Reviewer Guide 🔍

(Review updated until commit e43a7fa)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add UnhingedScheduler for fire-and-forget mode

Relevant files:

  • osbenchmark/worker_coordinator/scheduler.py

Sub-PR theme: Add AsyncNoAwaitExecutor and wire fire-and-forget CLI flag

Relevant files:

  • osbenchmark/benchmark.py
  • osbenchmark/worker_coordinator/worker_coordinator.py
  • osbenchmark/worker_coordinator/runner.py
  • tests/worker_coordinator/worker_coordinator_test.py

⚡ Recommended focus areas for review

Missing Attribute

AsyncNoAwaitExecutor references self._inflight_tasks in the finally block of __call__, but this attribute is never initialized in __init__. This will raise an AttributeError at runtime when the executor finishes, preventing proper cleanup of in-flight tasks.

if self._inflight_tasks:
    self.logger.info(
        "Client [%s] draining [%d] in-flight fire-and-forget tasks before exit...",
        self.client_id, len(self._inflight_tasks)
    )
    await asyncio.gather(*self._inflight_tasks, return_exceptions=True)
Config Key Mismatch

The CLI flag is registered as --no-await and stored in config as no_await (line 1150), but it is read back as fire_and_forget (line 2245). This mismatch means the fire-and-forget mode will never be activated via the CLI flag.

fire_and_forget = self.cfg.opts("worker_coordinator", "fire_and_forget", mandatory=False, default_value=False)
schedule = schedule_for(task_allocation, params_per_task[task], fire_and_forget)
Task Mutation

In schedule_for, the code mutates task.schedule directly (task.schedule = "deterministic") when fire-and-forget mode is enabled. This side-effect on a shared task object could cause unexpected behavior if the task is reused across multiple clients or invocations.

if fire_and_forget and not task.schedule:
    task.schedule = "deterministic"
Division by Zero

UnhingedScheduler.__init__ computes self.wait_time = 1 / target_throughput without guarding against target_throughput being zero or None, which would raise a ZeroDivisionError or TypeError at runtime.

self.wait_time = 1 / target_throughput
Silent Error Swallowing

In _fire_and_forget_query, all exceptions are silently swallowed with a bare except Exception: pass. This means network errors, authentication failures, and other critical issues will go completely unnoticed, making it impossible to diagnose problems during a benchmark run.

if pit_op is None:
    await opensearch.delete_point_in_time(body=None, all=True, params=request_params, headers=None)
else:

@github-actions

github-actions Bot commented Apr 8, 2026

PR Code Suggestions ✨

Latest suggestions up to e43a7fa

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Initialize and populate in-flight task tracking list

self._inflight_tasks is referenced in the finally block but is never initialized or
populated anywhere in AsyncNoAwaitExecutor. Tasks created via asyncio.create_task in
_async_no_await_request are not tracked, so this will raise an AttributeError at
runtime. You need to initialize self._inflight_tasks = [] in init and append
each created task to it.

osbenchmark/worker_coordinator/worker_coordinator.py [2718-2723]

+# In __init__, add:
+self._inflight_tasks = []
+
+# In _async_no_await_request, after creating the task:
+task = asyncio.create_task(fire_and_forget_runner())
+self._inflight_tasks.append(task)
+
+# The finally block then works correctly:
 if self._inflight_tasks:
     self.logger.info(
         "Client [%s] draining [%d] in-flight fire-and-forget tasks before exit...",
         self.client_id, len(self._inflight_tasks)
     )
     await asyncio.gather(*self._inflight_tasks, return_exceptions=True)
Suggestion importance[1-10]: 9


Why: self._inflight_tasks is referenced in the finally block but never initialized in __init__ or populated in _async_no_await_request. This will cause an AttributeError at runtime, making the drain logic completely broken. This is a critical bug.

High
Fix mismatched config key for fire-and-forget flag

The config key stored in configure_test is "no_await" (under section
"worker_coordinator"), but here it is read as "fire_and_forget". This mismatch means
the flag will never be True and the AsyncNoAwaitExecutor will never be used. The key
should be "no_await" to match what was stored.

osbenchmark/worker_coordinator/worker_coordinator.py [2245]

-fire_and_forget = self.cfg.opts("worker_coordinator", "fire_and_forget", mandatory=False, default_value=False)
+fire_and_forget = self.cfg.opts("worker_coordinator", "no_await", mandatory=False, default_value=False)
Suggestion importance[1-10]: 9


Why: The config key is stored as "no_await" in configure_test but read as "fire_and_forget" here, meaning AsyncNoAwaitExecutor will never be selected. This is a critical functional bug that makes the entire feature non-operational.

High
Guard against zero or None target throughput

If target_throughput is 0 or None, this will raise a ZeroDivisionError or TypeError
at instantiation. A guard should be added to handle these cases, either by raising a
descriptive error or falling back to a default wait time.

osbenchmark/worker_coordinator/scheduler.py [237-239]

 def __init__(self, task, target_throughput):
     super().__init__()
+    if not target_throughput:
+        raise exceptions.SystemSetupError(
+            "UnhingedScheduler requires a positive target_throughput, but got: %s" % target_throughput
+        )
     self.wait_time = 1 / target_throughput
Suggestion importance[1-10]: 5


Why: A ZeroDivisionError or TypeError would occur if target_throughput is 0 or None. Adding a guard improves robustness, though this is a moderate issue since callers should typically provide valid values.

Low
General
Avoid incorrect context manager and callable dual usage

Using async with self.runner as a context manager and then calling await
self.runner(...) inside the same block will invoke aenter/aexit on the
runner object. If the runner is not designed to be used as both a context manager
and a callable in this nested fashion, it may cause double-initialization or
resource issues. The runner should be called directly without the context manager,
or the context manager usage should be verified against the runner's implementation.

osbenchmark/worker_coordinator/runner.py [2655-2656]

-async with self.runner:
+try:
     await self.runner(self.opensearch, params)
+except Exception:
+    pass
Suggestion importance[1-10]: 4


Why: The async with self.runner pattern combined with await self.runner(...) may cause issues depending on the runner's implementation, but this pattern is also used in the regular AsyncExecutor, suggesting it may be intentional. The suggestion's improved_code removes the context manager entirely which could break resource management.

Low

Previous suggestions

Suggestions up to commit d7eb567
Category | Suggestion | Impact
Possible issue
Remove duplicate except block causing syntax error

There are two consecutive except Exception blocks inside fire_and_forget_runner,
which is a syntax error in Python — only one except clause can follow a try block at
the same level. The second except block will cause a SyntaxError at runtime and
prevent the module from loading. Remove the duplicate except Exception block.

osbenchmark/worker_coordinator/worker_coordinator.py [2649-2666]

 async def fire_and_forget_runner():
     try:
-
         # Initialize request context for the fire-and-forget task
         _, token = RequestContextHolder.init_request_context()
         try:
             async with self.runner:
                 await self.runner(self.opensearch, params)
         finally:
             # Clean up the context
             RequestContextHolder.restore_context(token)
     except Exception:
         # Silently swallow errors in fire-and-forget mode
         pass
 
-    except Exception:
-        # Log errors for debugging
-        pass
-
Suggestion importance[1-10]: 9


Why: There are two consecutive except Exception blocks at the same level inside fire_and_forget_runner, which is a Python syntax error that would prevent the module from loading entirely. This is a critical bug that must be fixed.

High
Initialize missing inflight tasks collection

self._inflight_tasks is referenced in the finally block but is never initialized or
populated anywhere in AsyncNoAwaitExecutor. This will raise an AttributeError at
runtime whenever the executor exits. Either initialize it in init and populate
it when tasks are created, or remove the drain logic if tasks are truly
fire-and-forget.

osbenchmark/worker_coordinator/worker_coordinator.py [2722-2727]

+# In __init__, add:
+# self._inflight_tasks = []
+
+# In _async_no_await_request, after creating the task:
+# self._inflight_tasks.append(task)
+# task.add_done_callback(lambda t: self._inflight_tasks.remove(t) if t in self._inflight_tasks else None)
+
 if self._inflight_tasks:
     self.logger.info(
         "Client [%s] draining [%d] in-flight fire-and-forget tasks before exit...",
         self.client_id, len(self._inflight_tasks)
     )
     await asyncio.gather(*self._inflight_tasks, return_exceptions=True)
Suggestion importance[1-10]: 9


Why: self._inflight_tasks is referenced in the finally block but never initialized in __init__ or populated anywhere in the class, which will cause an AttributeError at runtime on every executor exit. This is a critical bug.

High
Fix mismatched config key for fire-and-forget flag

The config key stored in configure_test is "no_await" (under section
"worker_coordinator"), but here it is read as "fire_and_forget". This mismatch means
the flag will never be True and the AsyncNoAwaitExecutor will never be selected. The
key should match what was stored.

osbenchmark/worker_coordinator/worker_coordinator.py [2245]

-fire_and_forget = self.cfg.opts("worker_coordinator", "fire_and_forget", mandatory=False, default_value=False)
+fire_and_forget = self.cfg.opts("worker_coordinator", "no_await", mandatory=False, default_value=False)
Suggestion importance[1-10]: 9


Why: The config key is stored as "no_await" in configure_test but read as "fire_and_forget" here, meaning the AsyncNoAwaitExecutor will never be selected regardless of the CLI flag. This is a functional bug that completely breaks the feature.

High
General
Guard against zero or invalid throughput value

If target_throughput is 0 or None, this will raise a ZeroDivisionError or TypeError.
Since UnhingedScheduler is designed for maximum throughput scenarios, a guard should
be added to handle the case where target_throughput is not a positive number.

osbenchmark/worker_coordinator/scheduler.py [237-239]

 def __init__(self, task, target_throughput):
     super().__init__()
+    if not target_throughput or target_throughput <= 0:
+        raise exceptions.SystemSetupError(
+            f"UnhingedScheduler requires a positive target_throughput, got: {target_throughput}"
+        )
     self.wait_time = 1 / target_throughput
Suggestion importance[1-10]: 5


Why: A target_throughput of 0 or None would cause a ZeroDivisionError or TypeError in UnhingedScheduler.__init__. Adding a guard improves robustness, though this is a moderate-impact defensive improvement.

Low
Suggestions up to commit 8a09702
Category | Suggestion | Impact
Possible issue
Remove duplicate except block causing syntax error

There are two consecutive except Exception blocks in fire_and_forget_runner, which
is a syntax error in Python — only the first except clause will be reachable and the
second will cause a SyntaxError. Remove the duplicate except block.

osbenchmark/worker_coordinator/worker_coordinator.py [2649-2666]

 async def fire_and_forget_runner():
     try:
-
         # Initialize request context for the fire-and-forget task
         _, token = RequestContextHolder.init_request_context()
         try:
             async with self.runner:
                 await self.runner(self.opensearch, params)
         finally:
             # Clean up the context
             RequestContextHolder.restore_context(token)
     except Exception:
         # Silently swallow errors in fire-and-forget mode
         pass
 
-    except Exception:
-        # Log errors for debugging
-        pass
-
Suggestion importance[1-10]: 9


Why: Having two consecutive except Exception blocks inside the same try statement is a Python SyntaxError that would prevent the module from loading at all. The duplicate except block must be removed.

High
Initialize and populate missing inflight tasks collection

self._inflight_tasks is never defined or populated anywhere in AsyncNoAwaitExecutor.
Tasks are created with asyncio.create_task() but never added to any collection, so
this finally block will always raise an AttributeError. Either initialize
self._inflight_tasks = [] in init and append each created task to it, or use
asyncio.all_tasks() to gather pending tasks.

osbenchmark/worker_coordinator/worker_coordinator.py [2722-2727]

+# In __init__, add:
+# self._inflight_tasks = []
+
+# In _async_no_await_request, after creating the task:
+# self._inflight_tasks.append(task)
+
 finally:
     # Drain any in-flight fire-and-forget tasks before exiting.
     if self._inflight_tasks:
         self.logger.info(
             "Client [%s] draining [%d] in-flight fire-and-forget tasks before exit...",
             self.client_id, len(self._inflight_tasks)
         )
         await asyncio.gather(*self._inflight_tasks, return_exceptions=True)
Suggestion importance[1-10]: 9


Why: self._inflight_tasks is referenced in the finally block but never initialized in __init__ or populated anywhere, which will cause an AttributeError at runtime and prevent proper draining of in-flight tasks.

High
Fix mismatched config key for fire-and-forget flag

The --no-await CLI flag is stored in config as "no_await" (via cfg.add(...,
"worker_coordinator", "no_await", args.no_await)), but here it is read as
"fire_and_forget". This mismatch means the fire-and-forget mode will never be
activated from the CLI flag. Use the correct config key "no_await".

osbenchmark/worker_coordinator/worker_coordinator.py [2245-2246]

-fire_and_forget = self.cfg.opts("worker_coordinator", "fire_and_forget", mandatory=False, default_value=False)
+fire_and_forget = self.cfg.opts("worker_coordinator", "no_await", mandatory=False, default_value=False)
 schedule = schedule_for(task_allocation, params_per_task[task], fire_and_forget)
Suggestion importance[1-10]: 9


Why: The CLI flag --no-await is stored under config key "no_await" in configure_test, but the executor reads it as "fire_and_forget", meaning the feature can never be activated via the CLI. This is a functional bug that completely breaks the intended feature.

High
Pass required arguments to parent scheduler constructor

UnhingedScheduler.init calls super().init() without passing task and
target_throughput, but SimpleScheduler.init likely requires these arguments.
This will cause a TypeError at runtime. Pass the required arguments to
super().init().

osbenchmark/worker_coordinator/scheduler.py [237-239]

 def __init__(self, task, target_throughput):
-    super().__init__()
+    super().__init__(task, target_throughput)
     self.wait_time = 1 / target_throughput
Suggestion importance[1-10]: 6


Why: UnhingedScheduler.__init__ calls super().__init__() without arguments, but without seeing SimpleScheduler.__init__'s signature it's unclear if this is truly a TypeError. The suggestion is plausible but depends on the parent class implementation not shown in the diff.

Low
Suggestions up to commit 1693a48
Category | Suggestion | Impact
Possible issue
Safely consume task exceptions in callback

Calling t.exception() on a task that raised an exception will re-raise it if the
task was cancelled, and silently consuming exceptions this way can hide real errors.
The t.exception() call itself raises CancelledError if the task was cancelled, which
contradicts the guard. Use a try/except block to safely consume the exception.

osbenchmark/worker_coordinator/worker_coordinator.py [2666-2670]

 def handle_task_completion(t):
     self._inflight_tasks.discard(t)
     if not t.cancelled():
-        t.exception()  # Consume exception to prevent logging
+        try:
+            t.exception()  # Consume exception to prevent "Task exception was never retrieved" warning
+        except Exception:
+            pass
Suggestion importance[1-10]: 7


Why: The t.exception() call can raise CancelledError if the task was cancelled despite the guard, and can also raise InvalidStateError in edge cases. Wrapping it in a try/except is a valid defensive improvement to prevent unexpected exceptions in the callback.

Medium
Prevent set mutation during gather iteration

The _inflight_tasks set is being mutated by handle_task_completion callbacks (via
discard) while asyncio.gather is iterating over it. This can cause a RuntimeError:
Set changed size during iteration. A snapshot of the set should be taken before
gathering.

osbenchmark/worker_coordinator/worker_coordinator.py [2718-2723]

 if self._inflight_tasks:
     self.logger.info(
         "Client [%s] draining [%d] in-flight fire-and-forget tasks before exit...",
         self.client_id, len(self._inflight_tasks)
     )
-    await asyncio.gather(*self._inflight_tasks, return_exceptions=True)
+    await asyncio.gather(*list(self._inflight_tasks), return_exceptions=True)
Suggestion importance[1-10]: 6


Why: While asyncio.gather(*self._inflight_tasks) unpacks the set at call time (not lazily), the done callbacks could theoretically fire and mutate _inflight_tasks during the unpacking. Using list(self._inflight_tasks) is a safer and clearer approach, though the practical risk is low in CPython's single-threaded event loop.

Low
Eliminate shared-state race by passing runner directly

Assigning self.runner = runner in the main loop and then snapshotting it inside
_async_no_await_request is fragile — the snapshot comment acknowledges the race but
the assignment still happens before _async_no_await_request is called. Since runner
is already available in the loop body, it should be passed directly as a parameter
to _async_no_await_request to avoid the shared-state race condition entirely.

osbenchmark/worker_coordinator/worker_coordinator.py [2693-2696]

 async for expected_scheduled_time, sample_type, _, runner, params in schedule:
     self.expected_scheduled_time = expected_scheduled_time
     self.sample_type = sample_type
-    self.runner = runner
 
+    if self.cancel.is_set():
+        self.logger.info("User cancelled execution.")
+        break
+
+    # Fire and forget mode - don't wait for responses
+    await self._async_no_await_request(params, expected_scheduled_time, total_start, runner=runner)
+
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a design smell where self.runner is set in the loop and then snapshotted inside _async_no_await_request. Passing runner directly as a parameter would be cleaner and eliminate the shared-state concern, though the improved code also restructures the loop body which changes more than just the runner passing.

Low
General
Clarify unused return value in fire-and-forget runner

The _fire_and_forget_query function awaits the HTTP request inline and returns a
result dict, but the comment says "The caller (AsyncNoAwaitExecutor) has already
wrapped this call in an asyncio.Task". However, looking at _async_no_await_request,
the runner is called inside fire_and_forget_runner which is wrapped in a task —
meaning _fire_and_forget_query is actually awaited inside the background task, not
in the dispatch loop. The returned dict from _fire_and_forget_query is unused since
fire_and_forget_runner discards the return value of runner(self.opensearch, params).
This is not a bug per se, but the misleading comment and unused return value could
cause confusion and maintenance issues.

osbenchmark/worker_coordinator/runners/opensearch.py [2278-2283]

-return {
-    "weight": 1,
-    "unit": "ops",
-    "success": True,
-    "fire_and_forget": True
-}
+# No return value needed; result is intentionally discarded in fire-and-forget mode
+return None
Suggestion importance[1-10]: 2


Why: The suggestion correctly identifies that the return value of _fire_and_forget_query is discarded by the caller, but changing it to return None is a minor style concern with minimal impact on correctness or functionality.

Low
Suggestions up to commit 976cfb7
Category | Suggestion | Impact
Possible issue
Prevent race condition when draining in-flight tasks

*self._inflight_tasks is a set that is being modified concurrently by
handle_task_completion callbacks as tasks complete. Iterating over it while it's
being modified (via self._inflight_tasks) can raise RuntimeError: Set changed size
during iteration. Take a snapshot of the set before gathering to avoid this race
condition.

osbenchmark/worker_coordinator/worker_coordinator.py [2718-2723]

 if self._inflight_tasks:
+    inflight_snapshot = set(self._inflight_tasks)
     self.logger.info(
         "Client [%s] draining [%d] in-flight fire-and-forget tasks before exit...",
-        self.client_id, len(self._inflight_tasks)
+        self.client_id, len(inflight_snapshot)
     )
-    await asyncio.gather(*self._inflight_tasks, return_exceptions=True)
+    await asyncio.gather(*inflight_snapshot, return_exceptions=True)
Suggestion importance[1-10]: 7


Why: The _inflight_tasks set can be modified by handle_task_completion callbacks while being iterated during asyncio.gather(*self._inflight_tasks), potentially causing a RuntimeError. Taking a snapshot before gathering is a valid fix for this race condition.

Medium
Prevent params mutation across loop iterations

The runner is assigned to self.runner and then a snapshot is taken inside
_async_no_await_request. However, since params is also passed by reference and
mutated inside _async_no_await_request (e.g., params["fire_and_forget"] = True and
params.pop("request-timeout", None)), the next iteration of the loop may receive a
mutated params dict from the parameter source if it reuses the same dict object.
Consider making a copy of params before passing it to _async_no_await_request.

osbenchmark/worker_coordinator/worker_coordinator.py [2693-2703]

 async for expected_scheduled_time, sample_type, _, runner, params in schedule:
     self.expected_scheduled_time = expected_scheduled_time
     self.sample_type = sample_type
     self.runner = runner
 
     if self.cancel.is_set():
         self.logger.info("User cancelled execution.")
         break
 
     # Fire and forget mode - don't wait for responses
-    await self._async_no_await_request(params, expected_scheduled_time, total_start)
+    # Copy params to avoid mutating the shared dict across iterations
+    await self._async_no_await_request(dict(params) if params else {}, expected_scheduled_time, total_start)
Suggestion importance[1-10]: 7


Why: The params dict is mutated inside _async_no_await_request by adding fire_and_forget key and removing request-timeout, which could corrupt the shared params object if the parameter source reuses the same dict across iterations. Copying params before passing it is a correct fix for this potential bug.

Medium
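The aliasing problem is easy to reproduce outside OSB. A minimal sketch, where `fire` is a hypothetical stand-in for the mutation inside `_async_no_await_request`:

```python
def fire(params):
    # Mimics the mutation the review flags: keys are added and removed
    # directly on the dict the caller handed in.
    params["fire_and_forget"] = True
    params.pop("request-timeout", None)

shared = {"request-timeout": 30}
fire(shared)                  # mutates the caller's dict in place
mutated = dict(shared)        # {'fire_and_forget': True}

shared = {"request-timeout": 30}
fire(dict(shared))            # copy first: the shared dict survives intact
preserved = dict(shared)      # {'request-timeout': 30}

print(mutated, preserved)
```

A `dict(params)` copy per iteration is cheap relative to the network I/O each request performs, so the defensive copy costs essentially nothing here.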
Safely consume task exceptions in callback

Calling t.exception() raises CancelledError when the task was cancelled, and
silently swallowing exceptions makes debugging very difficult. The comment says
"Consume exception to prevent logging", but this is a risky pattern. Wrap the
t.exception() call in a try/except block to safely consume the exception without
risk of propagation.

osbenchmark/worker_coordinator/worker_coordinator.py [2666-2670]

 def handle_task_completion(t):
     self._inflight_tasks.discard(t)
     if not t.cancelled():
-        t.exception()  # Consume exception to prevent logging
+        try:
+            t.exception()  # Consume exception to prevent unhandled exception warnings
+        except Exception:
+            pass
 task.add_done_callback(handle_task_completion)
Suggestion importance[1-10]: 6


Why: Calling t.exception() without a try/except can raise CancelledError in certain contexts. Wrapping it in a try/except block is a safer pattern, though the current code does check t.cancelled() first, reducing the risk somewhat.

Low
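The guarded callback pattern can be sketched standalone; `consume_result` and `demo` below are illustrative names, not OSB code. Retrieving the exception in the done callback is what suppresses asyncio's "Task exception was never retrieved" warning:

```python
import asyncio

def consume_result(t):
    """Done callback that consumes a task's outcome without re-raising."""
    if not t.cancelled():
        try:
            t.exception()  # returns (and marks as retrieved) any exception
        except asyncio.CancelledError:
            pass  # defensive; the cancelled() guard should prevent this

async def demo():
    async def boom():
        raise ValueError("request failed")

    t = asyncio.ensure_future(boom())
    t.add_done_callback(consume_result)
    await asyncio.sleep(0)  # let the task run and raise
    await asyncio.sleep(0)  # let its done callback fire
    return t.done() and isinstance(t.exception(), ValueError)

print(asyncio.run(demo()))  # → True
```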
General
Accurately report failure status in fire-and-forget mode

The function always returns "success": True even when _raw_search raises an
exception. This means failures are silently masked and reported as successes, making
it impossible to detect errors in fire-and-forget mode. Consider returning
"success": False in the except block, or at minimum tracking the failure state.

osbenchmark/worker_coordinator/runners/opensearch.py [1272-1283]

+success = True
 try:
     await self._raw_search(opensearch, doc_type, index, body, ff_request_params, headers=headers)
 except Exception as e:
-    # TEMPORARY: log the error so we can diagnose why requests aren't landing
+    success = False
     self.logger.warning("[FF DEBUG] runner._raw_search failed: %s: %s", type(e).__name__, e)
 
 return {
     "weight": 1,
     "unit": "ops",
-    "success": True,
+    "success": success,
     "fire_and_forget": True
 }
Suggestion importance[1-10]: 4


Why: The function always returns "success": True even on exceptions, masking failures. However, since this is explicitly a fire-and-forget mode designed for maximum throughput without response handling, the design intent may be to always report success; the improvement is valid but may conflict with the intended behavior.

Low
Suggestions up to commit 5217071
Category | Suggestion | Impact
General
Restrict fire-and-forget mode to search operations only

The fire_and_forget flag is read inside the loop over task_allocations, causing a
redundant config lookup on every iteration. This is a minor inefficiency but more
importantly, if no_await mode is enabled for non-search operations (e.g., bulk
indexing), AsyncNoAwaitExecutor will still be used, which could cause unexpected
behavior since the docstring says it only supports search queries. Consider adding a
guard to only use AsyncNoAwaitExecutor for search operation types.

osbenchmark/worker_coordinator/worker_coordinator.py [2236-2248]

 fire_and_forget = self.cfg.opts("worker_coordinator", "no_await", mandatory=False, default_value=False)
-schedule = schedule_for(task_allocation, params_per_task[task], fire_and_forget)
+# ... (outside the loop)
 
-if fire_and_forget:
-    # Use AsyncNoAwaitExecutor for fire-and-forget mode
+# Inside the loop, add operation type check:
+use_fire_and_forget = fire_and_forget and task.operation.type in ("search", "paginated-search", "scroll-search", "knn-search")
+schedule = schedule_for(task_allocation, params_per_task[task], use_fire_and_forget)
+
+if use_fire_and_forget:
     async_executor = AsyncNoAwaitExecutor(
         client_id, task, schedule, opensearch, self.sampler, self.profile_sampler, self.cancel, self.complete,
         task.error_behavior(self.abort_on_error), self.cfg, self.shared_states, self.feedback_actor, self.error_queue, self.queue_lock)
 else:
-    # Use regular AsyncExecutor
     async_executor = AsyncExecutor(
         client_id, task, schedule, opensearch, self.sampler, self.profile_sampler, self.cancel, self.complete,
         task.error_behavior(self.abort_on_error), self.cfg, self.shared_states, self.feedback_actor, self.error_queue, self.queue_lock)
Suggestion importance[1-10]: 6

__

Why: The AsyncNoAwaitExecutor is documented to only support search queries, but the current code applies it to all operation types when no_await is enabled. Adding an operation type check prevents unexpected behavior for non-search operations like bulk indexing. This is a meaningful correctness improvement.

Low
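A sketch of the proposed guard, assuming the four search operation types listed above; `pick_executor` is a hypothetical helper for illustration, while the real code constructs the executor inline:

```python
# Operation types the docstring says AsyncNoAwaitExecutor supports.
SEARCH_OPS = {"search", "paginated-search", "scroll-search", "knn-search"}

def pick_executor(fire_and_forget: bool, op_type: str) -> str:
    """Return which executor class a task should get."""
    if fire_and_forget and op_type in SEARCH_OPS:
        return "AsyncNoAwaitExecutor"
    # Non-search operations (e.g. bulk) fall back to the normal path
    # even when --fire-and-forget is set.
    return "AsyncExecutor"

print(pick_executor(True, "search"), pick_executor(True, "bulk"))
```

Evaluating the guard once per task, outside the per-allocation loop, also removes the redundant config lookup the suggestion mentions.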
Log suppressed exceptions for fire-and-forget debugging

The _fire_and_forget_query function is defined as a nested async def inside the
outer function and is called with await, so it executes within the current
coroutine rather than truly fire-and-forget; the actual HTTP request inside it is
also awaited. The caller (AsyncNoAwaitExecutor) wraps the whole call in
asyncio.create_task, so this double-await pattern is correct per the docstring.
The caller also ignores the return value of runner(self.opensearch, params), so
the result dict returned from the outer function is simply discarded, which is
fine. The real problem is that _fire_and_forget_query silently swallows all
exceptions, including connection errors, making debugging very difficult.
Consider at minimum logging the exception type.

osbenchmark/worker_coordinator/runners/opensearch.py [1272-1276]

-fire_and_forget = params.get("fire_and_forget", False)
-if fire_and_forget:
-    return await _fire_and_forget_query(opensearch, params)
+try:
+    await self._raw_search(opensearch, doc_type, index, body, ff_request_params, headers=headers)
+except Exception as e:
+    # Silently consume all exceptions in fire-and-forget mode
+    logging.getLogger(__name__).debug("Fire-and-forget query failed: %s: %s", type(e).__name__, e)
Suggestion importance[1-10]: 4

__

Why: Adding debug-level logging for suppressed exceptions in _fire_and_forget_query improves debuggability without impacting performance. The suggestion is valid but the improved_code only shows the inner try/except block, not the full function context, making it a minor improvement.

Low
Possible issue
Snapshot in-flight tasks before draining to avoid mutation

Tasks may still be completing and removing themselves from self._inflight_tasks
via the done callback (discard) while the set is being drained. Passing
*self._inflight_tasks to asyncio.gather creates a snapshot at call time, so
completions during the gather are harmless, and if a task completes between the
if self._inflight_tasks check and the gather call, the set could be empty by the
time gather runs; this too is benign. Still, to be safe and make the intent
explicit, take a snapshot of the set before gathering.

osbenchmark/worker_coordinator/worker_coordinator.py [2718-2723]

-if self._inflight_tasks:
+inflight = set(self._inflight_tasks)
+if inflight:
     self.logger.info(
         "Client [%s] draining [%d] in-flight fire-and-forget tasks before exit...",
-        self.client_id, len(self._inflight_tasks)
+        self.client_id, len(inflight)
     )
-    await asyncio.gather(*self._inflight_tasks, return_exceptions=True)
+    await asyncio.gather(*inflight, return_exceptions=True)
Suggestion importance[1-10]: 5


Why: Taking an explicit snapshot of self._inflight_tasks before passing to asyncio.gather avoids potential issues with set mutation during iteration. The done callbacks call discard on the set, and while *self._inflight_tasks already creates a snapshot at call time, the explicit snapshot makes the intent clearer and is safer.

Low
Safely consume task exceptions in done callback

Calling t.exception() raises CancelledError when the task was cancelled; the
guard if not t.cancelled() prevents that case. For a non-cancelled task that
completed without an exception it returns None, and for a task that raised it
returns the exception without re-raising, so it does consume the exception. In
principle t.exception() can also raise InvalidStateError if the task is not done
yet, though done callbacks only run once the task is done. The comment is
misleading, and wrapping the call in a try/except adds a margin of safety.
Consider wrapping it for safety.

osbenchmark/worker_coordinator/worker_coordinator.py [2666-2670]

 def handle_task_completion(t):
     self._inflight_tasks.discard(t)
     if not t.cancelled():
-        t.exception()  # Consume exception to prevent logging
+        try:
+            t.exception()  # Consume exception to prevent unhandled exception logging
+        except Exception:
+            pass
Suggestion importance[1-10]: 4

__

Why: The suggestion adds a try/except around t.exception() in the done callback for safety. While the existing guard if not t.cancelled() already handles the CancelledError case, wrapping in try/except provides additional robustness against edge cases like InvalidStateError. This is a minor defensive improvement.

Low

@github-actions

github-actions Bot commented Apr 8, 2026

Persistent review updated to latest commit 3da0ec4

@github-actions

github-actions Bot commented Apr 8, 2026

Persistent review updated to latest commit 028da1b

@github-actions

github-actions Bot commented Apr 8, 2026

Persistent review updated to latest commit 5217071

@github-actions

github-actions Bot commented Apr 8, 2026

Persistent review updated to latest commit 976cfb7

@OVI3D0
Member Author

OVI3D0 commented Apr 8, 2026

Confirmed still working. OSB will not collect metrics in this mode, but the requests are sent. In this run we sent 10,000 queries:

[ec2-user@ip-10-0-137-227 ~]$ curl -s 'http://localhost:9200/_nodes/stats/indices/search?human' | python3 -m json.tool | grep -E "query_total|query_time"
                    "query_total": 60006,
                    "query_time": "1.2m",
                    "query_time_in_millis": 76019,
                    "concurrent_query_total": 60002,
                    "concurrent_query_time": "1.2m",
                    "concurrent_query_time_in_millis": 75958,
                    "startree_query_total": 0,
                    "startree_query_time": "0s",
                    "startree_query_time_in_millis": 0,
[ec2-user@ip-10-0-137-227 ~]$ curl -s 'http://localhost:9200/_nodes/stats/indices/search?human' | python3 -m json.tool | grep -E "query_total|query_time"
                    "query_total": 60019,
                    "query_time": "1.2m",
                    "query_time_in_millis": 76038,
                    "concurrent_query_total": 60015,
                    "concurrent_query_time": "1.2m",
                    "concurrent_query_time_in_millis": 75977,
                    "startree_query_total": 0,
                    "startree_query_time": "0s",
                    "startree_query_time_in_millis": 0,
[ec2-user@ip-10-0-137-227 ~]$ curl -s 'http://localhost:9200/_nodes/stats/indices/search?human' | python3 -m json.tool | grep -E "query_total|query_time"
                    "query_total": 70006,
                    "query_time": "1.4m",
                    "query_time_in_millis": 88315,
                    "concurrent_query_total": 70002,
                    "concurrent_query_time": "1.4m",
                    "concurrent_query_time_in_millis": 88254,
                    "startree_query_total": 0,
                    "startree_query_time": "0s",
                    "startree_query_time_in_millis": 0,
[ec2-user@ip-10-0-137-227 ~]$ curl -s 'http://localhost:9200/_nodes/stats/indices/search?human' | python3 -m json.tool | grep -E "query_total|query_time"
                    "query_total": 70006,
                    "query_time": "1.4m",
                    "query_time_in_millis": 88315,
                    "concurrent_query_total": 70002,
                    "concurrent_query_time": "1.4m",
                    "concurrent_query_time_in_millis": 88254,
                    "startree_query_total": 0,
                    "startree_query_time": "0s",
                    "startree_query_time_in_millis": 0,
[ec2-user@ip-10-0-137-227 ~]$ curl -s 'http://localhost:9200/_nodes/stats/indices/search?human' | python3 -m json.tool | grep -E "query_total|query_time"
                    "query_total": 70006,
                    "query_time": "1.4m",
                    "query_time_in_millis": 88315,
                    "concurrent_query_total": 70002,
                    "concurrent_query_time": "1.4m",
                    "concurrent_query_time_in_millis": 88254,
                    "startree_query_total": 0,
                    "startree_query_time": "0s",
                    "startree_query_time_in_millis": 0,
[ec2-user@ip-10-0-137-227 ~]$

@github-actions

github-actions Bot commented Apr 8, 2026

Persistent review updated to latest commit 1693a48

@github-actions

Persistent review updated to latest commit 8a09702

OVI3D0 added 6 commits April 13, 2026 10:22
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
OVI3D0 added 2 commits April 13, 2026 10:22
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
@github-actions

Persistent review updated to latest commit d7eb567

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
@github-actions

Persistent review updated to latest commit e43a7fa
