Make datapipe a bit more reusable #1562

Merged: 4 commits merged on Jul 18, 2025

Conversation

@kpom-specter (Contributor) commented Jun 6, 2025

Description

Refactor(datapipe): Move the datapipe pipeline logic away from the daemon/state management. pipeline.go includes all business logic for how the steps execute; datapipe.go owns the order and timing.

Motivation and Context

Resolves BED-5905

How Has This Been Tested?

Please describe in detail how you tested your changes.
Include details of your testing environment, and the tests you ran to
see how your change affects other areas of the code, etc.

Screenshots (optional):

Types of changes

  • Chore (a change that does not modify the application functionality)

Checklist:

Summary by CodeRabbit

  • New Features

    • Improved data pipeline management with a new pipeline interface for streamlined ingestion, analysis, pruning, and deletion operations.
    • Enhanced status tracking with new pipeline status values for more granular monitoring.
  • Bug Fixes

    • Improved error logging consistency in pipeline-related operations.
  • Refactor

    • Simplified and modularized the data pipeline daemon, delegating responsibilities to a dedicated pipeline component.
    • Refactored status update methods for clearer separation of concerns and easier maintenance.
    • Updated method signatures and return types for improved clarity and consistency.
    • Removed obsolete feature flag to streamline configuration.
  • Tests

    • Updated and extended tests to reflect changes in status handling and method signatures.

coderabbitai bot commented Jun 6, 2025

Walkthrough

This change refactors the datapipe daemon by introducing a Pipeline interface that encapsulates all pipeline operations and status checks. The daemon now delegates core logic—such as data pruning, ingestion, deletion, and analysis—to the pipeline implementation, simplifying its structure and responsibilities. Related interfaces, mocks, and tests are updated accordingly.
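
For orientation, a minimal sketch of the Pipeline interface described above (method names are taken from this walkthrough; exact signatures are assumptions, and revisions later in this PR settled on error returns):

package datapipe

import "context"

// Sketch only; the real interface lives in cmd/api/src/daemons/datapipe.
type Pipeline interface {
	Start(ctx context.Context) error       // one-time warm-up before the loop
	PruneData(ctx context.Context) error   // periodic cleanup of expired data
	DeleteData(ctx context.Context) error  // purge collected graph data on request
	IngestTasks(ctx context.Context) error // drain available ingest tasks
	Analyze(ctx context.Context) error     // run graph analysis
}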

Changes

  • cmd/api/src/daemons/datapipe/datapipe.go: Refactored Daemon to use a Pipeline interface; removed embedded services and logic, added status management, and updated the constructor and main loop.
  • cmd/api/src/daemons/datapipe/pipeline.go: Introduced BHCEPipeline struct implementing the Pipeline interface; encapsulates all pipeline operations and dependencies.
  • cmd/api/src/model/datapipestatus.go: Added new status constants: DatapipeStatusPruning and DatapipeStatusStarting.
  • cmd/api/src/services/entrypoint.go: Updated daemon initialization to use the new Pipeline abstraction and explicit start delay.
  • cmd/api/src/daemons/datapipe/agt.go: Changed structured log key from "error" to "err" in two functions.
  • cmd/api/src/database/datapipestatus.go: Refactored DatapipeStatusData interface: split status and analysis time update methods; updated implementation.
  • cmd/api/src/database/datapipestatus_integration_test.go: Updated tests to use the new status and analysis time update methods; switched some assertions to assert-style so tests continue executing.
  • cmd/api/src/database/mocks/db.go: Updated mock methods to match new interface signatures; added mock for UpdateLastAnalysisCompleteTime.
  • cmd/api/src/database/analysisrequest.go: Changed signature and return order for the HasCollectedGraphDataDeletionRequest method in interface and implementation.
  • cmd/api/src/model/appcfg/flag.go: Removed the FeaturePGMigrationDualIngest feature flag constant.

Sequence Diagram(s)

sequenceDiagram
    participant Entrypoint
    participant Daemon
    participant Pipeline
    participant Database

    Entrypoint->>Daemon: Initialize with Pipeline, startDelay, tickInterval, db
    Daemon->>Pipeline: Start(ctx)
    Daemon->>Pipeline: PruneData(ctx)
    Daemon->>Pipeline: DeleteData(ctx)
    Daemon->>Pipeline: IngestTasks(ctx)
    Daemon->>Pipeline: Analyze(ctx)
    Pipeline->>Database: (Various DB operations)
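
The diagram implies an orchestration loop along these lines; this is a hedged sketch reusing the Pipeline sketch above, with field names, intervals, and error handling assumed rather than copied from the merged code:

package datapipe

import (
	"context"
	"time"
)

type Daemon struct {
	pipeline     Pipeline
	startDelay   time.Duration
	tickInterval time.Duration
}

func (s *Daemon) Start(ctx context.Context) {
	// Honour the explicit start delay before the first tick.
	select {
	case <-time.After(s.startDelay):
	case <-ctx.Done():
		return
	}

	ticker := time.NewTicker(s.tickInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// The daemon owns order and timing; the pipeline owns the work.
			_ = s.pipeline.IngestTasks(ctx)
			_ = s.pipeline.Analyze(ctx)
		case <-ctx.Done():
			return
		}
	}
}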

Poem

In the warren where data flows,
The pipeline hops, the daemon knows.
With status set and tasks in line,
The code refactored, looking fine.
A carrot for each interface new,
This bunny’s proud of what you do!

((\
( -.-)
o_(")(")


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 943ba49 and 806519e.

📒 Files selected for processing (1)
  • cmd/api/src/model/appcfg/flag.go (0 hunks)
💤 Files with no reviewable changes (1)
  • cmd/api/src/model/appcfg/flag.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: run-analysis
  • GitHub Check: build-ui
  • GitHub Check: run-tests


@coderabbitai bot left a comment

Actionable comments posted: 6

🔭 Outside diff range comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)

55-87: 💡 Verification agent

🧩 Analysis chain

Start receives a ctx it never uses – dead parameter or wrong context?

Inside the method every DB call uses s.ctx, not the parameter ctx.
Either delete the parameter or prefer the incoming ctx to honour caller cancellation. Mixing both will mislead future maintainers.

-func (s *Daemon) Start(ctx context.Context) {
+func (s *Daemon) Start() {            // simplest

(or swap all s.ctx usages to the parameter).
Please align with whichever pattern the other daemons follow.


🏁 Script executed:

#!/bin/bash
set -e
# List daemon directories under cmd/api/src/daemons
find cmd/api/src/daemons -maxdepth 2 -mindepth 1 -type d

# Search for Start methods in all daemon packages
rg -n "func .\+Start" -A2 cmd/api/src/daemons

Length of output: 288


🏁 Script executed:

#!/bin/bash
set -e

echo "== Start method signatures under all daemons =="
rg -n "func.*Start" -A2 cmd/api/src/daemons

echo "== Daemon struct in datapipe =="
rg -n "type Daemon" -A5 cmd/api/src/daemons/datapipe

echo "== s.ctx references in datapipe =="
rg -n "s\.ctx" -n cmd/api/src/daemons/datapipe

Length of output: 5228


Use the incoming ctx in Datapipe.Start

The Start(ctx context.Context) method in cmd/api/src/daemons/datapipe/datapipe.go never uses its ctx parameter—instead it always uses the struct’s s.ctx. The GC daemon, by contrast, honours the passed‐in context. To prevent future confusion and ensure caller cancellations are respected, update all references to s.ctx in Start (and in WithDatapipeStatus) to use the ctx parameter, and drop the struct field if it’s no longer needed.

Locations to update:

  • cmd/api/src/daemons/datapipe/datapipe.go lines 55–87
  • WithDatapipeStatus signature and internals in the same file—switch its uses of s.ctx to the incoming ctx

Example diff in Start:

 func (s *Daemon) Start(ctx context.Context) {
     var (
         datapipeLoopTimer = time.NewTimer(s.tickInterval)
         pruningTicker     = time.NewTicker(pruningInterval)
     )
     defer datapipeLoopTimer.Stop()
     defer pruningTicker.Stop()
 @@ select {
     case <-pruningTicker.C:
-        s.WithDatapipeStatus(model.DatapipeStatusPurging, s.pipeline.clearOrphanedData)
+        s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.clearOrphanedData)
 @@
     case <-datapipeLoopTimer.C:
-        if s.db.HasCollectedGraphDataDeletionRequest(s.ctx) {
-            s.WithDatapipeStatus(model.DatapipeStatusPurging, s.pipeline.deleteData)
+        if s.db.HasCollectedGraphDataDeletionRequest(ctx) {
+            s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.deleteData)
         }
 @@
-    case <-s.ctx.Done():
+    case <-ctx.Done():
         return
 }

And update:

- func (s *Daemon) WithDatapipeStatus(status model.DatapipeStatus, f func())
+ func (s *Daemon) WithDatapipeStatus(ctx context.Context, status model.DatapipeStatus, f func())
🧹 Nitpick comments (2)
cmd/api/src/daemons/datapipe/pipeline.go (1)

83-97: Fire-and-forget goroutine may leak

go s.orphanedFileSweeper.Clear(...) is launched without waiting or error handling.
If the daemon exits early or ctx is cancelled the sweep can continue writing to deleted temp dirs.
Return a WaitGroup/error or run synchronously during startup.
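
A sketch of the WaitGroup option (sweeper stands in for the OrphanFileSweeper named here, and Clear is assumed to honour ctx cancellation):

package datapipe

import (
	"context"
	"sync"
)

type sweeper interface {
	Clear(ctx context.Context, expectedFiles []string)
}

// clearOrphansAsync still runs the sweep in the background, but returns a
// WaitGroup so shutdown can block until the sweep finishes instead of
// letting the goroutine outlive the daemon.
func clearOrphansAsync(ctx context.Context, s sweeper, expectedFiles []string) *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.Clear(ctx, expectedFiles)
	}()
	return &wg
}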

cmd/api/src/daemons/datapipe/datapipe.go (1)

95-108: Early-return in WithDatapipeStatus can leave the pipe in an indeterminate state

If SetDatapipeStatus fails, the function returns without attempting to reset the status to Idle, leaving any previous status lingering forever.
Consider still deferring the reset (best-effort) even when the initial set fails, or at least logging loudly that manual intervention is required.

Nit: fmt.Sprintf inside slog.ErrorContext is redundant – pass structured args instead.
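
One best-effort shape for this, as a sketch with stand-in names (statusStore, DatapipeStatusIdle, and the callback signature are assumptions, not the repository's types):

package datapipe

import (
	"context"
	"log/slog"
)

type DatapipeStatus string

const DatapipeStatusIdle DatapipeStatus = "idle"

type statusStore interface {
	SetDatapipeStatus(ctx context.Context, status DatapipeStatus) error
}

// withStatus defers the reset to Idle before attempting the initial set, so
// even a failed set still tries to restore a known state rather than leaving
// the previous status lingering. Errors use structured args, per the nit.
func withStatus(ctx context.Context, db statusStore, status DatapipeStatus, f func(context.Context)) {
	defer func() {
		if err := db.SetDatapipeStatus(ctx, DatapipeStatusIdle); err != nil {
			slog.ErrorContext(ctx, "failed to reset datapipe status", "error", err)
		}
	}()

	if err := db.SetDatapipeStatus(ctx, status); err != nil {
		slog.ErrorContext(ctx, "failed to set datapipe status", "error", err, "status", string(status))
		return
	}

	f(ctx)
}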

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 510fd2f and eb1893f.

📒 Files selected for processing (4)
  • cmd/api/src/daemons/datapipe/datapipe.go (3 hunks)
  • cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
  • cmd/api/src/model/datapipestatus.go (1 hunks)
  • cmd/api/src/services/entrypoint.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: run-tests
  • GitHub Check: run-analysis
  • GitHub Check: build-ui
🔇 Additional comments (3)
cmd/api/src/model/datapipestatus.go (1)

28-29: Ensure new status is handled throughout the codebase

DatapipeStatusPruning is added here, but callers (UI, API responses, switch statements) must be updated to avoid falling into default branches or returning 500s for unknown status values.
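
For illustration, the consumer-side switches in question look roughly like this sketch (the string values and any status names beyond the two new constants are assumptions):

package model

import "fmt"

type DatapipeStatus string

const (
	DatapipeStatusIdle     DatapipeStatus = "idle"
	DatapipeStatusPurging  DatapipeStatus = "purging"
	DatapipeStatusPruning  DatapipeStatus = "pruning"  // new in this PR
	DatapipeStatusStarting DatapipeStatus = "starting" // new in this PR
)

// describeStatus is the kind of switch that must learn the new values;
// otherwise callers fall into the default branch described above.
func describeStatus(s DatapipeStatus) (string, error) {
	switch s {
	case DatapipeStatusIdle:
		return "Idle", nil
	case DatapipeStatusPurging:
		return "Purging collected graph data", nil
	case DatapipeStatusPruning:
		return "Pruning expired data", nil
	case DatapipeStatusStarting:
		return "Starting up", nil
	default:
		return "", fmt.Errorf("unknown datapipe status: %q", s)
	}
}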

cmd/api/src/services/entrypoint.go (1)

108-112: Constructor arg order looks correct, but type-check the cache parameter

datapipe.NewDaemon now receives graphQueryCache. In the previous design this parameter was a pointer. Double-check the constructor signature matches the concrete type you are passing; an implicit conversion error will only be caught at build time.

cmd/api/src/daemons/datapipe/pipeline.go (1)

129-170: Feature-flag check ignores flag value

GetFlagByKey’s return is discarded; cache reset happens solely based on flag existence.

-if _, err := s.db.GetFlagByKey(s.ctx, appcfg.FeatureEntityPanelCaching); err != nil {
+if flag, err := s.db.GetFlagByKey(s.ctx, appcfg.FeatureEntityPanelCaching); err != nil {
     slog.ErrorContext(s.ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
 } else {
-    if err := s.cache.Reset(); err != nil {
+    if flag.Enabled { // assuming typical boolean field
+        if err := s.cache.Reset(); err != nil {
         ...
     }
 }

@coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (3)
cmd/api/src/daemons/datapipe/pipeline.go (3)

62-79: ⚠️ Potential issue

Fix deferred instrumentation and typo.

This matches a previous review comment. The measure.Measure(...) is executed inside the inner defer, recording ~0 ns instead of the full purge duration, and RequestAnalysis(..., "datapie") has a misspelling.

Apply this diff to fix both issues:

-func (s *BHCEPipeline) DeleteData(ctx context.Context) {
-	defer func() {
-		_ = s.db.DeleteAnalysisRequest(ctx)
-		_ = s.db.RequestAnalysis(ctx, "datapie")
-		measure.Measure(slog.LevelInfo, "Purge Graph Data Completed")()
-	}()
+func (s *BHCEPipeline) DeleteData(ctx context.Context) {
+	defer measure.LogAndMeasure(slog.LevelInfo, "Purge Graph Data")()
+	
+	defer func() {
+		_ = s.db.DeleteAnalysisRequest(ctx)
+		_ = s.db.RequestAnalysis(ctx, "datapipe")
+	}()

99-108: 🛠️ Refactor suggestion

No error propagation from job & graphify services.

This matches a previous review comment. ProcessTasks, ProcessStaleIngestJobs, and ProcessFinishedIngestJobs all discard potential errors, masking operational problems.

Have these methods return an error slice and surface at least one in logs to ensure proper error visibility.


112-125: ⚠️ Potential issue

Potential race updating job counters.

This matches a previous review comment. Two workers calling the closure for the same job will read-modify-write TotalFiles and FailedFiles non-atomically.

Prefer a single UPDATE … SET total_files = total_files + ? SQL statement or add DB-level aggregation to ensure atomic updates.
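
A sketch of the single-statement form (table and column names are assumptions for illustration; the placeholder style here is PostgreSQL's):

package datapipe

import (
	"context"
	"database/sql"
)

// bumpJobCounters performs the increment inside SQL itself, so concurrent
// workers cannot interleave a read-modify-write on the Go struct.
func bumpJobCounters(ctx context.Context, db *sql.DB, jobID int64, totalDelta, failedDelta int) error {
	_, err := db.ExecContext(ctx,
		`UPDATE ingest_jobs
		    SET total_files  = total_files  + $1,
		        failed_files = failed_files + $2
		  WHERE id = $3`,
		totalDelta, failedDelta, jobID)
	return err
}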

🧹 Nitpick comments (1)
cmd/api/src/daemons/datapipe/pipeline.go (1)

127-168: Well-structured analysis workflow with proper error handling.

The method correctly handles the analysis workflow with appropriate error handling and early returns.

One minor suggestion: consider extracting the cache reset logic (lines 157-166) into a separate method to improve readability and testability.

+func (s *BHCEPipeline) resetCacheIfEnabled(ctx context.Context) {
+	if _, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
+		slog.ErrorContext(ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
+	} else {
+		if err := s.cache.Reset(); err != nil {
+			slog.Error(fmt.Sprintf("Error while resetting the cache: %v", err))
+		} else {
+			slog.Info("Cache successfully reset by datapipe daemon")
+		}
+	}
+}

Then replace lines 157-166 with:

-		if _, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
-			slog.ErrorContext(ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
-		} else {
-			if err := s.cache.Reset(); err != nil {
-				slog.Error(fmt.Sprintf("Error while resetting the cache: %v", err))
-			} else {
-				slog.Info("Cache successfully reset by datapipe daemon")
-			}
-		}
+		s.resetCacheIfEnabled(ctx)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eb1893f and 3a180cc.

📒 Files selected for processing (3)
  • cmd/api/src/daemons/datapipe/datapipe.go (2 hunks)
  • cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
  • cmd/api/src/services/entrypoint.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • cmd/api/src/services/entrypoint.go
  • cmd/api/src/daemons/datapipe/datapipe.go
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: run-tests
🔇 Additional comments (2)
cmd/api/src/daemons/datapipe/pipeline.go (2)

47-58: LGTM! Clean constructor with proper dependency injection.

The constructor properly initializes all dependencies without storing the context, which addresses the previous context ownership concern.


83-95: LGTM! Proper async file cleanup with error handling.

The method correctly handles errors and uses a goroutine for non-blocking file cleanup operations.

@coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (3)
cmd/api/src/daemons/datapipe/pipeline.go (3)

78-82: ⚠️ Potential issue

Deferred instrumentation & typo break purge flow

  1. measure.Measure(...) is executed inside the inner defer, so it records ~0 ns instead of the full purge duration.
  2. RequestAnalysis(..., "datapie") has a misspelling.
-func (s *BHCEPipeline) DeleteData(ctx context.Context) {
-    defer func() {
-        _ = s.db.DeleteAnalysisRequest(ctx)
-        _ = s.db.RequestAnalysis(ctx, "datapie")
-        measure.Measure(slog.LevelInfo, "Purge Graph Data Completed")()
-    }()
+func (s *BHCEPipeline) DeleteData(ctx context.Context) {
+    // Record duration of the whole function.
+    defer measure.LogAndMeasure(slog.LevelInfo, "Purge Graph Data")()
+
+    defer func() {
+        _ = s.db.DeleteAnalysisRequest(ctx)
+        _ = s.db.RequestAnalysis(ctx, "datapipe")
+    }()

115-121: 🛠️ Refactor suggestion

No error propagation from job & graphify services

ProcessTasks, ProcessStaleIngestJobs, and ProcessFinishedIngestJobs all discard potential errors, masking operational problems. Have these methods return an error slice and surface at least one in logs.


127-138: ⚠️ Potential issue

Potential race updating job counters

Two workers calling the closure for the same job will read-modify-write TotalFiles and FailedFiles non-atomically. Prefer a single UPDATE … SET total_files = total_files + ? SQL statement or add DB-level aggregation.

🧹 Nitpick comments (1)
cmd/api/src/daemons/datapipe/pipeline.go (1)

107-107: Uncoordinated goroutine for orphaned file cleanup

The orphaned file sweeper runs asynchronously without any coordination, error handling, or lifecycle management. Consider adding proper context handling and error reporting.

-        go s.orphanedFileSweeper.Clear(ctx, expectedFiles)
+        go func() {
+            if err := s.orphanedFileSweeper.Clear(ctx, expectedFiles); err != nil {
+                slog.ErrorContext(ctx, "Failed to clear orphaned files", "error", err)
+            }
+        }()
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3a180cc and 36cc1f8.

📒 Files selected for processing (3)
  • cmd/api/src/daemons/datapipe/datapipe.go (2 hunks)
  • cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
  • cmd/api/src/model/datapipestatus.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: run-analysis
  • GitHub Check: build-ui
  • GitHub Check: run-tests
🔇 Additional comments (5)
cmd/api/src/model/datapipestatus.go (1)

28-29: Good addition of granular status constants

These new status constants support better operational visibility in the refactored datapipe.

cmd/api/src/daemons/datapipe/pipeline.go (2)

36-44: Well-designed interface for pipeline abstraction

The Pipeline interface cleanly separates the business logic operations from daemon orchestration, improving modularity and testability.


57-68: Constructor correctly encapsulates dependencies

The pipeline constructor properly initializes all required services and dependencies in one place.

cmd/api/src/daemons/datapipe/datapipe.go (2)

33-37: Excellent refactoring - clean separation of concerns

The simplified Daemon struct with Pipeline interface delegation achieves the goal of separating business logic from orchestration.


91-104: Robust status management with proper cleanup

The WithDatapipeStatus helper provides excellent centralized status management with proper cleanup via defer.

@coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (4)
cmd/api/src/daemons/datapipe/pipeline.go (3)

69-74: ⚠️ Potential issue

measure.Measure still records ~0 ns – place it in an outer defer
The instrumentation is executed inside the same defer that performs follow-up DB ops, so it measures only the time taken by that defer (~instantaneous), not the whole purge.
Move the measurement to its own outer defer (or switch to measure.LogAndMeasure) so the duration covers the full DeleteData execution.

-func (s *BHCEPipeline) DeleteData(ctx context.Context) {
-    defer func() {
-        _ = s.db.DeleteAnalysisRequest(ctx)
-        _ = s.db.RequestAnalysis(ctx, "datapipe")
-        measure.Measure(slog.LevelInfo, "Purge Graph Data Completed")()
-    }()
+func (s *BHCEPipeline) DeleteData(ctx context.Context) {
+    // Capture the true duration of the whole purge.
+    defer measure.LogAndMeasure(slog.LevelInfo, "Purge Graph Data")()
+
+    defer func() {
+        _ = s.db.DeleteAnalysisRequest(ctx)
+        _ = s.db.RequestAnalysis(ctx, "datapipe")
+    }()

105-113: ⚠️ Potential issue

Still discarding errors from graphify & job services
ProcessTasks, ProcessStaleIngestJobs, and ProcessFinishedIngestJobs may return errors that are silently ignored, masking operational issues and complicating troubleshooting.
Surface these errors (collect and log, or bubble up) so that failures become visible.


118-129: ⚠️ Potential issue

Concurrent counter updates remain non-atomic
The read-modify-write of TotalFiles / FailedFiles can race when multiple workers update the same job. Use a single atomic SQL update (e.g. UPDATE … SET total_files = total_files + ?, …) or DB-level aggregation instead of loading and rewriting the struct.

cmd/api/src/daemons/datapipe/datapipe.go (1)

71-78: ⚠️ Potential issue

Incorrect status constants (Purging used for Start/Prune)
DatapipeStatusStarting should wrap the initial pipeline.Start, and DatapipeStatusPruning should wrap pipeline.PruneData to leverage the new status codes introduced in this PR.

- s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.Start)
+ s.WithDatapipeStatus(ctx, model.DatapipeStatusStarting, s.pipeline.Start)

 ...

- s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.PruneData)
+ s.WithDatapipeStatus(ctx, model.DatapipeStatusPruning, s.pipeline.PruneData)
🧹 Nitpick comments (1)
cmd/api/src/daemons/datapipe/pipeline.go (1)

167-175: Feature flag value ignored when resetting cache
GetFlagByKey is used only to check for an error; the actual flag value (enabled/disabled) is discarded, so the cache is reset even when the feature is turned off. Retrieve the flag and respect its Enabled state.

- if _, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
+ if flag, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
     slog.ErrorContext(ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
 } else {
-     if err := s.cache.Reset(); err != nil {
+     if !flag.Enabled {
+         slog.Info("Entity panel caching disabled – skipping cache reset")
+     } else if err := s.cache.Reset(); err != nil {
          ...
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 36cc1f8 and ff85097.

📒 Files selected for processing (2)
  • cmd/api/src/daemons/datapipe/datapipe.go (2 hunks)
  • cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
cmd/api/src/daemons/datapipe/pipeline.go (7)
cmd/api/src/daemons/datapipe/cleanup.go (3)
  • OrphanFileSweeper (55-59)
  • NewOrphanFileSweeper (61-67)
  • NewOSFileOperations (42-44)
cmd/api/src/services/job/service.go (2)
  • JobService (42-45)
  • NewJobService (47-52)
cmd/api/src/services/graphify/service.go (2)
  • GraphifyService (36-42)
  • NewGraphifyService (44-52)
packages/go/bhlog/measure/measure.go (2)
  • Measure (47-56)
  • LogAndMeasure (75-90)
cmd/api/src/services/graphify/tasks.go (1)
  • UpdateJobFunc (41-41)
cmd/api/src/daemons/datapipe/analysis.go (3)
  • RunAnalysisOperations (42-118)
  • ErrAnalysisFailed (38-38)
  • ErrAnalysisPartiallyCompleted (39-39)
cmd/api/src/model/appcfg/flag.go (1)
  • FeatureEntityPanelCaching (32-32)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: run-tests
  • GitHub Check: build-ui
  • GitHub Check: run-analysis

@coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)

33-36: Comment/type name mismatch – tidy up the doc-string

The comment still refers to PipelineInterface, but the exported type is Pipeline.
Either rename the comment or the type to keep them in sync and avoid confusion for readers and tooling that mine comments.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ff85097 and 16f9216.

📒 Files selected for processing (1)
  • cmd/api/src/daemons/datapipe/datapipe.go (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: run-analysis
  • GitHub Check: run-tests
🔇 Additional comments (3)
cmd/api/src/daemons/datapipe/datapipe.go (3)

45-47: Storing the Pipeline as an interface value looks good

Moving from a concrete value copy to an interface solves the state-copy issue flagged earlier. Nothing further here.


71-71: Correct status constants ➜ ✅

DatapipeStatusStarting and DatapipeStatusPruning are now used in the right places — thanks for addressing the earlier feedback.

Also applies to: 77-77


80-82: Missing error handling around deletion-request check

If db.HasCollectedGraphDataDeletionRequest can return an error, it is silently ignored here, potentially masking DB connectivity issues.
Confirm the helper’s signature; if it returns (bool, error) consider:

if ok, err := s.db.HasCollectedGraphDataDeletionRequest(ctx); err != nil {
    slog.ErrorContext(ctx, "failed querying deletion request", "error", err)
} else if ok {
    s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.DeleteData)
}

@kpom-specter kpom-specter marked this pull request as draft June 16, 2025 18:05
@kpom-specter kpom-specter force-pushed the kpom/BED-5905-second_refactor_pass branch 3 times, most recently from e319869 to 96fca6e Compare July 3, 2025 16:30
@kpom-specter kpom-specter force-pushed the kpom/BED-5905-second_refactor_pass branch from 96fca6e to e73f97b Compare July 7, 2025 19:17
@kpom-specter kpom-specter marked this pull request as ready for review July 7, 2025 20:25
@coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)

56-63: Add input validation for robustness.

The constructor should validate its inputs to prevent runtime issues. Consider adding checks for nil pipeline and invalid time intervals.

 func NewDaemon(pipeline Pipeline, startDelay time.Duration, tickInterval time.Duration, db database.Database) *Daemon {
+	if pipeline == nil {
+		panic("datapipe.NewDaemon: pipeline must not be nil")
+	}
+	if tickInterval <= 0 {
+		panic("datapipe.NewDaemon: tickInterval must be > 0")
+	}
+	if startDelay < 0 {
+		panic("datapipe.NewDaemon: startDelay must be >= 0")
+	}
+
 	return &Daemon{
 		db:           db,
 		tickInterval: tickInterval,
 		pipeline:     pipeline,
 		startDelay:   startDelay,
 	}
 }
🧹 Nitpick comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)

33-43: Improve the interface comment for clarity.

The comment about "instance state" and "whatever is needed by the pipe to do the work" could be clearer. Consider revising to better explain the interface's purpose and the boolean return values.

-// Pipeline defines instance methods that operate on pipeline state.
-// These methods require a fully initialized Pipeline instance including
-// graph and db connections. Whatever is neeeded by the pipe to do the work
+// Pipeline defines methods for executing datapipe operations.
+// These methods require a fully initialized Pipeline instance with all
+// necessary dependencies (graph DB, cache, etc.). Each method returns
+// true on successful completion, false otherwise.

Note: There's also a typo "neeeded" that should be "needed" in the current comment.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 16f9216 and e73f97b.

📒 Files selected for processing (3)
  • cmd/api/src/daemons/datapipe/datapipe.go (1 hunks)
  • cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
  • cmd/api/src/model/datapipestatus.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • cmd/api/src/model/datapipestatus.go
  • cmd/api/src/daemons/datapipe/pipeline.go
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
cmd/api/src/daemons/datapipe/datapipe.go (5)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:114-118
Timestamp: 2025-05-27T17:28:46.899Z
Learning: When processing files in a loop where resources need to be cleaned up, extract the file processing into a separate function that uses `defer` for resource management rather than trying to handle cleanup in the loop body. This ensures proper resource cleanup while maintaining error handling robustness.
🔇 Additional comments (3)
cmd/api/src/daemons/datapipe/datapipe.go (3)

45-50: Good refactoring - clean and focused struct design.

The simplified Daemon struct effectively separates concerns by delegating pipeline operations to the Pipeline interface while maintaining only the essential orchestration fields.


65-97: Excellent refactoring - clean separation of concerns.

The Start method effectively uses the Pipeline interface and centralizes status management through WithDatapipeStatus. The appropriate status constants are used for each operation, addressing previous review feedback.


103-127: Good centralization of status management.

The WithDatapipeStatus method effectively centralizes status management around pipeline operations, ensuring consistent status transitions and proper context usage. The design aligns well with the established architecture where only the datapipe manages its own status.

@kpom-specter kpom-specter force-pushed the kpom/BED-5905-second_refactor_pass branch from e73f97b to a34ebb3 Compare July 8, 2025 19:31
@coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
cmd/api/src/daemons/datapipe/pipeline.go (2)

77-86: Fix the else-if chain that prevents complete cleanup on failures

The else-if structure causes cleanup to stop after the first failure, potentially leaving the system in an inconsistent state. If CancelAllIngestJobs fails, the subsequent cleanup steps (DeleteAllIngestTasks and DeleteCollectedGraphData) won't execute.

Execute each cleanup step independently to ensure all operations are attempted:

-if err := s.db.CancelAllIngestJobs(ctx); err != nil {
-    slog.ErrorContext(ctx, fmt.Sprintf("Error cancelling jobs during data deletion: %v", err))
-    return false
-} else if err := s.db.DeleteAllIngestTasks(ctx); err != nil {
-    slog.ErrorContext(ctx, fmt.Sprintf("Error deleting ingest tasks during data deletion: %v", err))
-    return false
-} else if err := DeleteCollectedGraphData(ctx, s.graphdb); err != nil {
-    slog.ErrorContext(ctx, fmt.Sprintf("Error deleting graph data: %v", err))
-    return false
-}
+var hasErrors bool
+if err := s.db.CancelAllIngestJobs(ctx); err != nil {
+    slog.ErrorContext(ctx, "error cancelling ingest jobs", "error", err)
+    hasErrors = true
+}
+if err := s.db.DeleteAllIngestTasks(ctx); err != nil {
+    slog.ErrorContext(ctx, "error deleting ingest tasks", "error", err)
+    hasErrors = true
+}
+if err := DeleteCollectedGraphData(ctx, s.graphdb); err != nil {
+    slog.ErrorContext(ctx, "error deleting graph data", "error", err)
+    hasErrors = true
+}
+return !hasErrors

112-118: Add error handling for service method calls

The calls to ProcessTasks, ProcessStaleIngestJobs, and ProcessFinishedIngestJobs discard potential errors, which could mask operational problems and make debugging difficult.

Consider modifying these service methods to return errors and handle them appropriately:

-// Ingest all available ingest tasks
-s.graphifyService.ProcessTasks(updateJobFunc(ctx, s.db))
-
-// Manage time-out state progression for ingest jobs
-s.jobService.ProcessStaleIngestJobs()
-
-// Manage nominal state transitions for ingest jobs
-s.jobService.ProcessFinishedIngestJobs()
+// Ingest all available ingest tasks
+if err := s.graphifyService.ProcessTasks(updateJobFunc(ctx, s.db)); err != nil {
+    slog.ErrorContext(ctx, "error processing ingest tasks", "error", err)
+}
+
+// Manage time-out state progression for ingest jobs  
+if err := s.jobService.ProcessStaleIngestJobs(); err != nil {
+    slog.ErrorContext(ctx, "error processing stale ingest jobs", "error", err)
+}
+
+// Manage nominal state transitions for ingest jobs
+if err := s.jobService.ProcessFinishedIngestJobs(); err != nil {
+    slog.ErrorContext(ctx, "error processing finished ingest jobs", "error", err)
+}
🧹 Nitpick comments (2)
cmd/api/src/daemons/datapipe/pipeline.go (2)

78-84: Improve error logging consistency

The error logging uses fmt.Sprintf with string interpolation, which is less efficient and doesn't leverage structured logging benefits. Consider using structured logging fields for better observability.

-slog.ErrorContext(ctx, fmt.Sprintf("Error cancelling jobs during data deletion: %v", err))
+slog.ErrorContext(ctx, "error cancelling jobs during data deletion", "error", err)

Apply similar changes to the other error logging statements in this method.


103-103: Consider adding error handling for goroutine

The orphaned file sweeper is launched as a goroutine without error handling. While this may be acceptable for background cleanup, consider whether errors should be logged or handled.

-go s.orphanedFileSweeper.Clear(ctx, expectedFiles)
+go func() {
+    if err := s.orphanedFileSweeper.Clear(ctx, expectedFiles); err != nil {
+        slog.ErrorContext(ctx, "error clearing orphaned files", "error", err)
+    }
+}()

This assumes the Clear method returns an error. If it doesn't, you may want to consider modifying it to return errors for better observability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e73f97b and a34ebb3.

📒 Files selected for processing (4)
  • cmd/api/src/daemons/datapipe/datapipe.go (1 hunks)
  • cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
  • cmd/api/src/model/datapipestatus.go (1 hunks)
  • cmd/api/src/services/entrypoint.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • cmd/api/src/model/datapipestatus.go
  • cmd/api/src/services/entrypoint.go
  • cmd/api/src/daemons/datapipe/datapipe.go
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
cmd/api/src/daemons/datapipe/pipeline.go (9)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:114-118
Timestamp: 2025-05-27T17:28:46.899Z
Learning: When processing files in a loop where resources need to be cleaned up, extract the file processing into a separate function that uses `defer` for resource management rather than trying to handle cleanup in the loop body. This ensures proper resource cleanup while maintaining error handling robustness.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:70-84
Timestamp: 2025-05-27T17:28:37.963Z
Learning: The current pattern in upload.go of passing both io.ReadCloser and *os.File to validation functions creates unclear resource ownership and makes lifecycle management error-prone. The user identified this as a "logical mess" that should be refactored to separate validation concerns from file I/O management.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:66-77
Timestamp: 2025-05-27T17:28:28.139Z
Learning: Avoid using if/else pipeline structures when working with resources that have a Close() method, as this pattern can lead to resource leaks. Nested pipelines should be avoided in favor of more explicit resource management patterns with proper cleanup.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: run-tests
  • GitHub Check: build-ui
  • GitHub Check: run-analysis
🔇 Additional comments (2)
cmd/api/src/daemons/datapipe/pipeline.go (2)

37-46: LGTM! Well-structured pipeline with proper dependency injection

The BHCEPipeline struct is well-designed with clear separation of concerns and proper dependency injection. The struct contains all necessary dependencies for pipeline operations without storing long-lived contexts.


48-59: LGTM! Constructor properly addresses context ownership

The constructor correctly takes a context parameter for initialization but doesn't store it in the struct, addressing the context ownership concern from previous reviews. Each method will receive its own context parameter.
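
In sketch form, the ownership pattern being praised (hypothetical names; the point is that ctx may drive setup work but is never retained on the struct, per the context package's guidance):

package datapipe

import "context"

type BHCEPipeline struct {
	// dependencies only; deliberately no context.Context field
}

// NewBHCEPipeline may use ctx for setup that should respect cancellation,
// but does not store it; each operation receives its own ctx at call time.
func NewBHCEPipeline(ctx context.Context /* , db, graphdb, cache, cfg ... */) *BHCEPipeline {
	_ = ctx // setup-only; not retained
	return &BHCEPipeline{}
}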

@kpom-specter kpom-specter force-pushed the kpom/BED-5905-second_refactor_pass branch from 398882c to cdf1484 Compare July 11, 2025 15:26
@kpom-specter kpom-specter changed the title [DRAFT] Make datapipe a bit more reusable Make datapipe a bit more reusable Jul 11, 2025
@kpom-specter kpom-specter force-pushed the kpom/BED-5905-second_refactor_pass branch 2 times, most recently from 7297f39 to 81f8cf7 Compare July 14, 2025 04:42
@bsheth711 bsheth711 self-requested a review July 14, 2025 20:06
@coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (5)
cmd/api/src/daemons/datapipe/pipeline.go (3)

1-1: Update copyright year to current year.

Based on past review feedback, the copyright year should be updated to reflect the current year.


110-120: Fix return type and add error propagation.

Two issues need addressing:

  1. Return type mismatch: Function returns bool but signature declares error
  2. Missing error propagation: Service calls discard potential errors

Apply this diff:

 func (s *BHCEPipeline) IngestTasks(ctx context.Context) error {
 	// Ingest all available ingest tasks
-	s.graphifyService.ProcessTasks(updateJobFunc(ctx, s.db))
+	if err := s.graphifyService.ProcessTasks(updateJobFunc(ctx, s.db)); err != nil {
+		slog.ErrorContext(ctx, "error processing tasks", "error", err)
+		return err
+	}
 
 	// Manage time-out state progression for ingest jobs
-	s.jobService.ProcessStaleIngestJobs()
+	if err := s.jobService.ProcessStaleIngestJobs(); err != nil {
+		slog.ErrorContext(ctx, "error processing stale ingest jobs", "error", err)
+		return err
+	}
 
 	// Manage nominal state transitions for ingest jobs
-	s.jobService.ProcessFinishedIngestJobs()
-	return true
+	if err := s.jobService.ProcessFinishedIngestJobs(); err != nil {
+		slog.ErrorContext(ctx, "error processing finished ingest jobs", "error", err)
+		return err
+	}
+	return nil
 }

144-190: Fix return type and cache reset logic.

Two critical issues:

  1. Return type mismatch: Function returns bool but signature declares error
  2. Incorrect cache reset logic: Cache resets when flag retrieval fails instead of when flag is enabled

The cache reset logic contradicts the learning that cache should only reset when flag retrieval succeeds AND flag is enabled.

Apply this diff:

 func (s *BHCEPipeline) Analyze(ctx context.Context) error {
 
 	// If there are completed ingest jobs or if analysis was user-requested, perform analysis.
 	if hasJobsWaitingForAnalysis, err := s.jobService.HasIngestJobsWaitingForAnalysis(); err != nil {
-		slog.ErrorContext(ctx, fmt.Sprintf("Failed looking up jobs waiting for analysis: %v", err))
-		return false
+		slog.ErrorContext(ctx, "failed looking up jobs waiting for analysis", "error", err)
+		return err
 	} else if hasJobsWaitingForAnalysis || s.db.HasAnalysisRequest(ctx) {
 
 		// Ensure that the user-requested analysis switch is deleted. This is done at the beginning of the
 		// function so that any re-analysis requests are caught while analysis is in-progress.
 		if err := s.db.DeleteAnalysisRequest(ctx); err != nil {
-			slog.ErrorContext(ctx, fmt.Sprintf("Error deleting analysis request: %v", err))
-			return false
+			slog.ErrorContext(ctx, "error deleting analysis request", "error", err)
+			return err
 		}
 
 		if s.cfg.DisableAnalysis {
-			return false
+			return nil
 		}
 
 		defer measure.LogAndMeasure(slog.LevelInfo, "Graph Analysis")()
 
 		if err := RunAnalysisOperations(ctx, s.db, s.graphdb, s.cfg); err != nil {
 			if errors.Is(err, ErrAnalysisFailed) {
 				s.jobService.FailAnalyzedIngestJobs()
 
 			} else if errors.Is(err, ErrAnalysisPartiallyCompleted) {
 				s.jobService.PartialCompleteIngestJobs()
 			}
-			return false
+			return err
 		} else {
 			s.jobService.CompleteAnalyzedIngestJobs()
 
 			// This is cacheclearing. The analysis is still successful here
-			if _, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
-				slog.ErrorContext(ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
-			} else {
-				if err := s.cache.Reset(); err != nil {
-					slog.Error(fmt.Sprintf("Error while resetting the cache: %v", err))
-				} else {
-					slog.Info("Cache successfully reset by datapipe daemon")
-				}
+			if flag, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
+				slog.ErrorContext(ctx, "error retrieving entity panel caching flag", "error", err)
+			} else if flag.Enabled {
+				if err := s.cache.Reset(); err != nil {
+					slog.Error("error while resetting the cache", "error", err)
+				} else {
+					slog.Info("cache successfully reset by datapipe daemon")
+				}
 			}
-			return true
+			return nil
 		}
 	}
-	return false
+	return nil
 }
cmd/api/src/daemons/datapipe/datapipe.go (2)

33-43: Improve interface documentation.

The comment "These methods require a fully initialized Pipeline instance including graph and db connections. Whatever is needed by the pipe to do the work" could be clearer about what constitutes "pipeline state" and why these methods are instance-dependent.

Consider a more precise comment:

-// Pipeline defines instance methods that operate on pipeline state.
-// These methods require a fully initialized Pipeline instance including
-// graph and db connections. Whatever is needed by the pipe to do the work
+// Pipeline defines the contract for data pipeline operations.
+// Implementations must maintain connections to database and graph storage
+// to execute ingestion, analysis, pruning, and deletion workflows.

56-63: Add input validation for robustness.

Consider adding validation to prevent runtime issues from invalid inputs.

 func NewDaemon(pipeline Pipeline, startDelay time.Duration, tickInterval time.Duration, db database.Database) *Daemon {
+	if pipeline == nil {
+		panic("datapipe.NewDaemon: pipeline must not be nil")
+	}
+	if tickInterval <= 0 {
+		panic("datapipe.NewDaemon: tickInterval must be > 0")
+	}
+
 	return &Daemon{
 		db:           db,
 		tickInterval: tickInterval,
 		pipeline:     pipeline,
 		startDelay:   startDelay,
 	}
 }
🧹 Nitpick comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)

105-133: Solid status management implementation with minor improvement opportunity.

The WithDatapipeStatus wrapper correctly manages datapipe status transitions and respects the architecture where only the datapipe mutates its own status. The flagAsAnalyzedComplete logic is properly implemented.

Consider improving error handling consistency by using structured logging:

-		slog.ErrorContext(pipelineContext, "failed to reset datapipe status", "err", err)
+		slog.ErrorContext(pipelineContext, "failed to reset datapipe status", "error", err)

And:

-		slog.ErrorContext(pipelineContext, "Datapipe action failed", slog.String("err", err.Error()))
+		slog.ErrorContext(pipelineContext, "datapipe action failed", "error", err)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 81f8cf7 and 5e1c33b.

📒 Files selected for processing (3)
  • cmd/api/src/daemons/datapipe/agt.go (3 hunks)
  • cmd/api/src/daemons/datapipe/datapipe.go (1 hunks)
  • cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • cmd/api/src/daemons/datapipe/agt.go
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: benwaples
PR: SpecterOps/BloodHound#1675
File: packages/javascript/bh-shared-ui/src/hooks/useZoneParams/useZoneQueryParams.test.tsx:84-86
Timestamp: 2025-07-14T19:28:45.566Z
Learning: In backmerge PRs, it's appropriate to defer non-critical issues (like missing await in tests) to avoid complicating the merge process. The focus should be on the merge itself rather than additional fixes.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
cmd/api/src/daemons/datapipe/pipeline.go (15)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:114-118
Timestamp: 2025-05-27T17:28:46.899Z
Learning: When processing files in a loop where resources need to be cleaned up, extract the file processing into a separate function that uses `defer` for resource management rather than trying to handle cleanup in the loop body. This ensures proper resource cleanup while maintaining error handling robustness.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:70-84
Timestamp: 2025-05-27T17:28:37.963Z
Learning: The current pattern in upload.go of passing both io.ReadCloser and *os.File to validation functions creates unclear resource ownership and makes lifecycle management error-prone. The user identified this as a "logical mess" that should be refactored to separate validation concerns from file I/O management.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:66-77
Timestamp: 2025-05-27T17:28:28.139Z
Learning: Avoid using if/else pipeline structures when working with resources that have a Close() method, as this pattern can lead to resource leaks. Nested pipelines should be avoided in favor of more explicit resource management patterns with proper cleanup.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:177-185
Timestamp: 2025-07-14T22:41:57.442Z
Learning: In BloodHound's datapipe pipeline, when checking feature flags before performing actions (like cache reset), the correct pattern is to verify both that the flag retrieval succeeds AND that the flag is enabled. Performing the action when flag retrieval fails is incorrect logic.
Learnt from: JonasBK
PR: SpecterOps/BloodHound#1671
File: cmd/api/src/analysis/ad/adcs_integration_test.go:3687-3687
Timestamp: 2025-07-10T14:33:20.317Z
Learning: When reviewing Go code, functions defined in one file within a package are accessible from other files in the same package. Before flagging missing functions as compilation errors, check if they exist in other files within the same package directory.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1509
File: packages/go/stbernard/command/license/internal/utils.go:77-86
Timestamp: 2025-05-29T18:42:23.137Z
Learning: Copyright years in license headers should typically preserve the original year when the work was created, not be updated to the current year when the license is added later. The engineering team (#bhe-engineering, @bhe-ec) should provide guidance on the copyright year policy for the BloodHound project.
Learnt from: benwaples
PR: SpecterOps/BloodHound#1675
File: packages/javascript/bh-shared-ui/src/hooks/useZoneParams/useZoneQueryParams.test.tsx:84-86
Timestamp: 2025-07-14T19:28:45.566Z
Learning: In backmerge PRs, it's appropriate to defer non-critical issues (like missing await in tests) to avoid complicating the merge process. The focus should be on the merge itself rather than additional fixes.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: .golangci.json:3-5
Timestamp: 2025-06-25T18:23:53.103Z
Learning: In the BloodHound project's .golangci.json, errcheck linter is intentionally disabled because it was too noisy, but the severity override settings are preserved for when it gets re-enabled in the future.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
cmd/api/src/daemons/datapipe/datapipe.go (10)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:177-185
Timestamp: 2025-07-14T22:41:57.442Z
Learning: In BloodHound's datapipe pipeline, when checking feature flags before performing actions (like cache reset), the correct pattern is to verify both that the flag retrieval succeeds AND that the flag is enabled. Performing the action when flag retrieval fails is incorrect logic.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: cmd/api/src/analysis/azure/post.go:33-35
Timestamp: 2025-06-25T18:24:25.014Z
Learning: In BloodHound Go code, for error types in slog structured logging, prefer using slog.String("key", err.Error()) over slog.Any("key", err). The explicit string conversion with err.Error() is preferred over using slog.Any() for error types.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: cmd/api/src/analysis/azure/post.go:33-35
Timestamp: 2025-06-25T17:52:33.291Z
Learning: In BloodHound Go code, prefer using explicit slog type functions like slog.Any(), slog.String(), slog.Int(), etc. over simple key-value pairs for structured logging. This provides better type safety and makes key-value pairs more visually distinct. For error types, use slog.Any("key", err) or slog.String("key", err.Error()).
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:70-84
Timestamp: 2025-05-27T17:28:37.963Z
Learning: The current pattern in upload.go of passing both io.ReadCloser and *os.File to validation functions creates unclear resource ownership and makes lifecycle management error-prone. The user identified this as a "logical mess" that should be refactored to separate validation concerns from file I/O management.
Learnt from: JonasBK
PR: SpecterOps/BloodHound#1671
File: packages/go/analysis/ad/esc16.go:65-69
Timestamp: 2025-07-10T14:31:01.781Z
Learning: In BloodHound ADCS ESC analysis functions (PostADCSESC1, PostADCSESC4, PostADCSESC6, PostADCSESC9, PostADCSESC10, PostADCSESC13, PostADCSESC16, etc.), the established pattern for handling cert template validation errors and user filtering errors is to use slog.WarnContext to log warnings and continue processing with the next template, rather than returning errors to the caller. This pattern is consistently applied across all ADCS ESC edge implementations.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: run-tests
  • GitHub Check: build-ui
  • GitHub Check: run-analysis
🔇 Additional comments (5)
cmd/api/src/daemons/datapipe/pipeline.go (3)

37-59: Well-structured pipeline encapsulation.

The BHCEPipeline struct properly encapsulates all dependencies and the constructor correctly initializes all services. This separation of concerns aligns well with the refactoring objectives.
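
As a rough sketch of the shape being praised here; the field names and dependency types are stand-ins, not the file's actual contents:

```go
package main

import "fmt"

// Stand-in dependency types; the real pipeline holds database, graph,
// and other service handles.
type (
	Database interface{ Ping() error }
	GraphDB  interface{ Ping() error }
	Cache    interface{ Reset() error }
)

// BHCEPipeline bundles everything a pipeline pass needs so the daemon
// only has to hold a single Pipeline value.
type BHCEPipeline struct {
	db    Database
	graph GraphDB
	cache Cache
}

// NewBHCEPipeline wires all dependencies once, up front, so every
// pipeline method can assume a fully initialized receiver.
func NewBHCEPipeline(db Database, graph GraphDB, cache Cache) *BHCEPipeline {
	return &BHCEPipeline{db: db, graph: graph, cache: cache}
}

func main() {
	p := NewBHCEPipeline(nil, nil, nil)
	fmt.Printf("%T holds all pipeline dependencies\n", p)
}
```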


122-137: Correct implementation for single-worker environment.

The updateJobFunc helper properly implements job counter updates. Since this runs in a single-worker environment (per previous learnings), the read-modify-write pattern is safe from race conditions.
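
A hypothetical read-modify-write counter update in the style described; the job fields and store shape are invented for illustration, not the real ingest-job types:

```go
package main

import "fmt"

// Invented job shape for illustration; the real ingest-job model lives
// elsewhere in the codebase.
type IngestJob struct {
	ID          int64
	TotalFiles  int
	FailedFiles int
}

type jobStore struct{ jobs map[int64]*IngestJob }

// updateJob reads the current counters, mutates them, and writes them
// back. The read-modify-write is unguarded, which is only safe because
// a single worker owns the pipeline.
func (s *jobStore) updateJob(id int64, completed, failed int) error {
	job, ok := s.jobs[id]
	if !ok {
		return fmt.Errorf("job %d not found", id)
	}
	job.TotalFiles += completed
	job.FailedFiles += failed
	return nil
}

func main() {
	store := &jobStore{jobs: map[int64]*IngestJob{7: {ID: 7}}}
	if err := store.updateJob(7, 10, 1); err != nil {
		fmt.Println("update failed:", err)
		return
	}
	fmt.Printf("job 7 after update: %+v\n", *store.jobs[7])
}
```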


139-142: Correct implementation for BHCE single pipeline.

Returning true is expected behavior for BHCE, which has only a single pipeline that is always active by default, unlike BHE which may have multiple pipelines.
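
A minimal sketch of what such an implementation can look like, with DatapipeStatus reduced to a local string type for self-containment:

```go
package main

import (
	"context"
	"fmt"
)

type DatapipeStatus string

type BHCEPipeline struct{}

// BHCE runs exactly one pipeline that is always eligible, so the check
// degenerates to a constant true. Returning a context leaves room for
// multi-pipeline implementations to derive a pipeline-scoped context.
func (p *BHCEPipeline) IsActive(ctx context.Context, _ DatapipeStatus) (bool, context.Context) {
	return true, ctx
}

func main() {
	active, _ := (&BHCEPipeline{}).IsActive(context.Background(), "analyzing")
	fmt.Println("pipeline active:", active)
}
```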

cmd/api/src/daemons/datapipe/datapipe.go (2)

45-50: Excellent simplification of daemon responsibilities.

The simplified Daemon struct properly focuses on coordination concerns (timing, status management) while delegating business logic to the Pipeline interface. This separation of concerns improves maintainability and testability.


65-97: Well-structured daemon loop with proper status management.

The Start method correctly uses the new status constants and consistently applies the WithDatapipeStatus wrapper for all pipeline operations. The timer and ticker management is appropriate.
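
A stripped-down sketch of the loop shape described, assuming a delay timer followed by a steady tick; the status wrapper around each pass is omitted and all names are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// start waits out the initial delay, then runs one pass per tick until
// the context is cancelled. Both timers are stopped via defer.
func start(ctx context.Context, startDelay, tickInterval time.Duration, run func()) {
	delay := time.NewTimer(startDelay)
	defer delay.Stop()

	select {
	case <-delay.C:
	case <-ctx.Done():
		return
	}

	ticker := time.NewTicker(tickInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			run()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	start(ctx, 50*time.Millisecond, 100*time.Millisecond, func() {
		fmt.Println("tick: run pipeline pass")
	})
}
```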

@superlinkx superlinkx force-pushed the kpom/BED-5905-second_refactor_pass branch from 5e1c33b to 726e11d on July 17, 2025 10:54
@superlinkx superlinkx force-pushed the kpom/BED-5905-second_refactor_pass branch from 726e11d to 31349b2 on July 17, 2025 10:56
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)

56-63: Add input validation to prevent runtime panics.

The constructor should validate that critical parameters are not nil or invalid, as suggested in previous reviews.

 func NewDaemon(pipeline Pipeline, startDelay time.Duration, tickInterval time.Duration, db database.Database) *Daemon {
+	if pipeline == nil {
+		panic("datapipe.NewDaemon: pipeline must not be nil")
+	}
+	if tickInterval <= 0 {
+		panic("datapipe.NewDaemon: tickInterval must be > 0")
+	}
+	if db == nil {
+		panic("datapipe.NewDaemon: db must not be nil")
+	}
+
 	return &Daemon{
 		db:           db,
 		tickInterval: tickInterval,
 		pipeline:     pipeline,
 		startDelay:   startDelay,
 	}
 }
🧹 Nitpick comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)

33-43: Consider clarifying the Pipeline interface documentation.

The comment mentions "instance state" but doesn't clearly explain what this means in the context of the pipeline. The IsActive method's return signature (bool, context.Context) is unusual and would benefit from documentation explaining why it returns a context.

Consider improving the comment:

-// Pipeline defines instance methods that operate on pipeline state.
-// These methods require a fully initialized Pipeline instance including
-// graph and db connections. Whatever is needed by the pipe to do the work
+// Pipeline defines methods for datapipe operations that require access to
+// database and graph connections. Implementations should handle their own
+// initialization and resource management.

Also consider documenting the IsActive method:

 type Pipeline interface {
 	PruneData(context.Context) error
 	DeleteData(context.Context) error
 	IngestTasks(context.Context) error
 	Analyze(context.Context) error
 	Start(context.Context) error
+	// IsActive checks if the pipeline can execute the given status operation
+	// and returns the appropriate context for the operation.
 	IsActive(context.Context, model.DatapipeStatus) (bool, context.Context)
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e1c33b and 726e11d.

📒 Files selected for processing (11)
  • cmd/api/src/daemons/datapipe/agt.go (3 hunks)
  • cmd/api/src/daemons/datapipe/datapipe.go (1 hunks)
  • cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
  • cmd/api/src/database/datapipestatus.go (1 hunks)
  • cmd/api/src/database/datapipestatus_integration_test.go (2 hunks)
  • cmd/api/src/database/mocks/db.go (2 hunks)
  • packages/go/analysis/ad/membership.go (1 hunks)
  • packages/go/analysis/impact/aggregator_test.go (1 hunks)
  • packages/javascript/bh-shared-ui/src/components/ExploreTable/ManageColumnsComboBox/ManageColumnsComboBox.tsx (1 hunks)
  • packages/javascript/bh-shared-ui/src/components/ExploreTable/ManageColumnsComboBox/ManageColumnsListItem.tsx (1 hunks)
  • packages/javascript/bh-shared-ui/src/components/ExploreTable/explore-table-utils.ts (1 hunks)
✅ Files skipped from review due to trivial changes (6)
  • packages/go/analysis/impact/aggregator_test.go
  • packages/javascript/bh-shared-ui/src/components/ExploreTable/ManageColumnsComboBox/ManageColumnsComboBox.tsx
  • packages/javascript/bh-shared-ui/src/components/ExploreTable/ManageColumnsComboBox/ManageColumnsListItem.tsx
  • packages/javascript/bh-shared-ui/src/components/ExploreTable/explore-table-utils.ts
  • packages/go/analysis/ad/membership.go
  • cmd/api/src/daemons/datapipe/agt.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • cmd/api/src/daemons/datapipe/pipeline.go
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: benwaples
PR: SpecterOps/BloodHound#1675
File: packages/javascript/bh-shared-ui/src/hooks/useZoneParams/useZoneQueryParams.test.tsx:84-86
Timestamp: 2025-07-14T19:28:45.566Z
Learning: In backmerge PRs, it's appropriate to defer non-critical issues (like missing await in tests) to avoid complicating the merge process. The focus should be on the merge itself rather than additional fixes.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
cmd/api/src/database/datapipestatus.go (1)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
cmd/api/src/database/datapipestatus_integration_test.go (4)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: LawsonWillard
PR: SpecterOps/BloodHound#1595
File: cmd/api/src/api/v2/saved_queries_test.go:2594-2594
Timestamp: 2025-06-17T22:37:36.389Z
Learning: In Go table-driven tests, there's a distinction between main test function parallelism and subtest parallelism. Main test functions can safely use t.Parallel() for performance benefits, but individual subtests within table-driven tests may need to run sequentially to avoid race conditions with mocks, deferred functions, or shared resources.
cmd/api/src/daemons/datapipe/datapipe.go (10)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:177-185
Timestamp: 2025-07-14T22:41:57.442Z
Learning: In BloodHound's datapipe pipeline, when checking feature flags before performing actions (like cache reset), the correct pattern is to verify both that the flag retrieval succeeds AND that the flag is enabled. Performing the action when flag retrieval fails is incorrect logic.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: cmd/api/src/analysis/azure/post.go:33-35
Timestamp: 2025-06-25T18:24:25.014Z
Learning: In BloodHound Go code, for error types in slog structured logging, prefer using slog.String("key", err.Error()) over slog.Any("key", err). The explicit string conversion with err.Error() is preferred over using slog.Any() for error types.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: cmd/api/src/analysis/azure/post.go:33-35
Timestamp: 2025-06-25T17:52:33.291Z
Learning: In BloodHound Go code, prefer using explicit slog type functions like slog.Any(), slog.String(), slog.Int(), etc. over simple key-value pairs for structured logging. This provides better type safety and makes key-value pairs more visually distinct. For error types, use slog.Any("key", err) or slog.String("key", err.Error()).
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:70-84
Timestamp: 2025-05-27T17:28:37.963Z
Learning: The current pattern in upload.go of passing both io.ReadCloser and *os.File to validation functions creates unclear resource ownership and makes lifecycle management error-prone. The user identified this as a "logical mess" that should be refactored to separate validation concerns from file I/O management.
Learnt from: JonasBK
PR: SpecterOps/BloodHound#1671
File: packages/go/analysis/ad/esc16.go:65-69
Timestamp: 2025-07-10T14:31:01.781Z
Learning: In BloodHound ADCS ESC analysis functions (PostADCSESC1, PostADCSESC4, PostADCSESC6, PostADCSESC9, PostADCSESC10, PostADCSESC13, PostADCSESC16, etc.), the established pattern for handling cert template validation errors and user filtering errors is to use slog.WarnContext to log warnings and continue processing with the next template, rather than returning errors to the caller. This pattern is consistently applied across all ADCS ESC edge implementations.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
cmd/api/src/database/mocks/db.go (1)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
🧬 Code Graph Analysis (3)
cmd/api/src/database/datapipestatus.go (2)
cmd/api/src/model/datapipestatus.go (2)
  • DatapipeStatus (21-21)
  • DatapipeStatusWrapper (32-37)
cmd/api/src/database/db.go (1)
  • BloodhoundDB (176-179)
cmd/api/src/database/datapipestatus_integration_test.go (1)
cmd/api/src/model/datapipestatus.go (2)
  • DatapipeStatusIdle (24-24)
  • DatapipeStatusAnalyzing (26-26)
cmd/api/src/database/mocks/db.go (1)
cmd/api/src/model/datapipestatus.go (1)
  • DatapipeStatus (21-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build BloodHound Container Image / Build and Package Container
  • GitHub Check: build-ui
  • GitHub Check: run-analysis
  • GitHub Check: run-tests
🔇 Additional comments (11)
cmd/api/src/database/datapipestatus_integration_test.go (4)

28-28: LGTM - Import addition supports better test execution flow.

Adding the assert import allows the test to continue execution after assertion failures, which is beneficial for integration tests to capture multiple issues in a single run.


40-40: LGTM - Improved test execution flow.

Switching from require.Equal to assert.Equal for status checks allows the test to continue execution after failures, potentially revealing multiple issues in a single run.

Also applies to: 47-47
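
For readers less familiar with testify, a small illustration of the distinction this comment leans on (a test file run with `go test`, assuming the stretchr/testify module):

```go
package demo

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestStatusChecks illustrates the distinction: assert records a
// failure and lets the test keep running, while require aborts the
// test immediately on failure.
func TestStatusChecks(t *testing.T) {
	status := "idle"

	assert.Equal(t, "idle", status)  // on mismatch: mark failed, continue
	require.Equal(t, "idle", status) // on mismatch: mark failed, stop here
}
```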


43-43: LGTM - Interface changes reflect improved separation of concerns.

The simplified SetDatapipeStatus calls and explicit UpdateLastAnalysisCompleteTime call align with the refactored interface that separates status updates from analysis completion time management.

Also applies to: 51-53


48-49: LGTM - Proper verification of the new interface behavior.

The new assertions correctly verify that:

  1. Setting status to analyzing doesn't update the completion timestamp (line 49)
  2. Only explicit calls to UpdateLastAnalysisCompleteTime update that field (line 57)

This properly tests the separation of concerns in the refactored interface.

Also applies to: 57-57

cmd/api/src/database/datapipestatus.go (3)

27-28: LGTM - Interface refactoring improves API clarity.

The separation of UpdateLastAnalysisCompleteTime from SetDatapipeStatus creates a clearer API where each method has a single, well-defined responsibility.


32-35: LGTM - Clean implementation of the new method.

The UpdateLastAnalysisCompleteTime method correctly updates both timestamps using UTC time and follows the established patterns in the codebase.


37-51: LGTM - Simplified method preserves important behavior.

The refactored SetDatapipeStatus method maintains the correct behavior for updating last_analysis_run_at when starting analysis while removing the complexity of the boolean parameter.
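
A hedged, self-contained sketch of the split behavior being verified; the method names follow the review text, and the column handling is simplified (the real implementation updates database rows):

```go
package main

import (
	"fmt"
	"time"
)

// fakeStatusStore mimics the split interface described above: setting a
// status never touches the completion timestamp; only the dedicated
// method does.
type fakeStatusStore struct {
	status                 string
	lastAnalysisRunAt      time.Time
	lastAnalysisCompleteAt time.Time
}

func (s *fakeStatusStore) SetDatapipeStatus(status string) {
	s.status = status
	if status == "analyzing" {
		// Starting analysis records when the run began...
		s.lastAnalysisRunAt = time.Now().UTC()
	}
}

func (s *fakeStatusStore) UpdateLastAnalysisCompleteTime() {
	// ...but only an explicit call records when it finished.
	s.lastAnalysisCompleteAt = time.Now().UTC()
}

func main() {
	store := &fakeStatusStore{}
	store.SetDatapipeStatus("analyzing")
	fmt.Println("complete time still zero:", store.lastAnalysisCompleteAt.IsZero())
	store.UpdateLastAnalysisCompleteTime()
	fmt.Println("complete time set:", !store.lastAnalysisCompleteAt.IsZero())
}
```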

cmd/api/src/database/mocks/db.go (2)

1978-1989: LGTM - Mock correctly updated for interface changes.

The SetDatapipeStatus mock method and recorder are correctly updated to reflect the simplified method signature, ensuring consistency with the refactored interface.


2173-2185: LGTM - New mock method properly implements the interface.

The UpdateLastAnalysisCompleteTime mock method correctly implements the new interface method and follows the established patterns in the mock file.

cmd/api/src/daemons/datapipe/datapipe.go (2)

65-97: LGTM! The Start method implementation looks correct.

The method properly:

  • Uses the correct status constants (DatapipeStatusStarting, DatapipeStatusPruning) as addressed in previous reviews
  • Handles timer cleanup with defer statements
  • Delegates operations to the pipeline interface
  • Handles context cancellation appropriately

103-119: LGTM! The status management implementation is correct.

The method properly:

  • Checks pipeline activity before proceeding
  • Sets the datapipe status appropriately
  • Logs action errors with structured logging
  • Follows the architectural pattern where only the datapipe manages its own status (as clarified in previous reviews)
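
A self-contained sketch of that wrapper flow under stated assumptions (status values as strings, all type and function names invented): check activity, set the status for the duration of the action, log any failure, and reset to idle afterwards.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
)

type DatapipeStatus string

const StatusIdle DatapipeStatus = "idle"

type statusStore interface {
	SetDatapipeStatus(ctx context.Context, status DatapipeStatus) error
}

type activityChecker interface {
	IsActive(ctx context.Context, status DatapipeStatus) (bool, context.Context)
}

// withDatapipeStatus is the only place the datapipe status is mutated;
// pipeline actions just return errors, which are logged here.
func withDatapipeStatus(ctx context.Context, db statusStore, p activityChecker, status DatapipeStatus, action func(context.Context) error) {
	active, pipelineCtx := p.IsActive(ctx, status)
	if !active {
		return
	}
	if err := db.SetDatapipeStatus(pipelineCtx, status); err != nil {
		slog.ErrorContext(pipelineCtx, "failed to set datapipe status", slog.String("err", err.Error()))
		return
	}
	if err := action(pipelineCtx); err != nil {
		slog.ErrorContext(pipelineCtx, "datapipe action failed", slog.String("err", err.Error()))
	}
	if err := db.SetDatapipeStatus(pipelineCtx, StatusIdle); err != nil {
		slog.ErrorContext(pipelineCtx, "failed to reset datapipe status", slog.String("err", err.Error()))
	}
}

type noopStore struct{}

func (noopStore) SetDatapipeStatus(context.Context, DatapipeStatus) error { return nil }

type alwaysActive struct{}

func (alwaysActive) IsActive(ctx context.Context, _ DatapipeStatus) (bool, context.Context) {
	return true, ctx
}

func main() {
	withDatapipeStatus(context.Background(), noopStore{}, alwaysActive{}, "analyzing", func(context.Context) error {
		return errors.New("boom") // failure is logged, status still resets
	})
	fmt.Println("wrapper completed and reset status to idle")
}
```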

@superlinkx superlinkx force-pushed the kpom/BED-5905-second_refactor_pass branch from 31349b2 to f105ea2 on July 17, 2025 11:26
Copy link
Contributor

@irshadaj irshadaj left a comment


lgtm

kpom-specter and others added 2 commits July 18, 2025 12:11
from the daemon/state management. pipeline.go includes all business
logic of how the steps execute. datapipe.go owns the order and timing.

refactor (datapipe): Make an extendable interface for the pipeline, and allow context to be passed instead of static

whoops (forgot to add entrypoint change)

refactor (datapipe): Add a start and move the pipeline interface

refactor (datapipe): Add an IsActive() method for future extensibility; moved interface back to the consumer

fix (mislabelled status): Use the correct DatapipeStatus for the steps

Add a start parameter to delay startup analysis time

chore: Fix naming to be more descriptive

chore: Fix imports to match the new style

fix (invalid last_analysis date): Set on successful pipeline analysis. Changes the interface

fix (naming): Move some variables around for cleanliness

fix(typo): Var name pointed out by code rabbit
@superlinkx superlinkx force-pushed the kpom/BED-5905-second_refactor_pass branch from 9fb2954 to 8175b62 on July 18, 2025 19:29
chore: add godocs to the methods of the Pipeline interface
@superlinkx superlinkx force-pushed the kpom/BED-5905-second_refactor_pass branch from 8175b62 to 943ba49 on July 18, 2025 19:51
@superlinkx superlinkx merged commit 046e4ff into main Jul 18, 2025
9 checks passed
@superlinkx superlinkx deleted the kpom/BED-5905-second_refactor_pass branch July 18, 2025 21:28
@github-actions github-actions bot locked and limited conversation to collaborators Jul 18, 2025