Skip to content

Commit e79b68e

Browse files
committed
Enhance scanning pipeline with concurrency improvements and detailed logging
1 parent c7a0222 commit e79b68e

File tree

4 files changed

+85
-28
lines changed

4 files changed

+85
-28
lines changed

event_forwarding.go

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ type EventForwarder struct {
2424
currentFilePath string
2525
lastRotation time.Time
2626
fileMutex sync.Mutex
27+
wg sync.WaitGroup
2728
}
2829

2930
// FastFinderEvent represents an event to be forwarded
@@ -133,6 +134,7 @@ func InitializeEventForwarding(config *ForwardingConfig) error {
133134
httpClient: httpClient,
134135
}
135136

137+
eventForwarder.wg.Add(1)
136138
// Start the forwarding goroutine
137139
go eventForwarder.forwardingLoop()
138140

@@ -243,6 +245,7 @@ func (ef *EventForwarder) shouldForwardEvent(eventType, severity string) bool {
243245

244246
// forwardingLoop runs the periodic event forwarding
245247
func (ef *EventForwarder) forwardingLoop() {
248+
defer ef.wg.Done()
246249
ticker := time.NewTicker(time.Duration(ef.config.FlushTime) * time.Second)
247250
defer ticker.Stop()
248251

@@ -466,11 +469,14 @@ func (ef *EventForwarder) cleanOldFiles() {
466469
// StopEventForwarding stops the event forwarding system
467470
func StopEventForwarding() {
468471
if eventForwarder != nil {
472+
close(eventForwarder.stopChannel)
473+
eventForwarder.wg.Wait()
474+
469475
// Close current file if open
470476
if eventForwarder.currentFile != nil {
471477
eventForwarder.currentFile.Close()
478+
eventForwarder.currentFile = nil
472479
}
473-
close(eventForwarder.stopChannel)
474480
eventForwarder = nil
475481
}
476482
}

finder.go

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,6 @@ func FindInFilesContent(files *[]string, patterns []string, rules *yara.Rules, h
6868
// handle file content and checksum match
6969
for _, m := range CheckFileChecksumAndContent(path, b, hashList, patterns) {
7070
if !Contains(matchingFiles, m) {
71-
LogMessage(LOG_ALERT, "(ALERT)", "File content match on:", path)
7271
matchingFiles = append(matchingFiles, m)
7372
}
7473
}
@@ -117,7 +116,6 @@ func FindInFilesContent(files *[]string, patterns []string, rules *yara.Rules, h
117116
// handle file content and checksum match for each file in the archive
118117
for _, m := range CheckFileChecksumAndContent(path, body, hashList, patterns) {
119118
if !Contains(matchingFiles, m) {
120-
LogMessage(LOG_ALERT, "(ALERT)", "File content match on:", path)
121119
matchingFiles = append(matchingFiles, m)
122120
}
123121
}
@@ -186,10 +184,16 @@ func checkForChecksum(path string, content []byte, hashList []string) (matchingF
186184
// checkForStringPattern checks whether the file content matches any of the specified patterns
187185
func checkForStringPattern(path string, content []byte, patterns []string) (matchingFiles []string) {
188186
LogMessage(LOG_VERBOSE, "(SCAN)", "Checking grep patterns in", path)
187+
contentStr := string(content)
188+
lines := strings.Split(contentStr, "\n")
189+
189190
for _, expression := range patterns {
190-
if strings.Contains(string(content), expression) {
191-
LogMessage(LOG_ALERT, "(ALERT)", "Grep match:", expression, "in", path)
192-
matchingFiles = append(matchingFiles, path)
191+
for i, line := range lines {
192+
if strings.Contains(line, expression) {
193+
LogMessage(LOG_ALERT, "(ALERT)", "Grep match:", expression, "in", path, "at line", fmt.Sprintf("%d:", i+1), strings.TrimSpace(line))
194+
matchingFiles = append(matchingFiles, path)
195+
break
196+
}
193197
}
194198
}
195199
return matchingFiles

main.go

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -266,6 +266,11 @@ func MainFastfinderRoutine(config Configuration, pConfigPath string, pNoAdvUI bo
266266
// Wait for matches collection to complete
267267
<-matchesDone
268268

269+
// Update stats
270+
totalFilesScanned += int(pipeline.GetFilesScanned())
271+
totalErrorsEncountered += int(pipeline.GetErrorsEncountered())
272+
totalMatchesFound += len(matchingFiles)
273+
269274
// list and copy matching files
270275
LogMessage(LOG_INFO, "(INFO)", "scan finished in", basePath)
271276
if len(matchingFiles) > 0 {
@@ -297,8 +302,8 @@ func MainFastfinderRoutine(config Configuration, pConfigPath string, pNoAdvUI bo
297302
StopEventForwarding()
298303
}
299304

300-
LogMessage(LOG_INFO, "(INFO)", fmt.Sprintf("Scan completed in %v", scanDuration))
301-
LogMessage(LOG_INFO, "(INFO)", fmt.Sprintf("Files scanned: %d, Matches found: %d, Errors: %d",
305+
LogMessage(LOG_ALERT, "(INFO)", fmt.Sprintf("Scan completed in %v", scanDuration))
306+
LogMessage(LOG_ALERT, "(INFO)", fmt.Sprintf("Files scanned: %d, Matches found: %d, Errors: %d",
302307
totalFilesScanned, totalMatchesFound, totalErrorsEncountered))
303308

304309
ExitProgram(0, !UIactive)

scanner_pipeline.go

Lines changed: 62 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -13,12 +13,15 @@ import (
1313

1414
// ScannerPipeline manages concurrent file enumeration and scanning
1515
type ScannerPipeline struct {
16-
fileChan chan string
17-
matchesChan chan string
18-
errChan chan error
19-
wg sync.WaitGroup
20-
enumerationDone chan bool
21-
scanningDone chan bool
16+
fileChan chan string
17+
matchesChan chan string
18+
errChan chan error
19+
wg sync.WaitGroup
20+
enumerationDone chan bool
21+
scanningDone chan bool
22+
filesScanned int64
23+
errorsEncountered int64
24+
statsMutex sync.Mutex
2225
}
2326

2427
// NewScannerPipeline creates a new scanner pipeline
@@ -38,7 +41,7 @@ func (sp *ScannerPipeline) StartEnumeration(paths []string, excludedPaths []stri
3841
go func() {
3942
defer sp.wg.Done()
4043
for _, path := range paths {
41-
enumerateFilesStreaming(path, excludedPaths, sp.fileChan)
44+
sp.enumerateFiles(path, excludedPaths)
4245
}
4346
close(sp.fileChan)
4447
sp.enumerationDone <- true
@@ -83,6 +86,20 @@ func (sp *ScannerPipeline) GetErrors() <-chan error {
8386
return sp.errChan
8487
}
8588

89+
// GetFilesScanned returns the number of files scanned
90+
func (sp *ScannerPipeline) GetFilesScanned() int64 {
91+
sp.statsMutex.Lock()
92+
defer sp.statsMutex.Unlock()
93+
return sp.filesScanned
94+
}
95+
96+
// GetErrorsEncountered returns the number of errors encountered
97+
func (sp *ScannerPipeline) GetErrorsEncountered() int64 {
98+
sp.statsMutex.Lock()
99+
defer sp.statsMutex.Unlock()
100+
return sp.errorsEncountered
101+
}
102+
86103
// WaitEnumeration waits for enumeration to complete
87104
func (sp *ScannerPipeline) WaitEnumeration() {
88105
<-sp.enumerationDone
@@ -100,37 +117,44 @@ func (sp *ScannerPipeline) WaitAll() {
100117
close(sp.errChan)
101118
}
102119

103-
// enumerateFilesStreaming enumerates files and sends them through a channel using parallel workers
104-
func enumerateFilesStreaming(path string, excludedPaths []string, fileChan chan string) {
120+
// enumerateFiles enumerates files and sends them through a channel using parallel workers
121+
func (sp *ScannerPipeline) enumerateFiles(path string, excludedPaths []string) {
105122
const numWorkers = 8 // Number of parallel workers for directory enumeration
106123

107124
dirQueue := make(chan string, 1000) // Queue of directories to process
108-
var wg sync.WaitGroup
125+
var workerWg sync.WaitGroup // WaitGroup for workers
126+
var taskWg sync.WaitGroup // WaitGroup counting queued directory tasks that have not finished processing
109127
var dirCountMutex sync.Mutex
110128
dirCount := int64(0)
111129

112130
// Launch worker goroutines
113131
for i := 0; i < numWorkers; i++ {
114-
wg.Add(1)
132+
workerWg.Add(1)
115133
go func() {
116-
defer wg.Done()
134+
defer workerWg.Done()
117135
for dirPath := range dirQueue {
118-
enumerateDirectoryWorker(dirPath, excludedPaths, fileChan, dirQueue, &wg, &dirCount, &dirCountMutex)
136+
sp.enumerateDirectoryWorker(dirPath, excludedPaths, dirQueue, &taskWg, &dirCount, &dirCountMutex)
137+
taskWg.Done() // Mark this directory task as completed
119138
}
120139
}()
121140
}
122141

123142
// Queue the root directory
124-
wg.Add(1)
143+
taskWg.Add(1)
125144
dirQueue <- path
126145

146+
// Watcher routine to close queue when all tasks are done
147+
go func() {
148+
taskWg.Wait()
149+
close(dirQueue)
150+
}()
151+
127152
// Wait for all workers to finish
128-
wg.Wait()
129-
close(dirQueue)
153+
workerWg.Wait()
130154
}
131155

132156
// enumerateDirectoryWorker processes a single directory and queues its subdirectories
133-
func enumerateDirectoryWorker(dirPath string, excludedPaths []string, fileChan chan string, dirQueue chan string, wg *sync.WaitGroup, dirCount *int64, mutex *sync.Mutex) {
157+
func (sp *ScannerPipeline) enumerateDirectoryWorker(dirPath string, excludedPaths []string, dirQueue chan string, wg *sync.WaitGroup, dirCount *int64, mutex *sync.Mutex) {
134158
// Update directory count
135159
mutex.Lock()
136160
*dirCount++
@@ -142,6 +166,9 @@ func enumerateDirectoryWorker(dirPath string, excludedPaths []string, fileChan c
142166
entries, err := os.ReadDir(dirPath)
143167
if err != nil {
144168
LogMessage(LOG_ERROR, "(ERROR)", err)
169+
sp.statsMutex.Lock()
170+
sp.errorsEncountered++
171+
sp.statsMutex.Unlock()
145172
return
146173
}
147174

@@ -165,10 +192,12 @@ func enumerateDirectoryWorker(dirPath string, excludedPaths []string, fileChan c
165192
if entry.IsDir() {
166193
// Queue subdirectory for processing by a worker
167194
wg.Add(1)
168-
dirQueue <- fullPath
195+
go func(p string) {
196+
dirQueue <- p
197+
}(fullPath)
169198
} else {
170199
// Send file to the channel
171-
fileChan <- fullPath
200+
sp.fileChan <- fullPath
172201
}
173202
}
174203
}
@@ -184,6 +213,10 @@ func (sp *ScannerPipeline) scanFiles(
184213
contentDependsOnPath bool) {
185214

186215
for filePath := range sp.fileChan {
216+
sp.statsMutex.Lock()
217+
sp.filesScanned++
218+
sp.statsMutex.Unlock()
219+
187220
// Check path patterns first if they exist
188221
pathMatches := false
189222
if len(pathPatterns) > 0 {
@@ -211,6 +244,9 @@ func (sp *ScannerPipeline) scanFiles(
211244
b, err := os.ReadFile(filePath)
212245
if err != nil {
213246
LogMessage(LOG_ERROR, "(ERROR)", "Unable to read file", filePath)
247+
sp.statsMutex.Lock()
248+
sp.errorsEncountered++
249+
sp.statsMutex.Unlock()
214250
continue
215251
}
216252

@@ -222,7 +258,6 @@ func (sp *ScannerPipeline) scanFiles(
222258

223259
// Check checksum and grep patterns
224260
for _, m := range CheckFileChecksumAndContent(filePath, b, hashList, effectivePatterns) {
225-
LogMessage(LOG_ALERT, "(ALERT)", "File content match on:", filePath)
226261
sp.matchesChan <- m
227262
}
228263

@@ -232,6 +267,9 @@ func (sp *ScannerPipeline) scanFiles(
232267
yaraResult, err := PerformYaraScan(&b, rules)
233268
if err != nil {
234269
LogMessage(LOG_ERROR, "(ERROR)", "Error performing yara scan on", filePath, err)
270+
sp.statsMutex.Lock()
271+
sp.errorsEncountered++
272+
sp.statsMutex.Unlock()
235273
continue
236274
}
237275

@@ -256,6 +294,10 @@ func (sp *ScannerPipeline) scanFiles(
256294
// scanFilesPathOnly scans only path patterns
257295
func (sp *ScannerPipeline) scanFilesPathOnly(pathPatterns []*regexp2.Regexp) {
258296
for filePath := range sp.fileChan {
297+
sp.statsMutex.Lock()
298+
sp.filesScanned++
299+
sp.statsMutex.Unlock()
300+
259301
for _, pattern := range pathPatterns {
260302
if match, _ := pattern.MatchString(filePath); match {
261303
LogMessage(LOG_ALERT, "(ALERT)", "File path match on:", filePath)

0 commit comments

Comments
 (0)