Make datapipe a bit more reusable #1562

Conversation
Walkthrough

This change refactors the datapipe daemon by introducing a Pipeline abstraction that separates the pipeline's business logic from the daemon's orchestration concerns.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Entrypoint
    participant Daemon
    participant Pipeline
    participant Database
    Entrypoint->>Daemon: Initialize with Pipeline, startDelay, tickInterval, db
    Daemon->>Pipeline: Start(ctx)
    Daemon->>Pipeline: PruneData(ctx)
    Daemon->>Pipeline: DeleteData(ctx)
    Daemon->>Pipeline: IngestTasks(ctx)
    Daemon->>Pipeline: Analyze(ctx)
    Pipeline->>Database: (Various DB operations)
```
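The method set above implies an interface along these lines. This is a sketch inferred from the sequence diagram and the review comments below, not code copied from the PR; later comments show methods such as `IngestTasks(ctx context.Context) error`, so error returns are assumed here:

```go
package datapipe // illustrative placement only

import "context"

// Pipeline is a sketch of the contract implied by the diagram above; the
// actual signatures in the PR (e.g. return types) may differ.
type Pipeline interface {
	Start(ctx context.Context) error
	PruneData(ctx context.Context) error
	DeleteData(ctx context.Context) error
	IngestTasks(ctx context.Context) error
	Analyze(ctx context.Context) error
}
```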
Actionable comments posted: 6
🔭 Outside diff range comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)
55-87: 💡 Verification agent 🧩 Analysis chain

`Start` receives a `ctx` it never uses – dead parameter or wrong context?

Inside the method every DB call uses `s.ctx`, not the parameter `ctx`. Either delete the parameter or prefer the incoming `ctx` to honour caller cancellation. Mixing both will mislead future maintainers.

```diff
-func (s *Daemon) Start(ctx context.Context) {
+func (s *Daemon) Start() { // simplest
```

(or swap all `s.ctx` usages to the parameter). Please align with whichever pattern the other daemons follow.
🏁 Script executed:
#!/bin/bash set -e # List daemon directories under cmd/api/src/daemons find cmd/api/src/daemons -maxdepth 2 -mindepth 1 -type d # Search for Start methods in all daemon packages rg -n "func .\+Start" -A2 cmd/api/src/daemonsLength of output: 288
🏁 Script executed:

```bash
#!/bin/bash
set -e
echo "== Start method signatures under all daemons =="
rg -n "func.*Start" -A2 cmd/api/src/daemons
echo "== Daemon struct in datapipe =="
rg -n "type Daemon" -A5 cmd/api/src/daemons/datapipe
echo "== s.ctx references in datapipe =="
rg -n "s\.ctx" cmd/api/src/daemons/datapipe
```

Length of output: 5228
Use the incoming `ctx` in `Datapipe.Start`

The `Start(ctx context.Context)` method in `cmd/api/src/daemons/datapipe/datapipe.go` never uses its `ctx` parameter—instead it always uses the struct's `s.ctx`. The GC daemon, by contrast, honours the passed-in context. To prevent future confusion and ensure caller cancellations are respected, update all references to `s.ctx` in `Start` (and in `WithDatapipeStatus`) to use the `ctx` parameter, and drop the struct field if it's no longer needed.

Locations to update:

- `cmd/api/src/daemons/datapipe/datapipe.go` lines 55–87
- `WithDatapipeStatus` signature and internals in the same file—switch its uses of `s.ctx` to the incoming `ctx`

Example diff in `Start`:

```diff
 func (s *Daemon) Start(ctx context.Context) {
 	var (
 		datapipeLoopTimer = time.NewTimer(s.tickInterval)
 		pruningTicker     = time.NewTicker(pruningInterval)
 	)
 	defer datapipeLoopTimer.Stop()
 	defer pruningTicker.Stop()
@@
 	select {
 	case <-pruningTicker.C:
 		s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.clearOrphanedData)
@@
 	case <-datapipeLoopTimer.C:
 		if s.db.HasCollectedGraphDataDeletionRequest(ctx) {
 			s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.deleteData)
 		}
@@
-	case <-s.ctx.Done():
+	case <-ctx.Done():
 		return
 	}
```

And update:

```diff
-func (s *Daemon) WithDatapipeStatus(status model.DatapipeStatus, f func())
+func (s *Daemon) WithDatapipeStatus(ctx context.Context, status model.DatapipeStatus, f func())
```
🧹 Nitpick comments (2)

cmd/api/src/daemons/datapipe/pipeline.go (1)

83-97: Fire-and-forget goroutine may leak

`go s.orphanedFileSweeper.Clear(...)` is launched without waiting or error handling. If the daemon exits early or `ctx` is cancelled, the sweep can continue writing to deleted temp dirs. Return a `WaitGroup`/error or run synchronously during startup (see the sketch after this block).

cmd/api/src/daemons/datapipe/datapipe.go (1)

95-108: Early return in `WithDatapipeStatus` can leave the pipe in an indeterminate state

If `SetDatapipeStatus` fails, the function returns without attempting to reset the status to `Idle`, leaving any previous status lingering forever. Consider still deferring the reset (best-effort) even when the initial set fails, or at least logging loudly that manual intervention is required.

Nit: `fmt.Sprintf` inside `slog.ErrorContext` is redundant – pass structured args instead.
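To make the goroutine-leak suggestion concrete, here is a minimal, self-contained sketch of coordinating the sweep with a `sync.WaitGroup` so it cannot outlive the daemon. The `sweeper` type is a hypothetical stand-in for the real `orphanedFileSweeper`, and `Clear` is assumed to respect context cancellation:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// sweeper is a hypothetical stand-in for the orphanedFileSweeper discussed above.
type sweeper struct{}

// Clear checks ctx so cancellation stops the sweep instead of letting it
// keep writing to directories the daemon may already have torn down.
func (sweeper) Clear(ctx context.Context, expected []string) {
	select {
	case <-ctx.Done():
		return
	case <-time.After(20 * time.Millisecond): // stands in for real file cleanup
		fmt.Println("sweep finished, kept:", expected)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		sweeper{}.Clear(ctx, []string{"keep-me.json"})
	}()

	// Daemon shutdown: cancel the context, then wait for the sweep to stop.
	time.Sleep(30 * time.Millisecond)
	cancel()
	wg.Wait()
}
```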
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- cmd/api/src/daemons/datapipe/datapipe.go (3 hunks)
- cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
- cmd/api/src/model/datapipestatus.go (1 hunks)
- cmd/api/src/services/entrypoint.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Build BloodHound Container Image / Build and Package Container
- GitHub Check: run-tests
- GitHub Check: run-analysis
- GitHub Check: build-ui
🔇 Additional comments (3)

cmd/api/src/model/datapipestatus.go (1)

28-29: Ensure new status is handled throughout the codebase

`DatapipeStatusPruning` is added here, but callers (UI, API responses, switch statements) must be updated to avoid falling into default branches or returning `500`s for unknown status values (see the sketch after this block).

cmd/api/src/services/entrypoint.go (1)

108-112: Constructor arg order looks correct, but type-check the cache parameter

`datapipe.NewDaemon` now receives `graphQueryCache`. In the previous design this parameter was a pointer. Double-check that the constructor signature matches the concrete type you are passing; a mismatch will only be caught at build time.

cmd/api/src/daemons/datapipe/pipeline.go (1)

129-170: Feature-flag check ignores flag value

`GetFlagByKey`'s return is discarded; the cache reset happens based solely on flag existence.

```diff
-if _, err := s.db.GetFlagByKey(s.ctx, appcfg.FeatureEntityPanelCaching); err != nil {
+if flag, err := s.db.GetFlagByKey(s.ctx, appcfg.FeatureEntityPanelCaching); err != nil {
 	slog.ErrorContext(s.ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
 } else {
-	if err := s.cache.Reset(); err != nil {
+	if flag.Enabled { // assuming typical boolean field
+		if err := s.cache.Reset(); err != nil {
 			...
+		}
 	}
 }
```
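A hedged sketch of the exhaustive status handling the first comment above asks for. The constants below are inferred from the statuses mentioned across this review (Idle, Starting, Pruning, Purging) and defined locally so the example runs standalone; the real definitions live in `cmd/api/src/model/datapipestatus.go` and may differ:

```go
package main

import "fmt"

// DatapipeStatus is a local stand-in for the model type in this PR.
type DatapipeStatus string

// Constants inferred from the review comments; the real set may differ.
const (
	StatusIdle     DatapipeStatus = "idle"
	StatusStarting DatapipeStatus = "starting"
	StatusPruning  DatapipeStatus = "pruning"
	StatusPurging  DatapipeStatus = "purging"
)

// describe handles every known status explicitly, so a new constant that is
// not wired up here surfaces as an error instead of a silent default branch.
func describe(s DatapipeStatus) (string, error) {
	switch s {
	case StatusIdle, StatusStarting, StatusPruning, StatusPurging:
		return string(s), nil
	default:
		return "", fmt.Errorf("unknown datapipe status: %q", s)
	}
}

func main() {
	d, err := describe(StatusPruning)
	fmt.Println(d, err)
}
```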
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)

cmd/api/src/daemons/datapipe/pipeline.go (3)

62-79: ⚠️ Potential issue — Fix deferred instrumentation and typo.

This matches a previous review comment. The `measure.Measure(...)` call is executed inside the inner defer, recording ~0 ns instead of the full purge duration, and `RequestAnalysis(..., "datapie")` has a misspelling.

Apply this diff to fix both issues:

```diff
-func (s *BHCEPipeline) DeleteData(ctx context.Context) {
-	defer func() {
-		_ = s.db.DeleteAnalysisRequest(ctx)
-		_ = s.db.RequestAnalysis(ctx, "datapie")
-		measure.Measure(slog.LevelInfo, "Purge Graph Data Completed")()
-	}()
+func (s *BHCEPipeline) DeleteData(ctx context.Context) {
+	defer measure.LogAndMeasure(slog.LevelInfo, "Purge Graph Data")()
+
+	defer func() {
+		_ = s.db.DeleteAnalysisRequest(ctx)
+		_ = s.db.RequestAnalysis(ctx, "datapipe")
+	}()
```
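The timing bug comes from Go's defer evaluation order. Below is a standalone demonstration of the idiom, with a toy `logAndMeasure` standing in for the project's `measure.LogAndMeasure`: the outer call runs immediately and captures the start time, and only the returned closure is deferred, so the whole function body is measured.

```go
package main

import (
	"log"
	"time"
)

// logAndMeasure is a toy version of the measure helper: it records the start
// time now and returns a closure that logs the elapsed time when deferred.
func logAndMeasure(msg string) func() {
	start := time.Now()
	log.Printf("%s: started", msg)
	return func() { log.Printf("%s: completed in %s", msg, time.Since(start)) }
}

func purge() {
	// logAndMeasure(...) runs immediately; only the returned func() is deferred.
	defer logAndMeasure("Purge Graph Data")()
	time.Sleep(50 * time.Millisecond) // stands in for the real purge work
}

func main() { purge() }
```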
99-108: 🛠️ Refactor suggestion — No error propagation from job & graphify services.

This matches a previous review comment. `ProcessTasks`, `ProcessStaleIngestJobs`, and `ProcessFinishedIngestJobs` all discard potential errors, masking operational problems.

Have these methods return an error slice and surface at least one in logs to ensure proper error visibility.
112-125: ⚠️ Potential issue — Potential race updating job counters.

This matches a previous review comment. Two workers calling the closure for the same job will read-modify-write `TotalFiles` and `FailedFiles` non-atomically.

Prefer a single `UPDATE … SET total_files = total_files + ?` SQL statement or add DB-level aggregation to ensure atomic updates.
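A minimal sketch of the atomic increment suggested above, using `database/sql`; the table and column names (`ingest_jobs`, `total_files`, `failed_files`) are assumptions inferred from the field names in the review, not the project's actual schema:

```go
package jobs

import (
	"context"
	"database/sql"
)

// incrementJobCounters pushes the read-modify-write into the database, so
// concurrent callers cannot interleave a stale read with a write.
func incrementJobCounters(ctx context.Context, db *sql.DB, jobID int64, total, failed int) error {
	_, err := db.ExecContext(ctx,
		`UPDATE ingest_jobs
		    SET total_files  = total_files  + $1,
		        failed_files = failed_files + $2
		  WHERE id = $3`,
		total, failed, jobID)
	return err
}
```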
🧹 Nitpick comments (1)

cmd/api/src/daemons/datapipe/pipeline.go (1)

127-168: Well-structured analysis workflow with proper error handling.

The method correctly handles the analysis workflow with appropriate error handling and early returns.

One minor suggestion: consider extracting the cache reset logic (lines 157-166) into a separate method to improve readability and testability.

```go
func (s *BHCEPipeline) resetCacheIfEnabled(ctx context.Context) {
	if _, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
		slog.ErrorContext(ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
	} else {
		if err := s.cache.Reset(); err != nil {
			slog.Error(fmt.Sprintf("Error while resetting the cache: %v", err))
		} else {
			slog.Info("Cache successfully reset by datapipe daemon")
		}
	}
}
```

Then replace lines 157-166 with:

```diff
-	if _, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
-		slog.ErrorContext(ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
-	} else {
-		if err := s.cache.Reset(); err != nil {
-			slog.Error(fmt.Sprintf("Error while resetting the cache: %v", err))
-		} else {
-			slog.Info("Cache successfully reset by datapipe daemon")
-		}
-	}
+	s.resetCacheIfEnabled(ctx)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- cmd/api/src/daemons/datapipe/datapipe.go (2 hunks)
- cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
- cmd/api/src/services/entrypoint.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- cmd/api/src/services/entrypoint.go
- cmd/api/src/daemons/datapipe/datapipe.go
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: Build BloodHound Container Image / Build and Package Container
- GitHub Check: run-tests
🔇 Additional comments (2)

cmd/api/src/daemons/datapipe/pipeline.go (2)

47-58: LGTM! Clean constructor with proper dependency injection.

The constructor properly initializes all dependencies without storing the context, which addresses the previous context ownership concern.

83-95: LGTM! Proper async file cleanup with error handling.

The method correctly handles errors and uses a goroutine for non-blocking file cleanup operations.
Actionable comments posted: 2
♻️ Duplicate comments (3)

cmd/api/src/daemons/datapipe/pipeline.go (3)

78-82: ⚠️ Potential issue — Deferred instrumentation & typo break purge flow

`measure.Measure(...)` is executed inside the inner defer, so it records ~0 ns instead of the full purge duration. `RequestAnalysis(..., "datapie")` has a misspelling.

```diff
-func (s *BHCEPipeline) DeleteData(ctx context.Context) {
-	defer func() {
-		_ = s.db.DeleteAnalysisRequest(ctx)
-		_ = s.db.RequestAnalysis(ctx, "datapie")
-		measure.Measure(slog.LevelInfo, "Purge Graph Data Completed")()
-	}()
+func (s *BHCEPipeline) DeleteData(ctx context.Context) {
+	// Record duration of the whole function.
+	defer measure.LogAndMeasure(slog.LevelInfo, "Purge Graph Data")()
+
+	defer func() {
+		_ = s.db.DeleteAnalysisRequest(ctx)
+		_ = s.db.RequestAnalysis(ctx, "datapipe")
+	}()
```

115-121: 🛠️ Refactor suggestion — No error propagation from job & graphify services

`ProcessTasks`, `ProcessStaleIngestJobs`, and `ProcessFinishedIngestJobs` all discard potential errors, masking operational problems. Have these methods return an error slice and surface at least one in logs.

127-138: ⚠️ Potential issue — Potential race updating job counters

Two workers calling the closure for the same job will read-modify-write `TotalFiles` and `FailedFiles` non-atomically. Prefer a single `UPDATE … SET total_files = total_files + ?` SQL statement or add DB-level aggregation.
🧹 Nitpick comments (1)

cmd/api/src/daemons/datapipe/pipeline.go (1)

107-107: Uncoordinated goroutine for orphaned file cleanup

The orphaned file sweeper runs asynchronously without any coordination, error handling, or lifecycle management. Consider adding proper context handling and error reporting.

```diff
-go s.orphanedFileSweeper.Clear(ctx, expectedFiles)
+go func() {
+	if err := s.orphanedFileSweeper.Clear(ctx, expectedFiles); err != nil {
+		slog.ErrorContext(ctx, "Failed to clear orphaned files", "error", err)
+	}
+}()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- cmd/api/src/daemons/datapipe/datapipe.go (2 hunks)
- cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
- cmd/api/src/model/datapipestatus.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Build BloodHound Container Image / Build and Package Container
- GitHub Check: run-analysis
- GitHub Check: build-ui
- GitHub Check: run-tests
🔇 Additional comments (5)

cmd/api/src/model/datapipestatus.go (1)

28-29: Good addition of granular status constants

These new status constants support better operational visibility in the refactored datapipe.

cmd/api/src/daemons/datapipe/pipeline.go (2)

36-44: Well-designed interface for pipeline abstraction

The Pipeline interface cleanly separates the business logic operations from daemon orchestration, improving modularity and testability.

57-68: Constructor correctly encapsulates dependencies

The pipeline constructor properly initializes all required services and dependencies in one place.

cmd/api/src/daemons/datapipe/datapipe.go (2)

33-37: Excellent refactoring - clean separation of concerns

The simplified Daemon struct with Pipeline interface delegation achieves the goal of separating business logic from orchestration.

91-104: Robust status management with proper cleanup

The WithDatapipeStatus helper provides excellent centralized status management with proper cleanup via defer (a sketch of the pattern follows this block).
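For readers outside the diff, here is a minimal, self-contained sketch of the defer-based status wrapper pattern being praised. The `store` type and status values are local stand-ins, not the PR's actual types; the real helper persists status through the database layer:

```go
package main

import (
	"fmt"
	"log/slog"
)

type status string

const (
	statusIdle    status = "idle"
	statusPurging status = "purging"
)

// store is a stand-in for the database layer that persists datapipe status.
type store struct{ current status }

func (s *store) setStatus(v status) error { s.current = v; return nil }

// withStatus sets the status, runs the action, and always resets to idle via
// defer — the centralized pattern the review comment describes.
func withStatus(db *store, v status, action func()) {
	if err := db.setStatus(v); err != nil {
		slog.Error("failed to set datapipe status", "error", err)
		return
	}
	defer func() {
		if err := db.setStatus(statusIdle); err != nil {
			slog.Error("failed to reset datapipe status", "error", err)
		}
	}()
	action()
}

func main() {
	db := &store{current: statusIdle}
	withStatus(db, statusPurging, func() { fmt.Println("purging…") })
	fmt.Println("final status:", db.current)
}
```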
Actionable comments posted: 1
♻️ Duplicate comments (4)

cmd/api/src/daemons/datapipe/pipeline.go (3)

69-74: ⚠️ Potential issue — `measure.Measure` still records ~0 ns – place it in an outer defer

The instrumentation is executed inside the same defer that performs follow-up DB ops, so it measures only the time taken by that defer (~instantaneous), not the whole purge. Move the measurement to its own outer defer (or switch to `measure.LogAndMeasure`) so the duration covers the full `DeleteData` execution.

```diff
-func (s *BHCEPipeline) DeleteData(ctx context.Context) {
-	defer func() {
-		_ = s.db.DeleteAnalysisRequest(ctx)
-		_ = s.db.RequestAnalysis(ctx, "datapipe")
-		measure.Measure(slog.LevelInfo, "Purge Graph Data Completed")()
-	}()
+func (s *BHCEPipeline) DeleteData(ctx context.Context) {
+	// Capture the true duration of the whole purge.
+	defer measure.LogAndMeasure(slog.LevelInfo, "Purge Graph Data")()
+
+	defer func() {
+		_ = s.db.DeleteAnalysisRequest(ctx)
+		_ = s.db.RequestAnalysis(ctx, "datapipe")
+	}()
```

105-113: ⚠️ Potential issue — Still discarding errors from graphify & job services

`ProcessTasks`, `ProcessStaleIngestJobs`, and `ProcessFinishedIngestJobs` may return errors that are silently ignored, masking operational issues and complicating troubleshooting. Surface these errors (collect and log, or bubble up) so that failures become visible.

118-129: ⚠️ Potential issue — Concurrent counter updates remain non-atomic

The read-modify-write of `TotalFiles`/`FailedFiles` can race when multiple workers update the same job. Use a single atomic SQL update (e.g. `UPDATE … SET total_files = total_files + ?, …`) or DB-level aggregation instead of loading and rewriting the struct.

cmd/api/src/daemons/datapipe/datapipe.go (1)

71-78: ⚠️ Potential issue — Incorrect status constants (`Purging` used for Start/Prune)

`DatapipeStatusStarting` should wrap the initial `pipeline.Start`, and `DatapipeStatusPruning` should wrap `pipeline.PruneData` to leverage the new status codes introduced in this PR.

```diff
-	s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.Start)
+	s.WithDatapipeStatus(ctx, model.DatapipeStatusStarting, s.pipeline.Start)
 ...
-	s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.PruneData)
+	s.WithDatapipeStatus(ctx, model.DatapipeStatusPruning, s.pipeline.PruneData)
```
🧹 Nitpick comments (1)

cmd/api/src/daemons/datapipe/pipeline.go (1)

167-175: Feature flag value ignored when resetting cache

`GetFlagByKey` is used only to check for an error; the actual flag value (enabled/disabled) is discarded, so the cache is reset even when the feature is turned off. Retrieve the flag and respect its `Enabled` state.

```diff
-	if _, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
+	if flag, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
 		slog.ErrorContext(ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
 	} else {
-		if err := s.cache.Reset(); err != nil {
+		if !flag.Enabled {
+			slog.Info("Entity panel caching disabled – skipping cache reset")
+		} else if err := s.cache.Reset(); err != nil {
 		...
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- cmd/api/src/daemons/datapipe/datapipe.go (2 hunks)
- cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
cmd/api/src/daemons/datapipe/pipeline.go (7)

- cmd/api/src/daemons/datapipe/cleanup.go (3): OrphanFileSweeper (55-59), NewOrphanFileSweeper (61-67), NewOSFileOperations (42-44)
- cmd/api/src/services/job/service.go (2): JobService (42-45), NewJobService (47-52)
- cmd/api/src/services/graphify/service.go (2): GraphifyService (36-42), NewGraphifyService (44-52)
- packages/go/bhlog/measure/measure.go (2): Measure (47-56), LogAndMeasure (75-90)
- cmd/api/src/services/graphify/tasks.go (1): UpdateJobFunc (41-41)
- cmd/api/src/daemons/datapipe/analysis.go (3): RunAnalysisOperations (42-118), ErrAnalysisFailed (38-38), ErrAnalysisPartiallyCompleted (39-39)
- cmd/api/src/model/appcfg/flag.go (1): FeatureEntityPanelCaching (32-32)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Build BloodHound Container Image / Build and Package Container
- GitHub Check: run-tests
- GitHub Check: build-ui
- GitHub Check: run-analysis
Actionable comments posted: 2
🧹 Nitpick comments (1)

cmd/api/src/daemons/datapipe/datapipe.go (1)

33-36: Comment/type name mismatch – tidy up the doc-string

The comment still refers to `PipelineInterface`, but the exported type is `Pipeline`. Either rename the comment or the type to keep them in sync and avoid confusion for readers and tooling that mine comments.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- cmd/api/src/daemons/datapipe/datapipe.go (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Build BloodHound Container Image / Build and Package Container
- GitHub Check: run-analysis
- GitHub Check: run-tests
🔇 Additional comments (3)

cmd/api/src/daemons/datapipe/datapipe.go (3)

45-47: Storing the `Pipeline` as an interface value looks good

Moving from a concrete value copy to an interface solves the state-copy issue flagged earlier. Nothing further here.

71-71: Correct status constants ➜ ✅

`DatapipeStatusStarting` and `DatapipeStatusPruning` are now used in the right places — thanks for addressing the earlier feedback. Also applies to: 77-77

80-82: Missing error handling around deletion-request check

If `db.HasCollectedGraphDataDeletionRequest` can return an error, it is silently ignored here, potentially masking DB connectivity issues. Confirm the helper's signature; if it returns `(bool, error)`, consider:

```go
if ok, err := s.db.HasCollectedGraphDataDeletionRequest(ctx); err != nil {
	slog.ErrorContext(ctx, "failed querying deletion request", "error", err)
} else if ok {
	s.WithDatapipeStatus(ctx, model.DatapipeStatusPurging, s.pipeline.DeleteData)
}
```
e319869 to 96fca6e (Compare)
96fca6e to e73f97b (Compare)
Actionable comments posted: 1
♻️ Duplicate comments (1)

cmd/api/src/daemons/datapipe/datapipe.go (1)

56-63: Add input validation for robustness.

The constructor should validate its inputs to prevent runtime issues. Consider adding checks for nil pipeline and invalid time intervals.

```diff
 func NewDaemon(pipeline Pipeline, startDelay time.Duration, tickInterval time.Duration, db database.Database) *Daemon {
+	if pipeline == nil {
+		panic("datapipe.NewDaemon: pipeline must not be nil")
+	}
+	if tickInterval <= 0 {
+		panic("datapipe.NewDaemon: tickInterval must be > 0")
+	}
+	if startDelay < 0 {
+		panic("datapipe.NewDaemon: startDelay must be >= 0")
+	}
+
 	return &Daemon{
 		db:           db,
 		tickInterval: tickInterval,
 		pipeline:     pipeline,
 		startDelay:   startDelay,
 	}
 }
```
🧹 Nitpick comments (1)

cmd/api/src/daemons/datapipe/datapipe.go (1)

33-43: Improve the interface comment for clarity.

The comment about "instance state" and "whatever is needed by the pipe to do the work" could be clearer. Consider revising to better explain the interface's purpose and the boolean return values.

```diff
-// Pipeline defines instance methods that operate on pipeline state.
-// These methods require a fully initialized Pipeline instance including
-// graph and db connections. Whatever is neeeded by the pipe to do the work
+// Pipeline defines methods for executing datapipe operations.
+// These methods require a fully initialized Pipeline instance with all
+// necessary dependencies (graph DB, cache, etc.). Each method returns
+// true on successful completion, false otherwise.
```

Note: There's also a typo "neeeded" that should be "needed" in the current comment.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- cmd/api/src/daemons/datapipe/datapipe.go (1 hunks)
- cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
- cmd/api/src/model/datapipestatus.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- cmd/api/src/model/datapipestatus.go
- cmd/api/src/daemons/datapipe/pipeline.go
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
cmd/api/src/daemons/datapipe/datapipe.go (5)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:114-118
Timestamp: 2025-05-27T17:28:46.899Z
Learning: When processing files in a loop where resources need to be cleaned up, extract the file processing into a separate function that uses `defer` for resource management rather than trying to handle cleanup in the loop body. This ensures proper resource cleanup while maintaining error handling robustness.
🔇 Additional comments (3)

cmd/api/src/daemons/datapipe/datapipe.go (3)

45-50: Good refactoring - clean and focused struct design.

The simplified Daemon struct effectively separates concerns by delegating pipeline operations to the Pipeline interface while maintaining only the essential orchestration fields.

65-97: Excellent refactoring - clean separation of concerns.

The Start method effectively uses the Pipeline interface and centralizes status management through WithDatapipeStatus. The appropriate status constants are used for each operation, addressing previous review feedback (a sketch of the loop shape follows this block).

103-127: Good centralization of status management.

The WithDatapipeStatus method effectively centralizes status management around pipeline operations, ensuring consistent status transitions and proper context usage. The design aligns well with the established architecture where only the datapipe manages its own status.
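For orientation, a minimal, self-contained sketch of the loop shape described above — a start delay, a work timer, a pruning ticker, and context-driven shutdown. Interval values and log messages are illustrative only; the real Start method wraps each pass in WithDatapipeStatus:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func run(ctx context.Context, startDelay, tickInterval, pruneInterval time.Duration) {
	time.Sleep(startDelay) // honour the configured start delay

	loopTimer := time.NewTimer(tickInterval)
	pruneTicker := time.NewTicker(pruneInterval)
	defer loopTimer.Stop()
	defer pruneTicker.Stop()

	for {
		select {
		case <-pruneTicker.C:
			fmt.Println("prune pass") // real code: WithDatapipeStatus(..., Pruning, ...)
		case <-loopTimer.C:
			fmt.Println("ingest/analyze pass") // real code: ingest, then analyze
			loopTimer.Reset(tickInterval)
		case <-ctx.Done():
			return // caller cancellation shuts the daemon down
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	run(ctx, 10*time.Millisecond, 50*time.Millisecond, 120*time.Millisecond)
}
```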
e73f97b to a34ebb3 (Compare)
Actionable comments posted: 1
♻️ Duplicate comments (2)

cmd/api/src/daemons/datapipe/pipeline.go (2)

77-86: ⚠️ Potential issue — Fix the else-if chain that prevents complete cleanup on failures

The else-if structure causes cleanup to stop after the first failure, potentially leaving the system in an inconsistent state. If `CancelAllIngestJobs` fails, the subsequent cleanup steps (`DeleteAllIngestTasks` and `DeleteCollectedGraphData`) won't execute.

Execute each cleanup step independently to ensure all operations are attempted:

```diff
-if err := s.db.CancelAllIngestJobs(ctx); err != nil {
-	slog.ErrorContext(ctx, fmt.Sprintf("Error cancelling jobs during data deletion: %v", err))
-	return false
-} else if err := s.db.DeleteAllIngestTasks(ctx); err != nil {
-	slog.ErrorContext(ctx, fmt.Sprintf("Error deleting ingest tasks during data deletion: %v", err))
-	return false
-} else if err := DeleteCollectedGraphData(ctx, s.graphdb); err != nil {
-	slog.ErrorContext(ctx, fmt.Sprintf("Error deleting graph data: %v", err))
-	return false
-}
+var hasErrors bool
+if err := s.db.CancelAllIngestJobs(ctx); err != nil {
+	slog.ErrorContext(ctx, "error cancelling ingest jobs", "error", err)
+	hasErrors = true
+}
+if err := s.db.DeleteAllIngestTasks(ctx); err != nil {
+	slog.ErrorContext(ctx, "error deleting ingest tasks", "error", err)
+	hasErrors = true
+}
+if err := DeleteCollectedGraphData(ctx, s.graphdb); err != nil {
+	slog.ErrorContext(ctx, "error deleting graph data", "error", err)
+	hasErrors = true
+}
+return !hasErrors
```

112-118: 🛠️ Refactor suggestion — Add error handling for service method calls

The calls to `ProcessTasks`, `ProcessStaleIngestJobs`, and `ProcessFinishedIngestJobs` discard potential errors, which could mask operational problems and make debugging difficult.

Consider modifying these service methods to return errors and handle them appropriately:

```diff
-// Ingest all available ingest tasks
-s.graphifyService.ProcessTasks(updateJobFunc(ctx, s.db))
-
-// Manage time-out state progression for ingest jobs
-s.jobService.ProcessStaleIngestJobs()
-
-// Manage nominal state transitions for ingest jobs
-s.jobService.ProcessFinishedIngestJobs()
+// Ingest all available ingest tasks
+if err := s.graphifyService.ProcessTasks(updateJobFunc(ctx, s.db)); err != nil {
+	slog.ErrorContext(ctx, "error processing ingest tasks", "error", err)
+}
+
+// Manage time-out state progression for ingest jobs
+if err := s.jobService.ProcessStaleIngestJobs(); err != nil {
+	slog.ErrorContext(ctx, "error processing stale ingest jobs", "error", err)
+}
+
+// Manage nominal state transitions for ingest jobs
+if err := s.jobService.ProcessFinishedIngestJobs(); err != nil {
+	slog.ErrorContext(ctx, "error processing finished ingest jobs", "error", err)
+}
```
🧹 Nitpick comments (2)

cmd/api/src/daemons/datapipe/pipeline.go (2)

78-84: Improve error logging consistency

The error logging uses `fmt.Sprintf` with string interpolation, which is less efficient and doesn't leverage structured logging benefits. Consider using structured logging fields for better observability.

```diff
-slog.ErrorContext(ctx, fmt.Sprintf("Error cancelling jobs during data deletion: %v", err))
+slog.ErrorContext(ctx, "error cancelling jobs during data deletion", "error", err)
```

Apply similar changes to the other error logging statements in this method.

103-103: Consider adding error handling for goroutine

The orphaned file sweeper is launched as a goroutine without error handling. While this may be acceptable for background cleanup, consider whether errors should be logged or handled.

```diff
-go s.orphanedFileSweeper.Clear(ctx, expectedFiles)
+go func() {
+	if err := s.orphanedFileSweeper.Clear(ctx, expectedFiles); err != nil {
+		slog.ErrorContext(ctx, "error clearing orphaned files", "error", err)
+	}
+}()
```

This assumes the `Clear` method returns an error. If it doesn't, you may want to consider modifying it to return errors for better observability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- cmd/api/src/daemons/datapipe/datapipe.go (1 hunks)
- cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
- cmd/api/src/model/datapipestatus.go (1 hunks)
- cmd/api/src/services/entrypoint.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- cmd/api/src/model/datapipestatus.go
- cmd/api/src/services/entrypoint.go
- cmd/api/src/daemons/datapipe/datapipe.go
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
cmd/api/src/daemons/datapipe/pipeline.go (9)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:114-118
Timestamp: 2025-05-27T17:28:46.899Z
Learning: When processing files in a loop where resources need to be cleaned up, extract the file processing into a separate function that uses `defer` for resource management rather than trying to handle cleanup in the loop body. This ensures proper resource cleanup while maintaining error handling robustness.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:70-84
Timestamp: 2025-05-27T17:28:37.963Z
Learning: The current pattern in upload.go of passing both io.ReadCloser and *os.File to validation functions creates unclear resource ownership and makes lifecycle management error-prone. The user identified this as a "logical mess" that should be refactored to separate validation concerns from file I/O management.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:66-77
Timestamp: 2025-05-27T17:28:28.139Z
Learning: Avoid using if/else pipeline structures when working with resources that have a Close() method, as this pattern can lead to resource leaks. Nested pipelines should be avoided in favor of more explicit resource management patterns with proper cleanup.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build BloodHound Container Image / Build and Package Container
- GitHub Check: run-tests
- GitHub Check: build-ui
- GitHub Check: run-analysis
🔇 Additional comments (2)

cmd/api/src/daemons/datapipe/pipeline.go (2)

37-46: LGTM! Well-structured pipeline with proper dependency injection

The `BHCEPipeline` struct is well-designed with clear separation of concerns and proper dependency injection. The struct contains all necessary dependencies for pipeline operations without storing long-lived contexts.

48-59: LGTM! Constructor properly addresses context ownership

The constructor correctly takes a context parameter for initialization but doesn't store it in the struct, addressing the context ownership concern from previous reviews. Each method will receive its own context parameter.
398882c to cdf1484 (Compare)
7297f39 to 81f8cf7 (Compare)
Actionable comments posted: 2
♻️ Duplicate comments (5)

cmd/api/src/daemons/datapipe/pipeline.go (3)

1-1: Update copyright year to current year.

Based on past review feedback, the copyright year should be updated to reflect the current year.

110-120: Fix return type and add error propagation.

Two issues need addressing:

1. Return type mismatch: the function returns `bool` but the signature declares `error`
2. Missing error propagation: service calls discard potential errors

Apply this diff:

```diff
 func (s *BHCEPipeline) IngestTasks(ctx context.Context) error {
 	// Ingest all available ingest tasks
-	s.graphifyService.ProcessTasks(updateJobFunc(ctx, s.db))
+	if err := s.graphifyService.ProcessTasks(updateJobFunc(ctx, s.db)); err != nil {
+		slog.ErrorContext(ctx, "error processing tasks", "error", err)
+		return err
+	}

 	// Manage time-out state progression for ingest jobs
-	s.jobService.ProcessStaleIngestJobs()
+	if err := s.jobService.ProcessStaleIngestJobs(); err != nil {
+		slog.ErrorContext(ctx, "error processing stale ingest jobs", "error", err)
+		return err
+	}

 	// Manage nominal state transitions for ingest jobs
-	s.jobService.ProcessFinishedIngestJobs()
-	return true
+	if err := s.jobService.ProcessFinishedIngestJobs(); err != nil {
+		slog.ErrorContext(ctx, "error processing finished ingest jobs", "error", err)
+		return err
+	}
+	return nil
 }
```
144-190: Fix return type and cache reset logic.

Two critical issues:

1. Return type mismatch: the function returns `bool` but the signature declares `error`
2. Incorrect cache reset logic: the cache resets when flag retrieval fails instead of when the flag is enabled

The cache reset logic contradicts the learning that the cache should only reset when flag retrieval succeeds AND the flag is enabled.

Apply this diff:

```diff
 func (s *BHCEPipeline) Analyze(ctx context.Context) error {
 	// If there are completed ingest jobs or if analysis was user-requested, perform analysis.
 	if hasJobsWaitingForAnalysis, err := s.jobService.HasIngestJobsWaitingForAnalysis(); err != nil {
-		slog.ErrorContext(ctx, fmt.Sprintf("Failed looking up jobs waiting for analysis: %v", err))
-		return false
+		slog.ErrorContext(ctx, "failed looking up jobs waiting for analysis", "error", err)
+		return err
 	} else if hasJobsWaitingForAnalysis || s.db.HasAnalysisRequest(ctx) {
 		// Ensure that the user-requested analysis switch is deleted. This is done at the beginning of the
 		// function so that any re-analysis requests are caught while analysis is in-progress.
 		if err := s.db.DeleteAnalysisRequest(ctx); err != nil {
-			slog.ErrorContext(ctx, fmt.Sprintf("Error deleting analysis request: %v", err))
-			return false
+			slog.ErrorContext(ctx, "error deleting analysis request", "error", err)
+			return err
 		}

 		if s.cfg.DisableAnalysis {
-			return false
+			return nil
 		}

 		defer measure.LogAndMeasure(slog.LevelInfo, "Graph Analysis")()

 		if err := RunAnalysisOperations(ctx, s.db, s.graphdb, s.cfg); err != nil {
 			if errors.Is(err, ErrAnalysisFailed) {
 				s.jobService.FailAnalyzedIngestJobs()
 			} else if errors.Is(err, ErrAnalysisPartiallyCompleted) {
 				s.jobService.PartialCompleteIngestJobs()
 			}
-			return false
+			return err
 		} else {
 			s.jobService.CompleteAnalyzedIngestJobs()

 			// This is cache clearing. The analysis is still successful here
-			if _, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
-				slog.ErrorContext(ctx, fmt.Sprintf("Error retrieving entity panel caching flag: %v", err))
-			} else {
-				if err := s.cache.Reset(); err != nil {
-					slog.Error(fmt.Sprintf("Error while resetting the cache: %v", err))
-				} else {
-					slog.Info("Cache successfully reset by datapipe daemon")
-				}
+			if flag, err := s.db.GetFlagByKey(ctx, appcfg.FeatureEntityPanelCaching); err != nil {
+				slog.ErrorContext(ctx, "error retrieving entity panel caching flag", "error", err)
+			} else if flag.Enabled {
+				if err := s.cache.Reset(); err != nil {
+					slog.Error("error while resetting the cache", "error", err)
+				} else {
+					slog.Info("cache successfully reset by datapipe daemon")
+				}
 			}
-			return true
+			return nil
 		}
 	}
-	return false
+	return nil
 }
```

cmd/api/src/daemons/datapipe/datapipe.go (2)
33-43: Improve interface documentation.

The comment "These methods require a fully initialized Pipeline instance including graph and db connections. Whatever is needed by the pipe to do the work" could be clearer about what constitutes "pipeline state" and why these methods are instance-dependent.

Consider a more precise comment:

```diff
-// Pipeline defines instance methods that operate on pipeline state.
-// These methods require a fully initialized Pipeline instance including
-// graph and db connections. Whatever is needed by the pipe to do the work
+// Pipeline defines the contract for data pipeline operations.
+// Implementations must maintain connections to database and graph storage
+// to execute ingestion, analysis, pruning, and deletion workflows.
```

56-63: Add input validation for robustness.

Consider adding validation to prevent runtime issues from invalid inputs.

```diff
 func NewDaemon(pipeline Pipeline, startDelay time.Duration, tickInterval time.Duration, db database.Database) *Daemon {
+	if pipeline == nil {
+		panic("datapipe.NewDaemon: pipeline must not be nil")
+	}
+	if tickInterval <= 0 {
+		panic("datapipe.NewDaemon: tickInterval must be > 0")
+	}
+
 	return &Daemon{
 		db:           db,
 		tickInterval: tickInterval,
 		pipeline:     pipeline,
 		startDelay:   startDelay,
 	}
 }
```
🧹 Nitpick comments (1)

cmd/api/src/daemons/datapipe/datapipe.go (1)

105-133: Solid status management implementation with minor improvement opportunity.

The WithDatapipeStatus wrapper correctly manages datapipe status transitions and respects the architecture where only the datapipe mutates its own status. The flagAsAnalyzedComplete logic is properly implemented.

Consider improving error handling consistency by using structured logging:

```diff
-	slog.ErrorContext(pipelineContext, "failed to reset datapipe status", "err", err)
+	slog.ErrorContext(pipelineContext, "failed to reset datapipe status", "error", err)
```

And:

```diff
-	slog.ErrorContext(pipelineContext, "Datapipe action failed", slog.String("err", err.Error()))
+	slog.ErrorContext(pipelineContext, "datapipe action failed", "error", err)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- cmd/api/src/daemons/datapipe/agt.go (3 hunks)
- cmd/api/src/daemons/datapipe/datapipe.go (1 hunks)
- cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- cmd/api/src/daemons/datapipe/agt.go
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: benwaples
PR: SpecterOps/BloodHound#1675
File: packages/javascript/bh-shared-ui/src/hooks/useZoneParams/useZoneQueryParams.test.tsx:84-86
Timestamp: 2025-07-14T19:28:45.566Z
Learning: In backmerge PRs, it's appropriate to defer non-critical issues (like missing await in tests) to avoid complicating the merge process. The focus should be on the merge itself rather than additional fixes.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
cmd/api/src/daemons/datapipe/pipeline.go (15)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:114-118
Timestamp: 2025-05-27T17:28:46.899Z
Learning: When processing files in a loop where resources need to be cleaned up, extract the file processing into a separate function that uses `defer` for resource management rather than trying to handle cleanup in the loop body. This ensures proper resource cleanup while maintaining error handling robustness.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:70-84
Timestamp: 2025-05-27T17:28:37.963Z
Learning: The current pattern in upload.go of passing both io.ReadCloser and *os.File to validation functions creates unclear resource ownership and makes lifecycle management error-prone. The user identified this as a "logical mess" that should be refactored to separate validation concerns from file I/O management.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:66-77
Timestamp: 2025-05-27T17:28:28.139Z
Learning: Avoid using if/else pipeline structures when working with resources that have a Close() method, as this pattern can lead to resource leaks. Nested pipelines should be avoided in favor of more explicit resource management patterns with proper cleanup.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:177-185
Timestamp: 2025-07-14T22:41:57.442Z
Learning: In BloodHound's datapipe pipeline, when checking feature flags before performing actions (like cache reset), the correct pattern is to verify both that the flag retrieval succeeds AND that the flag is enabled. Performing the action when flag retrieval fails is incorrect logic.
Learnt from: JonasBK
PR: SpecterOps/BloodHound#1671
File: cmd/api/src/analysis/ad/adcs_integration_test.go:3687-3687
Timestamp: 2025-07-10T14:33:20.317Z
Learning: When reviewing Go code, functions defined in one file within a package are accessible from other files in the same package. Before flagging missing functions as compilation errors, check if they exist in other files within the same package directory.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1509
File: packages/go/stbernard/command/license/internal/utils.go:77-86
Timestamp: 2025-05-29T18:42:23.137Z
Learning: Copyright years in license headers should typically preserve the original year when the work was created, not be updated to the current year when the license is added later. The engineering team (#bhe-engineering, @bhe-ec) should provide guidance on the copyright year policy for the BloodHound project.
Learnt from: benwaples
PR: SpecterOps/BloodHound#1675
File: packages/javascript/bh-shared-ui/src/hooks/useZoneParams/useZoneQueryParams.test.tsx:84-86
Timestamp: 2025-07-14T19:28:45.566Z
Learning: In backmerge PRs, it's appropriate to defer non-critical issues (like missing await in tests) to avoid complicating the merge process. The focus should be on the merge itself rather than additional fixes.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: .golangci.json:3-5
Timestamp: 2025-06-25T18:23:53.103Z
Learning: In the BloodHound project's .golangci.json, errcheck linter is intentionally disabled because it was too noisy, but the severity override settings are preserved for when it gets re-enabled in the future.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
cmd/api/src/daemons/datapipe/datapipe.go (10)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:177-185
Timestamp: 2025-07-14T22:41:57.442Z
Learning: In BloodHound's datapipe pipeline, when checking feature flags before performing actions (like cache reset), the correct pattern is to verify both that the flag retrieval succeeds AND that the flag is enabled. Performing the action when flag retrieval fails is incorrect logic.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: cmd/api/src/analysis/azure/post.go:33-35
Timestamp: 2025-06-25T18:24:25.014Z
Learning: In BloodHound Go code, for error types in slog structured logging, prefer using slog.String("key", err.Error()) over slog.Any("key", err). The explicit string conversion with err.Error() is preferred over using slog.Any() for error types.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: cmd/api/src/analysis/azure/post.go:33-35
Timestamp: 2025-06-25T17:52:33.291Z
Learning: In BloodHound Go code, prefer using explicit slog type functions like slog.Any(), slog.String(), slog.Int(), etc. over simple key-value pairs for structured logging. This provides better type safety and makes key-value pairs more visually distinct. For error types, use slog.Any("key", err) or slog.String("key", err.Error()).
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:70-84
Timestamp: 2025-05-27T17:28:37.963Z
Learning: The current pattern in upload.go of passing both io.ReadCloser and *os.File to validation functions creates unclear resource ownership and makes lifecycle management error-prone. The user identified this as a "logical mess" that should be refactored to separate validation concerns from file I/O management.
Learnt from: JonasBK
PR: SpecterOps/BloodHound#1671
File: packages/go/analysis/ad/esc16.go:65-69
Timestamp: 2025-07-10T14:31:01.781Z
Learning: In BloodHound ADCS ESC analysis functions (PostADCSESC1, PostADCSESC4, PostADCSESC6, PostADCSESC9, PostADCSESC10, PostADCSESC13, PostADCSESC16, etc.), the established pattern for handling cert template validation errors and user filtering errors is to use slog.WarnContext to log warnings and continue processing with the next template, rather than returning errors to the caller. This pattern is consistently applied across all ADCS ESC edge implementations.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build BloodHound Container Image / Build and Package Container
- GitHub Check: run-tests
- GitHub Check: build-ui
- GitHub Check: run-analysis
🔇 Additional comments (5)
cmd/api/src/daemons/datapipe/pipeline.go (3)
37-59: Well-structured pipeline encapsulation. The BHCEPipeline struct properly encapsulates all dependencies and the constructor correctly initializes all services. This separation of concerns aligns well with the refactoring objectives.
122-137: Correct implementation for single-worker environment. The updateJobFunc helper properly implements job counter updates. Since this runs in a single-worker environment (per previous learnings), the read-modify-write pattern is safe from race conditions.
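For context, a schematic of the read-modify-write shape being discussed; the names are illustrative, not the actual updateJobFunc:

```go
package example

import "context"

type IngestJob struct {
	ID             int64
	ProcessedFiles int
}

// JobStore is a stand-in for the real database interface.
type JobStore interface {
	GetIngestJob(ctx context.Context, id int64) (IngestJob, error)
	UpdateIngestJob(ctx context.Context, job IngestJob) error
}

// incrementProcessedCount is a read-modify-write update: safe here only
// because a single worker executes the pipeline at a time.
func incrementProcessedCount(ctx context.Context, db JobStore, jobID int64) error {
	job, err := db.GetIngestJob(ctx, jobID) // read
	if err != nil {
		return err
	}
	job.ProcessedFiles++ // modify
	return db.UpdateIngestJob(ctx, job) // write
}
```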
139-142: Correct implementation for BHCE single pipeline. Returning `true` is expected behavior for BHCE, which has only a single pipeline that is always active by default, unlike BHE which may have multiple pipelines.
cmd/api/src/daemons/datapipe/datapipe.go (2)
45-50: Excellent simplification of daemon responsibilities. The simplified Daemon struct properly focuses on coordination concerns (timing, status management) while delegating business logic to the Pipeline interface. This separation of concerns improves maintainability and testability.
65-97: Well-structured daemon loop with proper status management. The Start method correctly uses the new status constants and consistently applies the WithDatapipeStatus wrapper for all pipeline operations. The timer and ticker management is appropriate.
Force-pushed 5e1c33b to 726e11d (Compare)
Force-pushed 726e11d to 31349b2 (Compare)
Actionable comments posted: 0
♻️ Duplicate comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)
56-63: Add input validation to prevent runtime panics. The constructor should validate that critical parameters are not nil or invalid, as suggested in previous reviews.
```diff
 func NewDaemon(pipeline Pipeline, startDelay time.Duration, tickInterval time.Duration, db database.Database) *Daemon {
+	if pipeline == nil {
+		panic("datapipe.NewDaemon: pipeline must not be nil")
+	}
+	if tickInterval <= 0 {
+		panic("datapipe.NewDaemon: tickInterval must be > 0")
+	}
+	if db == nil {
+		panic("datapipe.NewDaemon: db must not be nil")
+	}
+
 	return &Daemon{
 		db:           db,
 		tickInterval: tickInterval,
 		pipeline:     pipeline,
 		startDelay:   startDelay,
 	}
 }
```
🧹 Nitpick comments (1)
cmd/api/src/daemons/datapipe/datapipe.go (1)
33-43: Consider clarifying the Pipeline interface documentation. The comment mentions "instance state" but doesn't clearly explain what this means in the context of the pipeline. The IsActive method's return signature (bool, context.Context) is unusual and would benefit from documentation explaining why it returns a context.

Consider improving the comment:

```diff
-// Pipeline defines instance methods that operate on pipeline state.
-// These methods require a fully initialized Pipeline instance including
-// graph and db connections. Whatever is needed by the pipe to do the work
+// Pipeline defines methods for datapipe operations that require access to
+// database and graph connections. Implementations should handle their own
+// initialization and resource management.
```

Also consider documenting the IsActive method:

```diff
 type Pipeline interface {
 	PruneData(context.Context) error
 	DeleteData(context.Context) error
 	IngestTasks(context.Context) error
 	Analyze(context.Context) error
 	Start(context.Context) error
+	// IsActive checks if the pipeline can execute the given status operation
+	// and returns the appropriate context for the operation.
 	IsActive(context.Context, model.DatapipeStatus) (bool, context.Context)
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- cmd/api/src/daemons/datapipe/agt.go (3 hunks)
- cmd/api/src/daemons/datapipe/datapipe.go (1 hunks)
- cmd/api/src/daemons/datapipe/pipeline.go (1 hunks)
- cmd/api/src/database/datapipestatus.go (1 hunks)
- cmd/api/src/database/datapipestatus_integration_test.go (2 hunks)
- cmd/api/src/database/mocks/db.go (2 hunks)
- packages/go/analysis/ad/membership.go (1 hunks)
- packages/go/analysis/impact/aggregator_test.go (1 hunks)
- packages/javascript/bh-shared-ui/src/components/ExploreTable/ManageColumnsComboBox/ManageColumnsComboBox.tsx (1 hunks)
- packages/javascript/bh-shared-ui/src/components/ExploreTable/ManageColumnsComboBox/ManageColumnsListItem.tsx (1 hunks)
- packages/javascript/bh-shared-ui/src/components/ExploreTable/explore-table-utils.ts (1 hunks)
✅ Files skipped from review due to trivial changes (6)
- packages/go/analysis/impact/aggregator_test.go
- packages/javascript/bh-shared-ui/src/components/ExploreTable/ManageColumnsComboBox/ManageColumnsComboBox.tsx
- packages/javascript/bh-shared-ui/src/components/ExploreTable/ManageColumnsComboBox/ManageColumnsListItem.tsx
- packages/javascript/bh-shared-ui/src/components/ExploreTable/explore-table-utils.ts
- packages/go/analysis/ad/membership.go
- cmd/api/src/daemons/datapipe/agt.go
🚧 Files skipped from review as they are similar to previous changes (1)
- cmd/api/src/daemons/datapipe/pipeline.go
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: benwaples
PR: SpecterOps/BloodHound#1675
File: packages/javascript/bh-shared-ui/src/hooks/useZoneParams/useZoneQueryParams.test.tsx:84-86
Timestamp: 2025-07-14T19:28:45.566Z
Learning: In backmerge PRs, it's appropriate to defer non-critical issues (like missing await in tests) to avoid complicating the merge process. The focus should be on the merge itself rather than additional fixes.
Learnt from: elikmiller
PR: SpecterOps/BloodHound#1563
File: packages/go/graphschema/azure/azure.go:24-24
Timestamp: 2025-06-06T23:12:14.181Z
Learning: In BloodHound, files in packages/go/graphschema/*/`*.go` are generated from CUE schemas. When `just prepare-for-codereview` is run, it triggers code generation that may automatically add import aliases or other formatting changes. These changes are legitimate outputs of the generation process, not manual edits that would be overwritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
cmd/api/src/database/datapipestatus.go (1)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
cmd/api/src/database/datapipestatus_integration_test.go (4)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: LawsonWillard
PR: SpecterOps/BloodHound#1595
File: cmd/api/src/api/v2/saved_queries_test.go:2594-2594
Timestamp: 2025-06-17T22:37:36.389Z
Learning: In Go table-driven tests, there's a distinction between main test function parallelism and subtest parallelism. Main test functions can safely use t.Parallel() for performance benefits, but individual subtests within table-driven tests may need to run sequentially to avoid race conditions with mocks, deferred functions, or shared resources.
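As a small, hypothetical illustration of that distinction:

```go
package example

import "testing"

func TestDoubles(t *testing.T) {
	t.Parallel() // safe at the top level: runs alongside other test functions

	cases := []struct {
		name  string
		input int
		want  int
	}{
		{"zero", 0, 0},
		{"two", 2, 4},
	}

	for _, tc := range cases {
		// No t.Parallel() inside the subtest: subtests that share mocks or
		// deferred cleanup run sequentially to avoid races.
		t.Run(tc.name, func(t *testing.T) {
			if got := tc.input * 2; got != tc.want {
				t.Errorf("double(%d) = %d, want %d", tc.input, got, tc.want)
			}
		})
	}
}
```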
cmd/api/src/daemons/datapipe/datapipe.go (10)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:115-130
Timestamp: 2025-06-10T21:46:07.011Z
Learning: The datapipe pipeline currently executes in a single-worker environment, so concurrency-related race conditions (e.g., when updating ingest-job counters in updateJobFunc) are not a concern.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/graphify/tasks.go:68-80
Timestamp: 2025-05-27T17:28:48.241Z
Learning: In Go, avoid using pipelined if/else statements when creating resources that need cleanup (like files, network connections, etc.). The defer semantics work best when resources are created and immediately deferred for cleanup in the same scope. Instead, extract resource management into separate functions where proper defer patterns can be used effectively.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/pipeline.go:177-185
Timestamp: 2025-07-14T22:41:57.442Z
Learning: In BloodHound's datapipe pipeline, when checking feature flags before performing actions (like cache reset), the correct pattern is to verify both that the flag retrieval succeeds AND that the flag is enabled. Performing the action when flag retrieval fails is incorrect logic.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/job/jobs_test.go:19-143
Timestamp: 2025-05-27T16:58:33.295Z
Learning: Tests in cmd/api/src/services/job/jobs_test.go have been found to be flaky in the past and are due for rewrite. They should be skipped with t.Skip() until they can be properly rewritten.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: cmd/api/src/analysis/azure/post.go:33-35
Timestamp: 2025-06-25T18:24:25.014Z
Learning: In BloodHound Go code, for error types in slog structured logging, prefer using slog.String("key", err.Error()) over slog.Any("key", err). The explicit string conversion with err.Error() is preferred over using slog.Any() for error types.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1606
File: cmd/api/src/analysis/azure/post.go:33-35
Timestamp: 2025-06-25T17:52:33.291Z
Learning: In BloodHound Go code, prefer using explicit slog type functions like slog.Any(), slog.String(), slog.Int(), etc. over simple key-value pairs for structured logging. This provides better type safety and makes key-value pairs more visually distinct. For error types, use slog.Any("key", err) or slog.String("key", err.Error()).
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:70-84
Timestamp: 2025-05-27T17:28:37.963Z
Learning: The current pattern in upload.go of passing both io.ReadCloser and *os.File to validation functions creates unclear resource ownership and makes lifecycle management error-prone. The user identified this as a "logical mess" that should be refactored to separate validation concerns from file I/O management.
Learnt from: JonasBK
PR: SpecterOps/BloodHound#1671
File: packages/go/analysis/ad/esc16.go:65-69
Timestamp: 2025-07-10T14:31:01.781Z
Learning: In BloodHound ADCS ESC analysis functions (PostADCSESC1, PostADCSESC4, PostADCSESC6, PostADCSESC9, PostADCSESC10, PostADCSESC13, PostADCSESC16, etc.), the established pattern for handling cert template validation errors and user filtering errors is to use slog.WarnContext to log warnings and continue processing with the next template, rather than returning errors to the caller. This pattern is consistently applied across all ADCS ESC edge implementations.
Learnt from: superlinkx
PR: SpecterOps/BloodHound#1503
File: cmd/api/src/services/upload/upload.go:36-67
Timestamp: 2025-05-27T17:30:55.548Z
Learning: In the BloodHound codebase, SaveIngestFile function should only handle file operations (save + validate) and should not populate RequestID/JobID fields, as these are properly handled by the API handler layer which has access to request context and URL parameters. The API handler ProcessIngestTask already has defer request.Body.Close() but adding it to SaveIngestFile is still safe since Close() is idempotent.
cmd/api/src/database/mocks/db.go (1)
Learnt from: kpom-specter
PR: SpecterOps/BloodHound#1562
File: cmd/api/src/daemons/datapipe/datapipe.go:100-120
Timestamp: 2025-06-11T20:49:35.177Z
Learning: In BloodHound's datapipe architecture, only the datapipe (e.g., `WithDatapipeStatus`) is responsible for mutating the datapipe status; pipeline actions themselves do not alter the status and instead communicate any job-level state elsewhere.
🧬 Code Graph Analysis (3)
cmd/api/src/database/datapipestatus.go (2)
- cmd/api/src/model/datapipestatus.go (2): DatapipeStatus (21-21), DatapipeStatusWrapper (32-37)
- cmd/api/src/database/db.go (1): BloodhoundDB (176-179)
cmd/api/src/database/datapipestatus_integration_test.go (1)
- cmd/api/src/model/datapipestatus.go (2): DatapipeStatusIdle (24-24), DatapipeStatusAnalyzing (26-26)
cmd/api/src/database/mocks/db.go (1)
- cmd/api/src/model/datapipestatus.go (1): DatapipeStatus (21-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build BloodHound Container Image / Build and Package Container
- GitHub Check: build-ui
- GitHub Check: run-analysis
- GitHub Check: run-tests
🔇 Additional comments (11)
cmd/api/src/database/datapipestatus_integration_test.go (4)
28-28: LGTM - Import addition supports better test execution flow. Adding the `assert` import allows the test to continue execution after assertion failures, which is beneficial for integration tests to capture multiple issues in a single run.
40-40: LGTM - Improved test execution flow. Switching from `require.Equal` to `assert.Equal` for status checks allows the test to continue execution after failures, potentially revealing multiple issues in a single run. Also applies to: 47-47
43-43: LGTM - Interface changes reflect improved separation of concerns. The simplified `SetDatapipeStatus` calls and explicit `UpdateLastAnalysisCompleteTime` call align with the refactored interface that separates status updates from analysis completion time management. Also applies to: 51-53
48-49: LGTM - Proper verification of the new interface behavior. The new assertions correctly verify that:
- Setting status to analyzing doesn't update the completion timestamp (line 49)
- Only explicit calls to `UpdateLastAnalysisCompleteTime` update that field (line 57)
This properly tests the separation of concerns in the refactored interface. Also applies to: 57-57
cmd/api/src/database/datapipestatus.go (3)
27-28: LGTM - Interface refactoring improves API clarity. The separation of `UpdateLastAnalysisCompleteTime` from `SetDatapipeStatus` creates a clearer API where each method has a single, well-defined responsibility.
32-35: LGTM - Clean implementation of the new method. The `UpdateLastAnalysisCompleteTime` method correctly updates both timestamps using UTC time and follows the established patterns in the codebase.
37-51: LGTM - Simplified method preserves important behavior. The refactored `SetDatapipeStatus` method maintains the correct behavior for updating `last_analysis_run_at` when starting analysis while removing the complexity of the boolean parameter.
cmd/api/src/database/mocks/db.go (2)
1978-1989: LGTM - Mock correctly updated for interface changes. The `SetDatapipeStatus` mock method and recorder are correctly updated to reflect the simplified method signature, ensuring consistency with the refactored interface.
2173-2185: LGTM - New mock method properly implements the interface. The `UpdateLastAnalysisCompleteTime` mock method correctly implements the new interface method and follows the established patterns in the mock file.
cmd/api/src/daemons/datapipe/datapipe.go (2)
65-97: LGTM! The Start method implementation looks correct. The method properly:
- Uses the correct status constants (DatapipeStatusStarting, DatapipeStatusPruning) as addressed in previous reviews
- Handles timer cleanup with defer statements
- Delegates operations to the pipeline interface
- Handles context cancellation appropriately
103-119: LGTM! The status management implementation is correct. The method properly:
- Checks pipeline activity before proceeding
- Sets the datapipe status appropriately
- Logs action errors with structured logging
- Follows the architectural pattern where only the datapipe manages its own status (as clarified in previous reviews)
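Pulling those bullets together, a rough sketch of what such a status wrapper can look like; the types and names below are simplified stand-ins, not the actual BloodHound code:

```go
package datapipe

import (
	"context"
	"log/slog"
)

// Minimal stand-ins for the real model and database types.
type DatapipeStatus string

type statusStore interface {
	SetDatapipeStatus(ctx context.Context, status DatapipeStatus) error
}

type pipeline interface {
	IsActive(ctx context.Context, status DatapipeStatus) (bool, context.Context)
}

type Daemon struct {
	db       statusStore
	pipeline pipeline
}

// withStatus mirrors the reviewed behavior: check pipeline activity, set
// the status, run the action, and log (rather than return) action errors.
func (s *Daemon) withStatus(ctx context.Context, status DatapipeStatus, action func(context.Context) error) {
	active, ctx := s.pipeline.IsActive(ctx, status)
	if !active {
		return
	}
	if err := s.db.SetDatapipeStatus(ctx, status); err != nil {
		slog.ErrorContext(ctx, "setting datapipe status", slog.String("err", err.Error()))
		return
	}
	if err := action(ctx); err != nil {
		slog.ErrorContext(ctx, "datapipe action failed", slog.String("err", err.Error()))
	}
}
```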
Force-pushed 31349b2 to f105ea2 (Compare)
lgtm
Refactor (datapipe): Move the pipeline logic away from the daemon/state management; pipeline.go includes all business logic for how the steps execute, and datapipe.go owns the order and timing.
refactor (datapipe): Make an extendable interface for the pipeline, and allow context to be passed instead of static
whoops (forgot to add entrypoint change)
refactor (datapipe): Add a start and move the pipeline interface
refactor (datapipe): Add an IsActive() method for future extensibility; moved interface back to the consumer
fix (mislabelled status): Use the correct DatapipeStatus for the steps
Add a start parameter to delay startup analysis time
chore: Fix naming to be more descriptive
chore: Fix imports to match the new style
fix (invalid last_analysis date): Set on successful pipeline analysis. Changes the interface
fix (naming): Move some variables around for cleanliness
fix (typo): Var name pointed out by code rabbit
Force-pushed 9fb2954 to 8175b62 (Compare)
chore: add godocs to the methods of the Pipeline interface
Force-pushed 8175b62 to 943ba49 (Compare)
Description
Refactor(datapipe): Move the datapipe logic that is the pipeline away from the daemon/state management. pipeline.go includes all business logic for how the steps execute. datapipe.go owns the order and timing.
Motivation and Context
Resolves BED-5905
How Has This Been Tested?
Please describe in detail how you tested your changes.
Include details of your testing environment, and the tests you ran to
see how your change affects other areas of the code, etc.
Screenshots (optional):
Types of changes
Checklist:
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests