Decoupling Metric Store from Worker Coordinator#988

Open
ajleong623 wants to merge 69 commits into opensearch-project:main from ajleong623:memory-leaks

@ajleong623
Contributor

@ajleong623 ajleong623 commented Nov 18, 2025

Description

This PR introduces a new actor called SamplePostProcessorActor. It processes samples sent directly from the Worker actors and also handles any task involving the metrics store.

SamplePostProcessorActor initialization:

The coordinator sets up the new actor in the prepare_benchmark method. The SamplePostProcessorActor is initialized once it receives the StartSamplePostProcessorActor message, which carries the configuration for creating the metrics store, the metrics-sample and profile-sample post-processor objects, and the telemetry collectors. The coordinator now sends StartTelemetry and StopTelemetry messages to start and stop the telemetry collection threads.

Which methods were changed:

  • send_samples: In the worker class, instead of only sending samples to the coordinator, the worker now sends a ProcessSamples message to the new SamplePostProcessorActor, which passes the samples to the SamplePostprocessor instance it holds.
  • to_externalizable: The metrics store's to_externalizable method is used to send the results of running the workload to the coordinator. It is now invoked inside the SamplePostProcessorActor in response to the GetExternalizableMetricsStore message.
  • close: When the coordinator is closed, it now sends the CloseMetricsStore message to tell the new actor to close the metrics store.
  • reset_relative_time: The ResetRelativeTimeRequest message is now used to reset the relative time of the metrics store in the new actor.

One concern is synchronization. Many metrics store operations that used to be synchronous method calls are now handled through asynchronous messages to the SamplePostProcessorActor, which holds the metrics store.
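The synchronization concern can be illustrated with a minimal sketch. It does not use the project's actor framework: `ToyPostProcessor`, its in-memory mailbox, and the `run_once` loop are hypothetical stand-ins, and only the message names come from the description above. The point is that a sender gets no return value from `send`, and results only exist after the receiving actor has worked through its queue.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class ProcessSamples:
    samples: list


@dataclass
class GetExternalizableMetricsStore:
    clear: bool


@dataclass
class CloseMetricsStore:
    pass


class ToyPostProcessor:
    """Hypothetical stand-in for SamplePostProcessorActor with a FIFO mailbox."""

    def __init__(self):
        self.mailbox = deque()
        self.store = []      # stands in for the metrics store
        self.opened = True
        self.replies = []    # messages "sent back" to the coordinator

    def send(self, msg):
        # Asynchronous: the caller gets no return value, the message is only queued.
        self.mailbox.append(msg)

    def run_once(self):
        # Handle exactly one queued message, oldest first.
        msg = self.mailbox.popleft()
        if isinstance(msg, ProcessSamples):
            self.store.extend(msg.samples)
        elif isinstance(msg, GetExternalizableMetricsStore):
            self.replies.append(list(self.store))
            if msg.clear:
                self.store.clear()
        elif isinstance(msg, CloseMetricsStore):
            self.opened = False


actor = ToyPostProcessor()
actor.send(ProcessSamples([1, 2, 3]))
actor.send(GetExternalizableMetricsStore(clear=True))
actor.send(CloseMetricsStore())

# Nothing has been processed yet: the sender cannot read results synchronously.
pending_before = len(actor.mailbox)

while actor.mailbox:
    actor.run_once()
```

In this toy model the three messages are handled in the order they were queued, so the reply is produced before the store is closed. Whether the real actor system gives the same ordering guarantee across all senders is something to confirm against its documentation.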

Issues Resolved

[List any issues this PR will resolve]

Testing

  • New functionality includes testing

[Describe how this change was tested]


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@coderabbitai

coderabbitai Bot commented Feb 3, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


@github-actions

github-actions Bot commented Mar 16, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 401104e.

| Path | Line | Severity | Description |
|---|---|---|---|
| osbenchmark/worker_coordinator/worker_coordinator.py | 1430 | low | The new SamplePostProcessorActor.prepare_telemetry removes the database_type check that previously skipped OpenSearch-specific telemetry devices for non-OpenSearch backends. This is a functional regression that could cause unintended cluster probing against non-OpenSearch targets, though it appears to be an oversight rather than deliberate. |
| osbenchmark/worker_coordinator/worker_coordinator.py | 1213 | low | Typo in method call: `time.from_is8601` (should be `from_iso8601`). If this code path is reached it will raise AttributeError, causing a silent failure in metrics index name generation for CPU-based redline feedback. Appears to be a bug rather than intentional sabotage. |
| tests/worker_coordinator/worker_coordinator_test.py | 2543 | low | Two test methods share the same name (`test_get_externalizable_metrics_store_task_finished` and `test_close_metrics_store_via_message` each appear twice). In Python, the second definition silently overrides the first, meaning half the test coverage is dead. This is a code quality issue and appears to be a copy-paste mistake rather than deliberate test coverage suppression. |

The table above displays the top 10 most important findings.

Total: 3 | Critical: 0 | High: 0 | Medium: 0 | Low: 3


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions

github-actions Bot commented Mar 16, 2026

PR Reviewer Guide 🔍

(Review updated until commit e923af6)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Introduce SamplePostProcessorActor and decouple metrics store from coordinator

Relevant files:

  • osbenchmark/worker_coordinator/worker_coordinator.py

Sub-PR theme: Add unit tests for SamplePostProcessorActor

Relevant files:

  • tests/worker_coordinator/worker_coordinator_test.py

⚡ Recommended focus areas for review

Typo in Method

The method time.from_is8601 appears to be a typo — it should likely be time.from_iso8601. This will cause a runtime AttributeError when computing the metrics index name for redline testing.

ts = time.from_is8601(test_run_timestamp)
return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)
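A small sketch of why this typo is latent: the misspelled attribute is only looked up when the function runs, so nothing fails until that code path is reached. Here `time_helpers` is a hypothetical stand-in for the project's time helper module, and `datetime.fromisoformat` is assumed as a substitute parser; the real `from_iso8601` may accept other timestamp formats.

```python
from datetime import datetime
from types import SimpleNamespace

# Hypothetical stand-in for the project's time helper module; only the
# correctly spelled function exists on it.
time_helpers = SimpleNamespace(from_iso8601=datetime.fromisoformat)


def index_name_typo(test_run_timestamp):
    ts = time_helpers.from_is8601(test_run_timestamp)  # misspelled on purpose
    return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)


def index_name_fixed(test_run_timestamp):
    ts = time_helpers.from_iso8601(test_run_timestamp)
    return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)


# Defining index_name_typo raised nothing; the error appears only on the call.
try:
    index_name_typo("2025-11-18T00:00:00")
    typo_error = None
except AttributeError as exc:
    typo_error = exc

fixed = index_name_fixed("2025-11-18T00:00:00")
```

With the corrected name, the example timestamp yields the index name `benchmark-metrics-2025-11`.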
Double Close

close_metric_store() is called both in joinpoint_reached (when finished() is true, line 1249) and in close() (line 1327). If close() is called after the benchmark finishes, the metrics store will receive a second CloseMetricsStore message, potentially causing errors or double-close issues in the actor.

def close(self):
    self.progress_publisher.finish()
    self.close_metric_store()

def close_metric_store(self):
    self.target.send(self.target.sample_post_processor_actor, CloseMetricsStore())
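One possible way to avoid the second message is to make the coordinator-side call idempotent. The sketch below is an assumption, not the PR's code: `ToyCoordinator`, the `sent` list, and the `_metrics_store_close_requested` flag are hypothetical names, and the actor system is replaced by a list that records what would have been sent.

```python
class ToyCoordinator:
    """Hypothetical stand-in; `sent` records messages instead of using an actor system."""

    def __init__(self):
        self.sent = []
        self._metrics_store_close_requested = False

    def close_metric_store(self):
        # Send CloseMetricsStore at most once, no matter how many callers ask.
        if self._metrics_store_close_requested:
            return
        self._metrics_store_close_requested = True
        self.sent.append("CloseMetricsStore")


coordinator = ToyCoordinator()
coordinator.close_metric_store()  # e.g. from joinpoint_reached
coordinator.close_metric_store()  # e.g. from close
```

Guarding on the sender side complements, rather than replaces, a receiver that tolerates a repeated close.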
Missing Database Type Check

The original prepare_telemetry in the coordinator had a database-type guard that skipped OpenSearch-specific telemetry devices for non-OpenSearch databases. The new prepare_telemetry inside SamplePostProcessorActor always adds all OpenSearch-specific telemetry devices regardless of the database type, losing that conditional logic.

def prepare_telemetry(self, opensearch, enable):
    enabled_devices = self.config.opts("telemetry", "devices")
    telemetry_params = self.config.opts("telemetry", "params")
    log_root = paths.test_run_root(self.config)

    os_default = opensearch["default"]

    if enable:
        devices = [
            telemetry.NodeStats(telemetry_params, opensearch, self.metrics_store),
            telemetry.ExternalEnvironmentInfo(os_default, self.metrics_store),
            telemetry.ClusterEnvironmentInfo(os_default, self.metrics_store),
            telemetry.JvmStatsSummary(os_default, self.metrics_store),
            telemetry.IndexStats(os_default, self.metrics_store),
            telemetry.MlBucketProcessingTime(os_default, self.metrics_store),
            telemetry.SegmentStats(log_root, os_default),
            telemetry.CcrStats(telemetry_params, opensearch, self.metrics_store),
            telemetry.RecoveryStats(telemetry_params, opensearch, self.metrics_store),
            telemetry.TransformStats(telemetry_params, opensearch, self.metrics_store),
            telemetry.SearchableSnapshotsStats(telemetry_params, opensearch, self.metrics_store),
            telemetry.SegmentReplicationStats(telemetry_params, opensearch, self.metrics_store),
            telemetry.ShardStats(telemetry_params, opensearch, self.metrics_store)
        ]
    else:
        devices = []
    self.telemetry = telemetry.Telemetry(enabled_devices, devices=devices)
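The exact form of the original guard is not shown in this review, so the following is only a sketch of the general idea. The `database_type` parameter, the `"opensearch"` literal, and the `select_devices` helper are all assumptions, and device names are plain strings here rather than real telemetry objects.

```python
def select_devices(database_type, enable, opensearch_devices, generic_devices=()):
    """Return the telemetry devices to attach.

    `database_type` and the "opensearch" literal are assumptions; the original
    guard's exact form is not shown in this review.
    """
    if not enable:
        return []
    devices = list(generic_devices)
    # Only probe the cluster with OpenSearch-specific devices on OpenSearch.
    if database_type == "opensearch":
        devices.extend(opensearch_devices)
    return devices


os_only = ["NodeStats", "JvmStatsSummary", "IndexStats"]
for_opensearch = select_devices("opensearch", True, os_only)
for_other = select_devices("other-backend", True, os_only)
disabled = select_devices("opensearch", False, os_only)
```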
Uninitialized Attributes

SamplePostProcessorActor does not define an __init__ method, so attributes like metrics_store, sample_post_processor, profile_metrics_post_processor, telemetry, and worker_coordinator_actor are only set when receiveMsg_StartSamplePostProcessorActor is called. Any message received before that (e.g., ProcessSamples, ResetRelativeTimeRequest) will raise an AttributeError.

class SamplePostProcessorActor(actor.BenchmarkActor):

    def receiveMsg_StartSamplePostProcessorActor(self, msg, sender):
        self.config = msg.config
        self.metrics_store = metrics.metrics_store(cfg=self.config,
                                                   workload=msg.workload.name,
                                                   test_procedure=msg.test_procedure.name,
                                                   read_only=False)
        self.sample_post_processor = DefaultSamplePostprocessor(self.metrics_store,
                                                         msg.downsample_factor,
                                                         msg.workload.meta_data,
                                                         msg.test_procedure.meta_data)
        self.profile_metrics_post_processor = ProfileMetricsSamplePostprocessor(self.metrics_store,
                                                                            msg.workload.meta_data,
                                                                            msg.test_procedure.meta_data)
        os_clients = self.create_os_clients()
        uses_static_responses = self.config.opts("client", "options").uses_static_responses

        # Avoid issuing any requests to the target cluster when static responses are enabled. The results
        # are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
        self.prepare_telemetry(os_clients, enable=not uses_static_responses)
        self.worker_coordinator_actor = sender

    def receiveMsg_ProcessSamples(self, msg, sender):
        if msg.samples:
            self.sample_post_processor(msg.samples)
        if msg.profile_samples:
            self.profile_metrics_post_processor(msg.profile_samples)

    def receiveMsg_StartTelemetry(self, msg, sender):
        self.telemetry.on_benchmark_start()

    def receiveMsg_StopTelemetry(self, msg, sender):
        self.telemetry.on_benchmark_stop()

    def receiveMsg_CloseMetricsStore(self, msg, sender):
        self.close()

    def receiveMsg_GetExternalizableMetricsStore(self, msg, sender):
        # Some metrics store implementations return None because no external representation is required.
        # pylint: disable=assignment-from-none
        metric_results = self.metrics_store.to_externalizable(clear=msg.clear)
        if msg.reason == ReasonForExternalizableRequest.TASK_FINISHED:
            self.send(self.worker_coordinator_actor, TaskFinished(metric_results, msg.waiting_period))
        elif msg.reason == ReasonForExternalizableRequest.BENCHMARK_COMPLETED:
            self.logger.debug("Sending benchmark results...")
            self.send(self.worker_coordinator_actor, BenchmarkComplete(metric_results))

    def receiveMsg_ResetRelativeTimeRequest(self, msg, sender):
        self.metrics_store.reset_relative_time()

    def close(self):
        if self.metrics_store and self.metrics_store.opened:
            self.metrics_store.close()
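A minimal sketch of the defensive pattern this observation points toward: give the attributes `None` defaults and have handlers check them. `ToySamplePostProcessorActor` is a hypothetical stand-in without the actor base class, a plain list stands in for the metrics store, and the `dropped` counter is invented for illustration. Whether the real base class permits an `__init__` override this way should be checked against the project's actor module.

```python
class ToySamplePostProcessorActor:
    """Hypothetical stand-in without the actor base class."""

    def __init__(self):
        # Defaults mean early messages hit a None check instead of AttributeError.
        self.metrics_store = None
        self.sample_post_processor = None
        self.dropped = 0

    def receiveMsg_StartSamplePostProcessorActor(self, store):
        self.metrics_store = store
        self.sample_post_processor = store.extend

    def receiveMsg_ProcessSamples(self, samples):
        if self.sample_post_processor is None:
            # Not started yet: count and skip; a real actor might log or queue instead.
            self.dropped += 1
            return
        if samples:
            self.sample_post_processor(samples)

    def close(self):
        # Safe to call before start and safe to call twice.
        if self.metrics_store is not None:
            self.metrics_store = None


actor = ToySamplePostProcessorActor()
actor.receiveMsg_ProcessSamples([1])   # arrives before start
actor.close()                          # safe before start
store = []
actor.receiveMsg_StartSamplePostProcessorActor(store)
actor.receiveMsg_ProcessSamples([2, 3])
```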
Duplicate Sample Sending

In send_samples, both UpdateSamples (to self.master) and ProcessSamples (to self.sample_post_processor_actor) are sent with the same samples. If receiveMsg_UpdateSamples in the coordinator still calls self.coordinator.update_samples, and update_samples no longer accumulates raw samples, this may be fine — but it should be verified that the coordinator-side handling of UpdateSamples is consistent with the removal of raw_samples accumulation and that no samples are processed twice.

def send_samples(self):
    if self.sampler:
        samples = self.sampler.samples
        if len(samples) > 0:
            self.send(self.master, UpdateSamples(self.worker_id, samples, self.profile_sampler.samples))
            self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
        return samples

@github-actions

github-actions Bot commented Mar 16, 2026

PR Code Suggestions ✨

Latest suggestions up to e923af6

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Fix typo in ISO 8601 parsing method name

The method name from_is8601 appears to be a typo — it should be from_iso8601. This
will cause an AttributeError at runtime when CPU-based redline testing is used with
an OsMetricsStore.

osbenchmark/worker_coordinator/worker_coordinator.py [1216-1217]

 def index_name(self, test_run_timestamp):
-    ts = time.from_is8601(test_run_timestamp)
+    ts = time.from_iso8601(test_run_timestamp)
     return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)
Suggestion importance[1-10]: 9

__

Why: The method from_is8601 is clearly a typo for from_iso8601, which will cause an AttributeError at runtime when CPU-based redline testing is used with an OsMetricsStore. This is a critical bug that would break functionality.

High
Guard against sending to uninitialized actor

If self.sample_post_processor_actor is None (e.g., when a worker is started without
it being set), calling self.send(None, ...) will silently fail or raise an error. A
guard check should be added before sending to sample_post_processor_actor.

osbenchmark/worker_coordinator/worker_coordinator.py [1939-1946]

 def send_samples(self):
     if self.sampler:
         samples = self.sampler.samples
         if len(samples) > 0:
             self.send(self.master, UpdateSamples(self.worker_id, samples, self.profile_sampler.samples))
-            self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
+            if self.sample_post_processor_actor:
+                self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
         return samples
     return None
Suggestion importance[1-10]: 5

__

Why: Adding a guard for self.sample_post_processor_actor being None is a reasonable defensive check, but in the current PR flow the actor is always set before send_samples is called, making this a minor robustness improvement rather than a critical fix.

Low
General
Prevent double-close of metrics store

The SamplePostProcessorActor.close() method closes the metrics store, but after
receiveMsg_GetExternalizableMetricsStore sends a BenchmarkComplete message,
receiveMsg_CloseMetricsStore is also sent separately via close_metric_store().
However, self.metrics_store is not set to None after closing, so a second
CloseMetricsStore message could attempt to close an already-closed store. Setting
self.metrics_store = None after closing prevents double-close issues.

osbenchmark/worker_coordinator/worker_coordinator.py [1400-1401]

-def receiveMsg_CloseMetricsStore(self, msg, sender):
-    self.close()
+def close(self):
+    if self.metrics_store and self.metrics_store.opened:
+        self.metrics_store.close()
+        self.metrics_store = None
Suggestion importance[1-10]: 5

__

Why: Setting self.metrics_store = None after closing prevents potential double-close issues, since CloseMetricsStore can be sent multiple times. The existing close() method already guards with self.metrics_store.opened, but nullifying the reference adds an extra layer of safety.

Low

Previous suggestions

Suggestions up to commit 3bc634a
Category | Suggestion | Impact
Possible issue
Fix typo in ISO 8601 parsing method name

The method name from_is8601 appears to be a typo — it should likely be from_iso8601
(note the missing 'o'). This will cause an AttributeError at runtime when CPU-based
redline testing is used with an OsMetricsStore.

osbenchmark/worker_coordinator/worker_coordinator.py [1216-1217]

 def index_name(self, test_run_timestamp):
-    ts = time.from_is8601(test_run_timestamp)
+    ts = time.from_iso8601(test_run_timestamp)
     return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)
Suggestion importance[1-10]: 9

__

Why: The method from_is8601 is clearly a typo for from_iso8601, which would cause an AttributeError at runtime when CPU-based redline testing is used with an OsMetricsStore. This is a critical bug that would break functionality.

High
Guard against None actor before sending samples

If self.sample_post_processor_actor is None (e.g., when the worker is started
without one being assigned), calling self.send(self.sample_post_processor_actor,
...) will fail or silently drop messages. A guard check should be added before
sending to sample_post_processor_actor.

osbenchmark/worker_coordinator/worker_coordinator.py [1939-1946]

 def send_samples(self):
     if self.sampler:
         samples = self.sampler.samples
         if len(samples) > 0:
             self.send(self.master, UpdateSamples(self.worker_id, samples, self.profile_sampler.samples))
-            self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
+            if self.sample_post_processor_actor:
+                self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
         return samples
     return None
Suggestion importance[1-10]: 6

__

Why: If sample_post_processor_actor is None, calling self.send with it could cause a runtime error. Adding a guard check is a reasonable defensive measure, especially since the actor is initialized to None in __init__ and could remain unset in edge cases.

Low
Guard against uninitialized actor reference

close_metric_store sends a CloseMetricsStore message without checking if
self.target.sample_post_processor_actor is None. This method is called from both
close() and joinpoint_reached(), and if the actor was never initialized (e.g., due
to an early failure), this will raise an error. A None-guard should be added.

osbenchmark/worker_coordinator/worker_coordinator.py [1329-1330]

 def close_metric_store(self):
-    self.target.send(self.target.sample_post_processor_actor, CloseMetricsStore())
+    if self.target.sample_post_processor_actor:
+        self.target.send(self.target.sample_post_processor_actor, CloseMetricsStore())
Suggestion importance[1-10]: 6

__

Why: The close_metric_store method is called from both close() and joinpoint_reached() without checking if sample_post_processor_actor is None. Adding a None-guard prevents potential errors if the actor was never initialized due to an early failure.

Low
General
Clear metrics store reference after closing

The close() method in SamplePostProcessorActor only closes the metrics store if it
exists and is opened, but it does not set self.metrics_store = None afterward. This
means repeated CloseMetricsStore messages could attempt to close an already-closed
store if opened remains truthy, or leave a stale reference. The metrics store should
be set to None after closing, consistent with the pattern used in
WorkerCoordinator.joinpoint_reached.

osbenchmark/worker_coordinator/worker_coordinator.py [1400-1401]

 def receiveMsg_CloseMetricsStore(self, msg, sender):
     self.close()
+    self.metrics_store = None
 
+def close(self):
+    if self.metrics_store and self.metrics_store.opened:
+        self.metrics_store.close()
+
Suggestion importance[1-10]: 4

__

Why: While setting self.metrics_store = None after closing is a good defensive practice, the metrics_store.opened check in close() already prevents double-closing. This is a minor improvement for consistency with the pattern in WorkerCoordinator.joinpoint_reached.

Low
Suggestions up to commit 401104e
Category | Suggestion | Impact
Possible issue
Fix typo in ISO 8601 parsing method name

The method name from_is8601 appears to be a typo — it should be from_iso8601. This
will cause an AttributeError at runtime when CPU-based redline testing is used with
an OsMetricsStore.

osbenchmark/worker_coordinator/worker_coordinator.py [1216-1217]

 def index_name(self, test_run_timestamp):
-    ts = time.from_is8601(test_run_timestamp)
+    ts = time.from_iso8601(test_run_timestamp)
     return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)
Suggestion importance[1-10]: 9

__

Why: The method from_is8601 is clearly a typo for from_iso8601, which would cause an AttributeError at runtime when CPU-based redline testing is used with an OsMetricsStore. This is a critical bug that would break functionality.

High
Remove duplicate test method definitions

There are two pairs of duplicate test methods with identical names:
test_get_externalizable_metrics_store_task_finished and
test_close_metrics_store_via_message. In Python, the second definition silently
overrides the first, so one test case is never actually executed. Rename the
duplicates to cover distinct scenarios (e.g., BENCHMARK_COMPLETED reason).

tests/worker_coordinator/worker_coordinator_test.py [2602-2632]

 def test_get_externalizable_metrics_store_task_finished(self):
+    # existing test for TASK_FINISHED
     ...
-    self.actor.receiveMsg_GetExternalizableMetricsStore(msg, sender=None)
-    self.actor.send.assert_called_once()
 
-def test_get_externalizable_metrics_store_task_finished(self):
+def test_get_externalizable_metrics_store_benchmark_completed(self):
+    # new test for BENCHMARK_COMPLETED
     ...
-    self.actor.receiveMsg_GetExternalizableMetricsStore(msg, sender=None)
-    self.actor.send.assert_called_once()
 
 def test_close_metrics_store_via_message(self):
-    ...
-def test_close_metrics_store_via_message(self):
+    # existing test
     ...
 
+def test_close_metrics_store_when_already_closed(self):
+    # new test for already-closed store
+    ...
+
Suggestion importance[1-10]: 7

__

Why: The duplicate test methods test_get_externalizable_metrics_store_task_finished (lines 2580 and 2602) and test_close_metrics_store_via_message (lines 2570 and 2624) mean the first definitions are silently overridden and never executed, reducing test coverage. Renaming them to cover distinct scenarios would improve test completeness.

Medium
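The override behavior behind the duplicate-test finding can be reproduced with the standard library alone. The class and method names below are hypothetical; the first definition deliberately fails, yet the suite still passes because only the second definition survives on the class.

```python
import unittest


class DuplicateNameTest(unittest.TestCase):
    def test_close(self):
        self.fail("first definition: never collected")

    def test_close(self):  # noqa: F811 - same name on purpose
        pass


# The loader sees a single method because the class body rebinds the name.
names = list(unittest.defaultTestLoader.getTestCaseNames(DuplicateNameTest))
result = unittest.TestResult()
unittest.defaultTestLoader.loadTestsFromTestCase(DuplicateNameTest).run(result)
```

Linters that flag redefinitions (for example the F811 check referenced in the comment) can catch this kind of copy-paste mistake before review.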
Guard against None actor reference before sending

If self.sample_post_processor_actor is None (e.g., when the worker is started
without a sample_post_processor_actor), calling
self.send(self.sample_post_processor_actor, ...) will fail. Add a guard to only send
to the actor if it is not None.

osbenchmark/worker_coordinator/worker_coordinator.py [1939-1946]

 def send_samples(self):
     if self.sampler:
         samples = self.sampler.samples
         if len(samples) > 0:
             self.send(self.master, UpdateSamples(self.worker_id, samples, self.profile_sampler.samples))
-            self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
+            if self.sample_post_processor_actor:
+                self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
         return samples
     return None
Suggestion importance[1-10]: 6

__

Why: If sample_post_processor_actor is None, calling self.send with it would fail. Since the field is initialized to None in __init__ and only set when a StartWorker message is received with a non-None value, this guard prevents potential runtime errors in edge cases.

Low
Guard against uninitialized metrics store on close

receiveMsg_CloseMetricsStore delegates to self.close(), but close() checks
self.metrics_store and self.metrics_store.opened. If self.metrics_store was never
initialized (e.g., StartSamplePostProcessorActor was never received), this will
raise an AttributeError. Add a guard to check if metrics_store is defined before
calling close().

osbenchmark/worker_coordinator/worker_coordinator.py [1400-1401]

 def receiveMsg_CloseMetricsStore(self, msg, sender):
-    self.close()
+    if hasattr(self, 'metrics_store'):
+        self.close()
Suggestion importance[1-10]: 5

__

Why: While the guard is a reasonable defensive measure, in practice receiveMsg_CloseMetricsStore should only be called after receiveMsg_StartSamplePostProcessorActor has initialized metrics_store. The risk is real but low in normal operation flow.

Low
Suggestions up to commit e65fa2c
Category | Suggestion | Impact
Possible issue
Fix typo in ISO 8601 parsing method name

The method name from_is8601 appears to be a typo — it should be from_iso8601. This
will cause an AttributeError at runtime when attempting to compute the metrics index
name for CPU-based redline testing.

osbenchmark/worker_coordinator/worker_coordinator.py [1216-1217]

 def index_name(self, test_run_timestamp):
-    ts = time.from_is8601(test_run_timestamp)
+    ts = time.from_iso8601(test_run_timestamp)
     return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)
Suggestion importance[1-10]: 9

__

Why: The method from_is8601 is clearly a typo for from_iso8601 and will cause an AttributeError at runtime when CPU-based redline testing computes the metrics index name. This is a critical bug that would break functionality.

High
Prevent premature metrics store closure before results are sent

The metrics store is being closed (close_metric_store()) before
GetExternalizableMetricsStore has been processed by the SamplePostProcessorActor.
Since actor message processing is asynchronous, the CloseMetricsStore message (sent
by close_metric_store()) may arrive and close the store before to_externalizable()
is called. The close should only happen after the externalizable metrics have been
retrieved, i.e., it should be triggered from within
receiveMsg_GetExternalizableMetricsStore after sending BenchmarkComplete.

osbenchmark/worker_coordinator/worker_coordinator.py [1242-1251]

 if self.finished():
     self.target.send(self.target.sample_post_processor_actor, StopTelemetry())
     self.logger.info("All steps completed.")
     # Some metrics store implementations return None because no external representation is required.
     # pylint: disable=assignment-from-none
     self.target.send(self.target.sample_post_processor_actor, GetExternalizableMetricsStore(True, reason=ReasonForExternalizableRequest.BENCHMARK_COMPLETED))
-    self.logger.debug("Closing metrics store...")
-    self.close_metric_store()
-    # immediately clear as we don't need it anymore and it can consume a significant amount of memory
-    self.metrics_store = None
+    # Close is now handled inside SamplePostProcessorActor after sending BenchmarkComplete
Suggestion importance[1-10]: 7

__

Why: Since actor messages are processed sequentially in the SamplePostProcessorActor, the CloseMetricsStore message sent by close_metric_store() will be queued after GetExternalizableMetricsStore, so the ordering concern may be less severe than suggested. However, the suggestion raises a valid architectural point about ensuring the close happens after the externalizable metrics are retrieved.

Medium
Guard against uninitialized actor reference

If self.sample_post_processor_actor is None (e.g., when the actor hasn't been
initialized yet), calling self.send on it will raise an error. A guard check should
be added before sending to sample_post_processor_actor.

osbenchmark/worker_coordinator/worker_coordinator.py [1939-1946]

 def send_samples(self):
     if self.sampler:
         samples = self.sampler.samples
         if len(samples) > 0:
             self.send(self.master, UpdateSamples(self.worker_id, samples, self.profile_sampler.samples))
-            self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
+            if self.sample_post_processor_actor:
+                self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
         return samples
     return None
Suggestion importance[1-10]: 5

__

Why: The sample_post_processor_actor is initialized to None in __init__ and set in receiveMsg_StartWorker, so there's a valid window where it could be None. Adding a guard prevents potential errors, though in practice send_samples should only be called after the worker is started.

Low
General
Stop actor after closing metrics store

The close() method only closes the metrics store but does not stop the actor itself.
After closing the metrics store, the actor should also stop itself (e.g., via
self.send(self.myAddress, ActorExitRequest())) to avoid resource leaks and dangling
actors.

osbenchmark/worker_coordinator/worker_coordinator.py [1400-1401]

 def receiveMsg_CloseMetricsStore(self, msg, sender):
     self.close()
+    self.send(self.myAddress, ActorExitRequest())
Suggestion importance[1-10]: 4

__

Why: While stopping the actor after closing the metrics store is a reasonable concern, the ActorExitRequest import and usage pattern depends on the actor framework specifics. This is a valid improvement but not critical, and the actor framework may handle cleanup differently.

Low
Suggestions up to commit 61ef6dc
Category | Suggestion | Impact
Possible issue
Fix typo in ISO 8601 parsing method name

The method name from_is8601 appears to be a typo — it should likely be from_iso8601.
This will cause an AttributeError at runtime when CPU-based redline testing is used
with an OsMetricsStore.

osbenchmark/worker_coordinator/worker_coordinator.py [1216-1217]

 def index_name(self, test_run_timestamp):
-    ts = time.from_is8601(test_run_timestamp)
+    ts = time.from_iso8601(test_run_timestamp)
     return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)
Suggestion importance[1-10]: 9

__

Why: The method time.from_is8601 is clearly a typo for time.from_iso8601, which will cause an AttributeError at runtime when CPU-based redline testing is used with an OsMetricsStore. This is a critical bug that would break functionality.

High
Guard against uninitialized actor before sending

If self.sample_post_processor_actor is None (e.g., when the actor hasn't been
initialized yet), calling self.send(self.sample_post_processor_actor, ...) will
silently fail or raise an error. A guard check should be added before sending to the
actor.

osbenchmark/worker_coordinator/worker_coordinator.py [1939-1946]

 def send_samples(self):
     if self.sampler:
         samples = self.sampler.samples
         if len(samples) > 0:
             self.send(self.master, UpdateSamples(self.worker_id, samples, self.profile_sampler.samples))
-            self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
+            if self.sample_post_processor_actor:
+                self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
         return samples
     return None
Suggestion importance[1-10]: 5

__

Why: While self.sample_post_processor_actor is initialized to None in __init__ and set in receiveMsg_StartWorker, adding a guard is a reasonable defensive measure, though in normal flow the actor should always be set before send_samples is called.

Low
Guard against None actor reference when closing

If self.target.sample_post_processor_actor is None (e.g., if prepare_benchmark was
never called or failed early), sending a message to it will cause a runtime error. A
guard check should be added to prevent this.

osbenchmark/worker_coordinator/worker_coordinator.py [1329-1330]

 def close_metric_store(self):
-    self.target.send(self.target.sample_post_processor_actor, CloseMetricsStore())
+    if self.target.sample_post_processor_actor:
+        self.target.send(self.target.sample_post_processor_actor, CloseMetricsStore())
Suggestion importance[1-10]: 5

__

Why: If prepare_benchmark was never called or failed early, self.target.sample_post_processor_actor could be None, causing a runtime error. Adding a guard is a reasonable defensive measure for error handling.

Low
Guard against uninitialized state in actor close

The SamplePostProcessorActor.close() method accesses self.metrics_store, but if
receiveMsg_StartSamplePostProcessorActor was never called (e.g., due to an error),
self.metrics_store will not be defined, causing an AttributeError. Add a hasattr
guard or initialize self.metrics_store = None in the actor's init.

osbenchmark/worker_coordinator/worker_coordinator.py [1400-1401]

 def receiveMsg_CloseMetricsStore(self, msg, sender):
-    self.close()
+    if hasattr(self, 'metrics_store'):
+        self.close()
 
+def close(self):
+    if self.metrics_store and self.metrics_store.opened:
+        self.metrics_store.close()
+
Suggestion importance[1-10]: 5

__

Why: Since SamplePostProcessorActor lacks an __init__ that initializes self.metrics_store, calling close() before receiveMsg_StartSamplePostProcessorActor would raise an AttributeError. Adding a hasattr guard or initializing in __init__ would prevent this.
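A minimal sketch of the second option mentioned above (initializing the attribute up front), written against hypothetical stand-in classes rather than the real actor. `FakeMetricsStore` and `SamplePostProcessorSketch` are illustrative names, not part of the PR; the point is only that `close()` becomes safe to call before any start message arrives, and safe to call twice.

```python
class FakeMetricsStore:
    """Hypothetical stand-in for the real metrics store."""

    def __init__(self):
        self.opened = True
        self.close_calls = 0

    def close(self):
        self.opened = False
        self.close_calls += 1


class SamplePostProcessorSketch:
    """Illustrative only; not the actual SamplePostProcessorActor."""

    def __init__(self):
        # Defined up front so close() never hits an undefined attribute.
        self.metrics_store = None

    def start(self, metrics_store):
        # Plays the role of the start message handler in this sketch.
        self.metrics_store = metrics_store

    def close(self):
        if self.metrics_store is not None and self.metrics_store.opened:
            self.metrics_store.close()


early = SamplePostProcessorSketch()
early.close()  # no AttributeError even though start() never ran

store = FakeMetricsStore()
started = SamplePostProcessorSketch()
started.start(store)
started.close()
started.close()  # the store is already closed, so this is a no-op
```

Compared with a hasattr check at each call site, this keeps the guard in one place.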

Low
Suggestions up to commit 2367daa
Category | Suggestion | Impact
Possible issue
Fix typo in method name call

The method name from_is8601 appears to be a typo — it should likely be from_iso8601
(note the missing 'o'). This will cause an AttributeError at runtime when the method
is called.

osbenchmark/worker_coordinator/worker_coordinator.py [1216-1217]

 def index_name(self, test_run_timestamp):
-    ts = time.from_is8601(test_run_timestamp)
+    ts = time.from_iso8601(test_run_timestamp)
     return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)
Suggestion importance[1-10]: 9

__

Why: The method from_is8601 is clearly a typo for from_iso8601, which would cause an AttributeError at runtime. This is a critical bug that would break CPU-based redline testing functionality.

High
Avoid premature metrics store closure before async retrieval

The metrics store is being closed (CloseMetricsStore sent) immediately after
requesting the externalizable metrics store (GetExternalizableMetricsStore). Since
these are asynchronous actor messages, the close may execute before
to_externalizable completes in SamplePostProcessorActor, potentially resulting in
data loss or errors. The close should happen after the externalizable metrics have
been retrieved and the benchmark complete message has been sent.

osbenchmark/worker_coordinator/worker_coordinator.py [1242-1251]

-def joinpoint_reached(self, worker_id, worker_local_timestamp, task_allocations):
-    ...
-        if self.finished():
-            self.target.send(self.target.sample_post_processor_actor, StopTelemetry())
-            self.logger.info("All steps completed.")
-            # Some metrics store implementations return None because no external representation is required.
-            # pylint: disable=assignment-from-none
-            self.target.send(self.target.sample_post_processor_actor, GetExternalizableMetricsStore(True, reason=ReasonForExternalizableRequest.BENCHMARK_COMPLETED))
-            self.logger.debug("Closing metrics store...")
-            self.close_metric_store()
-            # immediately clear as we don't need it anymore and it can consume a significant amount of memory
-            self.metrics_store = None
+if self.finished():
+    self.target.send(self.target.sample_post_processor_actor, StopTelemetry())
+    self.logger.info("All steps completed.")
+    # Some metrics store implementations return None because no external representation is required.
+    # pylint: disable=assignment-from-none
+    # Close will be triggered by SamplePostProcessorActor after sending BenchmarkComplete
+    self.target.send(self.target.sample_post_processor_actor, GetExternalizableMetricsStore(True, reason=ReasonForExternalizableRequest.BENCHMARK_COMPLETED))
+    # immediately clear as we don't need it anymore and it can consume a significant amount of memory
+    self.metrics_store = None
Suggestion importance[1-10]: 7

__

Why: The close_metric_store() call sends a CloseMetricsStore message immediately after GetExternalizableMetricsStore, but since actor messages are processed sequentially in thespian, the close would actually happen after the externalizable request is processed. However, the suggestion to move the close into SamplePostProcessorActor after sending BenchmarkComplete is architecturally cleaner and avoids potential ordering issues.
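The ordering argument can be illustrated with a toy single-consumer mailbox. This is a sketch under the assumption stated in the note above, namely that messages from one sender to one actor are handled in the order they were sent; `MailboxSketch` is a hypothetical stand-in and does not use the actor framework.

```python
from collections import deque


class MailboxSketch:
    """Toy FIFO mailbox; a stand-in, not the real actor framework."""

    def __init__(self):
        self.queue = deque()
        self.store_open = True
        self.log = []

    def send(self, message):
        self.queue.append(message)

    def drain(self):
        # Messages are handled strictly one at a time, oldest first.
        while self.queue:
            message = self.queue.popleft()
            if message == "GetExternalizableMetricsStore":
                # Externalizing only works while the store is still open.
                self.log.append("externalized" if self.store_open else "lost")
            elif message == "CloseMetricsStore":
                self.store_open = False
                self.log.append("closed")


mailbox = MailboxSketch()
mailbox.send("GetExternalizableMetricsStore")
mailbox.send("CloseMetricsStore")
mailbox.drain()
print(mailbox.log)
```

If delivery order were not guaranteed, the same two messages could arrive reversed and the first handler would record "lost", which is the case the suggestion guards against by closing inside the handler.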

Medium
Ensure metrics store is closed after benchmark completion

When the reason is BENCHMARK_COMPLETED, the metrics store is never closed within
SamplePostProcessorActor. The close() call was removed from the coordinator but is
not present here either, meaning the metrics store may never be properly closed
after benchmark completion. Add a self.close() call after sending BenchmarkComplete.

osbenchmark/worker_coordinator/worker_coordinator.py [1403-1411]

 def receiveMsg_GetExternalizableMetricsStore(self, msg, sender):
     # Some metrics store implementations return None because no external representation is required.
     # pylint: disable=assignment-from-none
     metric_results = self.metrics_store.to_externalizable(clear=msg.clear)
     if msg.reason == ReasonForExternalizableRequest.TASK_FINISHED:
         self.send(self.worker_coordinator_actor, TaskFinished(metric_results, msg.waiting_period))
     elif msg.reason == ReasonForExternalizableRequest.BENCHMARK_COMPLETED:
         self.logger.debug("Sending benchmark results...")
         self.send(self.worker_coordinator_actor, BenchmarkComplete(metric_results))
+        self.logger.debug("Closing metrics store...")
+        self.close()
Suggestion importance[1-10]: 7

__

Why: The CloseMetricsStore message is sent separately from GetExternalizableMetricsStore, but if the close happens before to_externalizable completes, data could be lost. Moving the close into receiveMsg_GetExternalizableMetricsStore after BenchmarkComplete ensures proper ordering within the actor's message queue.

Medium
Guard against null actor reference before sending

If self.sample_post_processor_actor is None (e.g., when the worker is started
without a sample_post_processor_actor), calling
self.send(self.sample_post_processor_actor, ...) will fail. A guard check should be
added before sending to sample_post_processor_actor.

osbenchmark/worker_coordinator/worker_coordinator.py [1939-1946]

 def send_samples(self):
     if self.sampler:
         samples = self.sampler.samples
         if len(samples) > 0:
             self.send(self.master, UpdateSamples(self.worker_id, samples, self.profile_sampler.samples))
-            self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
+            if self.sample_post_processor_actor:
+                self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
         return samples
     return None
Suggestion importance[1-10]: 5

__

Why: If sample_post_processor_actor is None, calling self.send with it could fail. Adding a null check is a defensive improvement, though in normal flow the actor should always be set via StartWorker.

Low

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 92c7eaf.

Path | Line | Severity | Description
osbenchmark/benchmark.py | 36 | low | import os added as a new import but is not used anywhere in the visible diff for this file. Unused imports of sensitive modules like os warrant review, though this is likely an oversight from copy-pasting the import block.
osbenchmark/benchmark.py | 35 | low | import linecache added but not used in benchmark.py. linecache can read arbitrary files by path; its usage is justified in worker_coordinator.py for traceback display, but its presence here without usage is anomalous.
osbenchmark/worker_coordinator/worker_coordinator.py | 47 | low | import os added as a new import but is not used in any of the visible diff changes for this file. No clear justification for adding it alongside the tracemalloc profiling code.
osbenchmark/worker_coordinator/worker_coordinator.py | 1558 | low | display_top uses print() instead of the existing logger infrastructure. This bypasses log-level filtering and log redirection, causing memory profiling output to always appear on stdout regardless of logging configuration. Could obscure other output or indicate intent to ensure visibility outside normal logging channels.
osbenchmark/worker_coordinator/worker_coordinator.py | 1671 | low | log_memory_usage called on every WakeupMessage in both WorkerCoordinatorActor and Worker actors. WakeupMessages are typically high-frequency in actor systems; calling tracemalloc.take_snapshot() on each wakeup introduces significant performance overhead and generates voluminous stdout output, which could mask other diagnostic output. This appears to be debug code not intended for production.

The table above displays the top 10 most important findings.

Total: 5 | Critical: 0 | High: 0 | Medium: 0 | Low: 5


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions

Persistent review updated to latest commit 92c7eaf

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit b6de30c.

Path | Line | Severity | Description
osbenchmark/worker_coordinator/worker_coordinator.py | 1600 | low | The _write_memory_summary() function writes code_line snippets (captured via linecache) alongside allocation metadata to a local log file. While tracemalloc does not expose memory contents, source code lines written to logs could reveal implementation details if logs are stored in a shared or insufficiently protected location. This is a minor information disclosure risk with no evidence of malicious intent.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 0 | Low: 1

@github-actions

Persistent review updated to latest commit b6de30c

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit 88207f8

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 46613ae.

Path | Line | Severity | Description
osbenchmark/benchmark.py | 1324 | low | tracemalloc.start() is called unconditionally in main() without any configuration flag or guard. This adds memory profiling overhead to all benchmark runs, which is anomalous for a performance benchmarking tool, but no malicious intent is evident. Likely leftover debug instrumentation that was not gated behind a flag.
osbenchmark/worker_coordinator/worker_coordinator.py | 1610 | low | The display_top/log_memory_usage functions capture source code lines (via linecache.getline) and write them to local log files as part of memory profiling. This is the standard Python tracemalloc usage pattern and writes only to the application's own log directory (paths.logs()). No external transmission is present. Flagged as an anomaly only because this profiling code appears partially commented-out, suggesting work-in-progress debug instrumentation inadvertently left active.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 0 | Low: 2
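
One way to address the unconditional tracemalloc.start() finding is to gate the profiling behind a flag and route output through a logger. The sketch below is an assumption about how that could look, not code from the PR: the environment variable name OSB_TRACE_MEMORY and both function names are hypothetical. Only standard-library calls are used.

```python
import logging
import os
import tracemalloc

logger = logging.getLogger("memory_profile_sketch")


def maybe_start_tracing(env=os.environ):
    """Start tracemalloc only when the (hypothetical) flag is set to "1"."""
    enabled = env.get("OSB_TRACE_MEMORY") == "1"
    if enabled and not tracemalloc.is_tracing():
        tracemalloc.start()
    return enabled


def log_top_allocations(limit=5):
    """Log the largest allocation sites; return how many entries were logged."""
    if not tracemalloc.is_tracing():
        # Profiling is off, so skip the snapshot and its overhead entirely.
        return 0
    stats = tracemalloc.take_snapshot().statistics("lineno")[:limit]
    for stat in stats:
        # Goes through logging, so log levels and redirection still apply.
        logger.debug("%s", stat)
    return len(stats)
```

With the flag unset, both functions return immediately, so a normal benchmark run would not pay for snapshots.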

@github-actions

Persistent review updated to latest commit 46613ae

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit e255d1a

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 1da9aaf.

Path | Line | Severity | Description
osbenchmark/worker_coordinator/worker_coordinator.py | 691 | low | _report_message_difference() is called on every WakeupMessage, appending to a log file on each invocation. This is an unusual pattern that could cause excessive disk I/O or disk exhaustion over long benchmark runs, but appears to be an incomplete/leftover debugging artifact rather than intentional malice.
osbenchmark/worker_coordinator/worker_coordinator.py | 925 | low | SampleUpdaterUnit actor class is defined but appears unused — it only relays UpdateSamples messages to self.parent with no other logic. The class has no parent reference set up and self.parent is never initialized. This seems like an incomplete refactoring artifact rather than a backdoor, but it is dead/unreachable code that is out of place.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 0 | Low: 2

@github-actions

Persistent review updated to latest commit 1da9aaf

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit 440f088

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit b74b39f

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit 2bdab0e

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit 2199e7a

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit 2367daa

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit 61ef6dc

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit e65fa2c

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit 401104e

Signed-off-by: Anthony Leong <aj.leong623@gmail.com>
@github-actions

Persistent review updated to latest commit 3bc634a

@ajleong623 ajleong623 marked this pull request as ready for review April 19, 2026 20:13
@ajleong623
Contributor Author

I have finished adding some unit tests. Please let me know if anyone has any questions, comments, or suggestions.

@github-actions

Persistent review updated to latest commit e923af6

Comment on lines 1943 to +1944
self.send(self.master, UpdateSamples(self.worker_id, samples, self.profile_sampler.samples))
self.send(self.sample_post_processor_actor, ProcessSamples(samples, self.profile_sampler.samples))
Member

how come we send samples to master and the new actor?

Contributor Author

Although the metrics collector is now in a separate actor, the progress message still needs the samples; see the line self.most_recent_sample_per_client[s.client_id] = s. Should the progress bar also be handled in the new actor?

self.update_progress_message()

def index_name(self, test_run_timestamp):
ts = time.from_is8601(test_run_timestamp)
Member

LLM caught a typo here :) should be from_iso8601() no?

Contributor Author

It is also spelled that way in the metrics file (def from_is8601(ts):). I think that LLM is correct, though, and both functions should be changed.
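
Since the misspelled name exists at both the definition and the call site, one possible way to rename it without breaking other callers is to add the corrected name and keep the old one as a deprecated alias. This is only a sketch: the timestamp layout in TIMESTAMP_FORMAT is an assumption and should be checked against the real helper, and the functions are shown at module level rather than in their actual modules.

```python
import datetime
import warnings

# Assumed timestamp layout, e.g. "20251118T201300Z"; verify against the real helper.
TIMESTAMP_FORMAT = "%Y%m%dT%H%M%SZ"


def from_iso8601(ts):
    """Parse a timestamp string into a datetime (corrected spelling)."""
    return datetime.datetime.strptime(ts, TIMESTAMP_FORMAT)


def from_is8601(ts):
    """Deprecated misspelling, kept so existing callers keep working."""
    warnings.warn("from_is8601 is deprecated; use from_iso8601",
                  DeprecationWarning, stacklevel=2)
    return from_iso8601(ts)


def index_name(test_run_timestamp):
    # Same formatting as the method under review, using the corrected name.
    ts = from_iso8601(test_run_timestamp)
    return "benchmark-metrics-%04d-%02d" % (ts.year, ts.month)
```

Renaming only the call site, as in the suggested diff, would fail until the definition is renamed too, so the two changes need to land together.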

self.profile_metrics_post_processor = ProfileMetricsSamplePostprocessor(self.metrics_store,
msg.workload.meta_data,
msg.test_procedure.meta_data)
os_clients = self.create_os_clients()
Member

why do we create more OS clients here?

Contributor Author

Telemetry has been moved into the new actor because it depends on the metric store, and telemetry requires OS clients.

self.workload.meta_data,
self.test_procedure.meta_data)
self.profile_metrics_post_processor(profile_samples)
class SamplePostProcessorActor(actor.BenchmarkActor):
Member

let's add an ActorExitRequest method to this actor for a clean shutdown on benchmark completion

Contributor Author

Yes. Thank you for catching that. An edge case that has been bugging me is what happens when the coordinator shuts down before the SamplePostProcessorActor closes the metric store. I think the cleanup steps should be triggered by handling that request. Additionally, maybe I should have named the new actor MetricStoreActor for readability and to tie it to the scope of its responsibilities.
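
A rough sketch of the cleanup being discussed, assuming the framework delivers the exit request to a handler named after the message type, as the other receiveMsg_ handlers in this PR suggest. Everything here is a stand-in: ActorExitRequest is a local placeholder class rather than the framework import, and FakeMetricsStore and ExitAwareActorSketch are hypothetical names.

```python
class ActorExitRequest:
    """Local placeholder for the framework's exit message."""


class FakeMetricsStore:
    """Hypothetical stand-in for the real metrics store."""

    def __init__(self):
        self.opened = True
        self.close_calls = 0

    def close(self):
        self.opened = False
        self.close_calls += 1


class ExitAwareActorSketch:
    """Illustrative only; not the actual SamplePostProcessorActor."""

    def __init__(self, metrics_store):
        self.metrics_store = metrics_store

    def close(self):
        # Drop the reference first so a second call finds nothing to close.
        store, self.metrics_store = self.metrics_store, None
        if store is not None and store.opened:
            store.close()

    def receiveMsg_CloseMetricsStore(self, msg, sender):
        self.close()

    def receiveMsg_ActorExitRequest(self, msg, sender):
        # Covers the case where the coordinator exits before
        # CloseMetricsStore has been handled.
        self.close()


store = FakeMetricsStore()
actor = ExitAwareActorSketch(store)
actor.receiveMsg_ActorExitRequest(ActorExitRequest(), sender=None)
actor.receiveMsg_CloseMetricsStore(object(), sender=None)  # late close is harmless
```

Because close() is idempotent here, the order in which the exit request and the close message arrive does not matter.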

@OVI3D0
Member

OVI3D0 commented Apr 21, 2026

left a few comments/questions but this is shaping up pretty well! nice work
