Skip to content
This repository has been archived by the owner on Apr 2, 2018. It is now read-only.

Commit

Permalink
watch any log files that appear in source dir
Browse files Browse the repository at this point in the history
[#117078735]

Signed-off-by: Maria Shaldibina <[email protected]>
  • Loading branch information
evashort authored and mariash committed Apr 7, 2016
1 parent 245e3a6 commit 5f9414d
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 76 deletions.
37 changes: 7 additions & 30 deletions cmd/blackbox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,17 @@ func main() {
logger.Fatalf("could not load config file: %s\n", err)
}

members := grouper.Members{}

if len(config.Syslog.Sources) > 0 {
drainer, err := syslog.NewDrainer(
config.Syslog.Destination,
config.Hostname,
)
if err != nil {
logger.Fatalf("could not drain to syslog: %s\n", err)
}

members = append(members, buildTailers(config.Syslog.Sources, drainer)...)
}

group := grouper.NewParallel(nil, members)
group := grouper.NewDynamic(nil, 0, 0)
running := ifrit.Invoke(sigmon.New(group))

go func() {
drainerFactory := syslog.NewDrainerFactory(config.Syslog.Destination, config.Hostname)
fileWatcher := blackbox.NewFileWatcher(logger, config.Syslog.SourceDir, group.Client(), drainerFactory)
fileWatcher.Watch()
}()

err = <-running.Wait()
if err != nil {
logger.Fatalf("failed: %s", err)
}
}

func buildTailers(sources []blackbox.SyslogSource, drainer syslog.Drainer) grouper.Members {
members := make(grouper.Members, len(sources))

for i, source := range sources {
tailer := &blackbox.Tailer{
Source: source,
Drainer: drainer,
}

members[i] = grouper.Member{source.Path, tailer}
}

return members
}
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type SyslogSource struct {
}

type SyslogConfig struct {
Destination syslog.Drain `yaml:"destination"`
Sources []SyslogSource `yaml:"sources"`
Destination syslog.Drain `yaml:"destination"`
SourceDir string `yaml:"source_dir"`
}

type Config struct {
Expand Down
96 changes: 96 additions & 0 deletions file_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package blackbox

import (
"io/ioutil"
"log"
"os"
"path/filepath"
"time"

"github.com/concourse/blackbox/syslog"
"github.com/tedsuo/ifrit/grouper"
)

const POLL_INTERVAL = 5 * time.Second

type fileWatcher struct {
logger *log.Logger

sourceDir string
dynamicGroupClient grouper.DynamicClient

drainerFactory syslog.DrainerFactory
}

func NewFileWatcher(
logger *log.Logger,
sourceDir string,
dynamicGroupClient grouper.DynamicClient,
drainerFactory syslog.DrainerFactory,
) *fileWatcher {
return &fileWatcher{
logger: logger,
sourceDir: sourceDir,
dynamicGroupClient: dynamicGroupClient,
drainerFactory: drainerFactory,
}
}

func (f *fileWatcher) Watch() {
for {
logDirs, err := ioutil.ReadDir(f.sourceDir)
if err != nil {
f.logger.Fatalf("could not list directories in source dir: %s\n", err)
}

for _, logDir := range logDirs {
tag := logDir.Name()
tagDirPath := filepath.Join(f.sourceDir, tag)

fileInfo, err := os.Stat(tagDirPath)
if err != nil {
f.logger.Fatalf("failed to determine if path is directory: %s\n", err)
}

if !fileInfo.IsDir() {
continue
}

logFiles, err := ioutil.ReadDir(tagDirPath)
if err != nil {
f.logger.Fatalf("could not list files in log dir %s: %s\n", tag, err)
}

for _, logFile := range logFiles {
logFileFullPath := filepath.Join(tagDirPath, logFile.Name())
if _, found := f.dynamicGroupClient.Get(logFileFullPath); !found {
f.dynamicGroupClient.Inserter() <- f.memberForFile(logFileFullPath)
}
}
}

time.Sleep(POLL_INTERVAL)
}
}

func (f *fileWatcher) memberForFile(logfilePath string) grouper.Member {
drainer, err := f.drainerFactory.NewDrainer()
if err != nil {
f.logger.Fatalf("could not drain to syslog: %s\n", err)
}

logfileDir := filepath.Dir(logfilePath)

tag, err := filepath.Rel(f.sourceDir, logfileDir)
if err != nil {
f.logger.Fatalf("could not compute tag from file path %s: %s\n", logfilePath, err)
}

tailer := &Tailer{
Path: logfilePath,
Tag: tag,
Drainer: drainer,
}

return grouper.Member{tailer.Path, tailer}
}
Loading

0 comments on commit 5f9414d

Please sign in to comment.