diff --git a/cmd/blackbox/main.go b/cmd/blackbox/main.go index bd7ef8d..fb8c976 100644 --- a/cmd/blackbox/main.go +++ b/cmd/blackbox/main.go @@ -33,40 +33,17 @@ func main() { logger.Fatalf("could not load config file: %s\n", err) } - members := grouper.Members{} - - if len(config.Syslog.Sources) > 0 { - drainer, err := syslog.NewDrainer( - config.Syslog.Destination, - config.Hostname, - ) - if err != nil { - logger.Fatalf("could not drain to syslog: %s\n", err) - } - - members = append(members, buildTailers(config.Syslog.Sources, drainer)...) - } - - group := grouper.NewParallel(nil, members) + group := grouper.NewDynamic(nil, 0, 0) running := ifrit.Invoke(sigmon.New(group)) + go func() { + drainerFactory := syslog.NewDrainerFactory(config.Syslog.Destination, config.Hostname) + fileWatcher := blackbox.NewFileWatcher(logger, config.Syslog.SourceDir, group.Client(), drainerFactory) + fileWatcher.Watch() + }() + err = <-running.Wait() if err != nil { logger.Fatalf("failed: %s", err) } } - -func buildTailers(sources []blackbox.SyslogSource, drainer syslog.Drainer) grouper.Members { - members := make(grouper.Members, len(sources)) - - for i, source := range sources { - tailer := &blackbox.Tailer{ - Source: source, - Drainer: drainer, - } - - members[i] = grouper.Member{source.Path, tailer} - } - - return members -} diff --git a/config.go b/config.go index 20600d6..c6ffb39 100644 --- a/config.go +++ b/config.go @@ -40,8 +40,8 @@ type SyslogSource struct { } type SyslogConfig struct { - Destination syslog.Drain `yaml:"destination"` - Sources []SyslogSource `yaml:"sources"` + Destination syslog.Drain `yaml:"destination"` + SourceDir string `yaml:"source_dir"` } type Config struct { diff --git a/file_watcher.go b/file_watcher.go new file mode 100644 index 0000000..9b48fcc --- /dev/null +++ b/file_watcher.go @@ -0,0 +1,96 @@ +package blackbox + +import ( + "io/ioutil" + "log" + "os" + "path/filepath" + "time" + + "github.com/concourse/blackbox/syslog" + "github.com/tedsuo/ifrit/grouper" +) + +const POLL_INTERVAL = 5 * time.Second + +type fileWatcher struct { + logger *log.Logger + + sourceDir string + dynamicGroupClient grouper.DynamicClient + + drainerFactory syslog.DrainerFactory +} + +func NewFileWatcher( + logger *log.Logger, + sourceDir string, + dynamicGroupClient grouper.DynamicClient, + drainerFactory syslog.DrainerFactory, +) *fileWatcher { + return &fileWatcher{ + logger: logger, + sourceDir: sourceDir, + dynamicGroupClient: dynamicGroupClient, + drainerFactory: drainerFactory, + } +} + +func (f *fileWatcher) Watch() { + for { + logDirs, err := ioutil.ReadDir(f.sourceDir) + if err != nil { + f.logger.Fatalf("could not list directories in source dir: %s\n", err) + } + + for _, logDir := range logDirs { + tag := logDir.Name() + tagDirPath := filepath.Join(f.sourceDir, tag) + + fileInfo, err := os.Stat(tagDirPath) + if err != nil { + f.logger.Fatalf("failed to determine if path is directory: %s\n", err) + } + + if !fileInfo.IsDir() { + continue + } + + logFiles, err := ioutil.ReadDir(tagDirPath) + if err != nil { + f.logger.Fatalf("could not list files in log dir %s: %s\n", tag, err) + } + + for _, logFile := range logFiles { + logFileFullPath := filepath.Join(tagDirPath, logFile.Name()) + if _, found := f.dynamicGroupClient.Get(logFileFullPath); !found { + f.dynamicGroupClient.Inserter() <- f.memberForFile(logFileFullPath) + } + } + } + + time.Sleep(POLL_INTERVAL) + } +} + +func (f *fileWatcher) memberForFile(logfilePath string) grouper.Member { + drainer, err := f.drainerFactory.NewDrainer() + if err != nil { + f.logger.Fatalf("could not drain to syslog: %s\n", err) + } + + logfileDir := filepath.Dir(logfilePath) + + tag, err := filepath.Rel(f.sourceDir, logfileDir) + if err != nil { + f.logger.Fatalf("could not compute tag from file path %s: %s\n", logfilePath, err) + } + + tailer := &Tailer{ + Path: logfilePath, + Tag: tag, + Drainer: drainer, + } + + return grouper.Member{tailer.Path, tailer} +} diff --git a/integration/syslog_test.go b/integration/syslog_test.go index 3e89c66..66b8a24 100644 --- a/integration/syslog_test.go +++ b/integration/syslog_test.go @@ -3,6 +3,8 @@ package integration_test import ( "io/ioutil" "os" + "path/filepath" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -16,9 +18,14 @@ import ( ) var _ = Describe("Blackbox", func() { - var blackboxRunner *BlackboxRunner - var syslogServer *SyslogServer - var inbox *Inbox + var ( + blackboxRunner *BlackboxRunner + syslogServer *SyslogServer + inbox *Inbox + logDir string + tagName string + logFile *os.File + ) BeforeEach(func() { inbox = NewInbox() @@ -26,13 +33,31 @@ var _ = Describe("Blackbox", func() { syslogServer.Start() blackboxRunner = NewBlackboxRunner(blackboxPath) + + var err error + logDir, err = ioutil.TempDir("", "syslog-test") + Expect(err).NotTo(HaveOccurred()) + + tagName = "test-tag" + err = os.Mkdir(filepath.Join(logDir, tagName), os.ModePerm) + Expect(err).NotTo(HaveOccurred()) + + logFile, err = os.OpenFile( + filepath.Join(logDir, tagName, "tail"), + os.O_WRONLY|os.O_CREATE|os.O_TRUNC, + os.ModePerm, + ) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { + logFile.Close() + syslogServer.Stop() + os.RemoveAll(logDir) }) - buildConfigHostname := func(hostname string, filePathToWatch string) blackbox.Config { + buildConfigHostname := func(hostname string, dirToWatch string) blackbox.Config { return blackbox.Config{ Hostname: hostname, Syslog: blackbox.SyslogConfig{ @@ -40,31 +65,23 @@ var _ = Describe("Blackbox", func() { Transport: "udp", Address: syslogServer.Addr, }, - Sources: []blackbox.SyslogSource{ - { - Path: filePathToWatch, - Tag: "test-tag", - }, - }, + SourceDir: dirToWatch, }, } } - buildConfig := func(filePathToWatch string) blackbox.Config { - return buildConfigHostname("", filePathToWatch) + buildConfig := func(dirToWatch string) blackbox.Config { + return buildConfigHostname("", dirToWatch) } - It("logs any new lines of a watched file to syslog", func() { - fileToWatch, err := ioutil.TempFile("", "tail") - Expect(err).NotTo(HaveOccurred()) - - config := buildConfig(fileToWatch.Name()) + It("logs any new lines of a file in source directory to syslog with subdirectory name as tag", func() { + config := buildConfig(logDir) blackboxRunner.StartWithConfig(config) - fileToWatch.WriteString("hello\n") - fileToWatch.WriteString("world\n") - fileToWatch.Sync() - fileToWatch.Close() + logFile.WriteString("hello\n") + logFile.WriteString("world\n") + logFile.Sync() + logFile.Close() var message *sl.Message Eventually(inbox.Messages, "5s").Should(Receive(&message)) @@ -78,20 +95,14 @@ var _ = Describe("Blackbox", func() { Expect(message.Content).To(ContainSubstring(Hostname())) blackboxRunner.Stop() - fileToWatch.Close() - os.Remove(fileToWatch.Name()) }) It("can have a custom hostname", func() { - fileToWatch, err := ioutil.TempFile("", "tail") - Expect(err).NotTo(HaveOccurred()) - - config := buildConfigHostname("fake-hostname", fileToWatch.Name()) + config := buildConfigHostname("fake-hostname", logDir) blackboxRunner.StartWithConfig(config) - fileToWatch.WriteString("hello\n") - fileToWatch.Sync() - fileToWatch.Close() + logFile.WriteString("hello\n") + logFile.Sync() var message *sl.Message Eventually(inbox.Messages, "5s").Should(Receive(&message)) @@ -100,29 +111,210 @@ var _ = Describe("Blackbox", func() { Expect(message.Content).To(ContainSubstring("fake-hostname")) blackboxRunner.Stop() - os.Remove(fileToWatch.Name()) }) It("does not log existing messages", func() { - fileToWatch, err := ioutil.TempFile("", "tail") + logFile.WriteString("already present\n") + logFile.Sync() + + config := buildConfig(logDir) + blackboxRunner.StartWithConfig(config) + + logFile.WriteString("hello\n") + logFile.Sync() + + var message *sl.Message + Eventually(inbox.Messages, "2s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello")) + Expect(message.Content).To(ContainSubstring("test-tag")) + + blackboxRunner.Stop() + }) + + It("tracks logs in multiple files in source directory", func() { + anotherLogFile, err := os.OpenFile( + filepath.Join(logDir, tagName, "another-tail"), + os.O_WRONLY|os.O_CREATE|os.O_TRUNC, + os.ModePerm, + ) + Expect(err).NotTo(HaveOccurred()) + defer anotherLogFile.Close() + + config := buildConfig(logDir) + blackboxRunner.StartWithConfig(config) + + logFile.WriteString("hello\n") + logFile.Sync() + + var message *sl.Message + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello")) + Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + + anotherLogFile.WriteString("hello from the other side\n") + anotherLogFile.Sync() + + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello from the other side")) + Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + }) + + It("tracks files in multiple directories using multiple tags", func() { + tagName2 := "2-test-2-tag" + err := os.Mkdir(filepath.Join(logDir, tagName2), os.ModePerm) Expect(err).NotTo(HaveOccurred()) - fileToWatch.WriteString("already present\n") - fileToWatch.Sync() + anotherLogFile, err := os.OpenFile( + filepath.Join(logDir, tagName2, "another-tail"), + os.O_WRONLY|os.O_CREATE|os.O_TRUNC, + os.ModePerm, + ) + Expect(err).NotTo(HaveOccurred()) + defer anotherLogFile.Close() - config := buildConfig(fileToWatch.Name()) + config := buildConfig(logDir) blackboxRunner.StartWithConfig(config) - fileToWatch.WriteString("hello\n") - fileToWatch.Sync() - fileToWatch.Close() + logFile.WriteString("hello\n") + logFile.Sync() var message *sl.Message - Eventually(inbox.Messages, "2s").Should(Receive(&message)) + Eventually(inbox.Messages, "5s").Should(Receive(&message)) Expect(message.Content).To(ContainSubstring("hello")) Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + + anotherLogFile.WriteString("hello from the other side\n") + anotherLogFile.Sync() + + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello from the other side")) + Expect(message.Content).To(ContainSubstring("2-test-2-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + }) + + It("starts tracking logs in newly created files", func() { + config := buildConfig(logDir) + blackboxRunner.StartWithConfig(config) + + anotherLogFile, err := os.OpenFile( + filepath.Join(logDir, tagName, "another-tail"), + os.O_WRONLY|os.O_CREATE|os.O_TRUNC, + os.ModePerm, + ) + Expect(err).NotTo(HaveOccurred()) + defer anotherLogFile.Close() + + // wait for tailer to pick up file, twice the interval + time.Sleep(10 * time.Second) + + anotherLogFile.WriteString("hello from the other side\n") + anotherLogFile.Sync() + + var message *sl.Message + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello from the other side")) + Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + + By("keeping track of old files") + logFile.WriteString("hello\n") + logFile.Sync() + + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello")) + Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + }) + + It("starts tracking logs in newly created files", func() { + config := buildConfig(logDir) + blackboxRunner.StartWithConfig(config) + + logFile.WriteString("hello\n") + logFile.Sync() + + var message *sl.Message + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello")) + Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + + os.Remove(filepath.Join(logDir, tagName, "tail")) + + // wait for tail process to die, tailer interval is 1 sec + time.Sleep(2 * time.Second) + + anotherLogFile, err := os.OpenFile( + filepath.Join(logDir, tagName, "tail"), + os.O_WRONLY|os.O_CREATE|os.O_TRUNC, + os.ModePerm, + ) + Expect(err).NotTo(HaveOccurred()) + defer anotherLogFile.Close() + + // wait for tailer to pick up file, twice the interval + time.Sleep(10 * time.Second) + + anotherLogFile.WriteString("bye\n") + anotherLogFile.Sync() + + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("bye")) + Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + }) + + It("ignores subdirectories in tag directories", func() { + err := os.Mkdir(filepath.Join(logDir, tagName, "ignore-me"), os.ModePerm) + Expect(err).NotTo(HaveOccurred()) + + err = ioutil.WriteFile( + filepath.Join(logDir, tagName, "ignore-me", "and-my-son"), + []byte("some-data"), + os.ModePerm, + ) + Expect(err).NotTo(HaveOccurred()) + + config := buildConfig(logDir) + blackboxRunner.StartWithConfig(config) + + logFile.WriteString("hello\n") + logFile.Sync() + logFile.Close() + + var message *sl.Message + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello")) + Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) + + blackboxRunner.Stop() + }) + + It("ignores files in source directory", func() { + err := ioutil.WriteFile( + filepath.Join(logDir, "not-a-tag-dir"), + []byte("some-data"), + os.ModePerm, + ) + Expect(err).NotTo(HaveOccurred()) + + config := buildConfig(logDir) + blackboxRunner.StartWithConfig(config) + + logFile.WriteString("hello\n") + logFile.Sync() + logFile.Close() + + var message *sl.Message + Eventually(inbox.Messages, "5s").Should(Receive(&message)) + Expect(message.Content).To(ContainSubstring("hello")) + Expect(message.Content).To(ContainSubstring("test-tag")) + Expect(message.Content).To(ContainSubstring(Hostname())) blackboxRunner.Stop() - os.Remove(fileToWatch.Name()) }) }) diff --git a/syslog/drainer_factory.go b/syslog/drainer_factory.go new file mode 100644 index 0000000..bfb70d1 --- /dev/null +++ b/syslog/drainer_factory.go @@ -0,0 +1,24 @@ +package syslog + +type DrainerFactory interface { + NewDrainer() (Drainer, error) +} + +type drainerFactory struct { + destination Drain + hostname string +} + +func NewDrainerFactory(destination Drain, hostname string) DrainerFactory { + return &drainerFactory{ + destination: destination, + hostname: hostname, + } +} + +func (f *drainerFactory) NewDrainer() (Drainer, error) { + return NewDrainer( + f.destination, + f.hostname, + ) +} diff --git a/tailer.go b/tailer.go index 28abb8e..da60434 100644 --- a/tailer.go +++ b/tailer.go @@ -6,20 +6,21 @@ import ( "time" "github.com/ActiveState/tail" - "github.com/ActiveState/tail/watch" + "github.com/hpcloud/tail/watch" "github.com/concourse/blackbox/syslog" ) type Tailer struct { - Source SyslogSource + Path string + Tag string Drainer syslog.Drainer } func (tailer *Tailer) Run(signals <-chan os.Signal, ready chan<- struct{}) error { watch.POLL_DURATION = 1 * time.Second - t, err := tail.TailFile(tailer.Source.Path, tail.Config{ + t, err := tail.TailFile(tailer.Path, tail.Config{ Follow: true, ReOpen: true, Poll: true, @@ -44,7 +45,7 @@ func (tailer *Tailer) Run(signals <-chan os.Signal, ready chan<- struct{}) error return nil } - tailer.Drainer.Drain(line.Text, tailer.Source.Tag) + tailer.Drainer.Drain(line.Text, tailer.Tag) case <-signals: return t.Stop() }