diff --git a/internal/provider/journalctl.go b/internal/provider/journalctl.go new file mode 100644 index 0000000..616959a --- /dev/null +++ b/internal/provider/journalctl.go @@ -0,0 +1,119 @@ +package provider + +import ( + "bufio" + "errors" + "fmt" + "io" + "os/exec" + "strings" + "time" + + nodeapi "github.com/virtual-kubelet/virtual-kubelet/node/api" +) + +const journalctl = "journalctl" + +func journalReader(namespace, name, container string, logOpts nodeapi.ContainerLogOpts) (io.ReadCloser, func() error, error) { + fnlog := log. + WithField("podNamespace", namespace). + WithField("podName", name). + WithField("containerName", container) + + fnlog.Infof("calling for container logs with options %+v", logOpts) + cancel := func() error { return nil } // initialize as noop + + unitName := strings.Join([]string{unitPrefix(namespace, name), container, "service"}, separator) + + // Handle all the options. + args := []string{"-u", unitName, "--no-hostname"} // only works with -o short-xxx options. + if logOpts.Tail > 0 { + args = append(args, "-n") + args = append(args, fmt.Sprintf("%d", logOpts.Tail)) + } + if logOpts.Follow { + args = append(args, "-f") + } + if !logOpts.Timestamps { + args = append(args, "-o") + args = append(args, "cat") + } else { + args = append(args, "-o") + args = append(args, "short-full") // this is _not_ the default Go timestamp output + } + if logOpts.SinceSeconds > 0 { + args = append(args, "-S") + args = append(args, fmt.Sprintf("-%ds", logOpts.SinceSeconds)) + } + if !logOpts.SinceTime.IsZero() { + args = append(args, "-S") + args = append(args, logOpts.SinceTime.Format(time.RFC3339)) + } + // Previous might not be possible to implement + // TODO(pires,miek) show logs from the current Pod alone https://github.com/virtual-kubelet/systemk/issues/5#issuecomment-765278538 + // LimitBytes - unsure (maybe a io.CopyBuffer?) + + fnlog.Debugf("getting container logs via: %q %v", journalctl, args) + cmd := exec.Command(journalctl, args...) + p, err := cmd.StdoutPipe() + if err != nil { + return nil, cancel, err + } + + if err := cmd.Start(); err != nil { + return nil, cancel, err + } + + cancel = func() error { + go func() { + if err := cmd.Wait(); err != nil { + fnlog.Debugf("wait for %q failed: %s", journalctl, err) + } + }() + return cmd.Process.Kill() + } + + return p, cancel, nil +} + +var ErrExpired = errors.New("timeout expired") + +// journalFollow synchronously follows the io.Reader, writing each new journal entry to writer. The +// follow will continue until a single time.Time is received on the until channel (or it's closed). +func journalFollow(until <-chan time.Time, reader io.Reader, writer io.Writer) error { + scanner := bufio.NewScanner(reader) + bufch := make(chan []byte) + errch := make(chan error) + + go func() { + for scanner.Scan() { + if err := scanner.Err(); err != nil { + errch <- err + return + } + bufch <- scanner.Bytes() + } + // When the context is Done() the 'until' channel is closed, this kicks in the defers in the GetContainerLogsHandler method. + // this cleans up the journalctl, and closes all file descripters. Scan() then stops with an error (before any reads, + // hence the above if err .. .isn't triggered). In the end this go-routine exits. + // the error here is "read |0: file already closed". + }() + + for { + select { + case <-until: + return ErrExpired + + case err := <-errch: + return err + + case buf := <-bufch: + if _, err := writer.Write(buf); err != nil { + return err + } + if _, err := io.WriteString(writer, "\n"); err != nil { + return err + } + } + } +} diff --git a/internal/provider/log.go b/internal/provider/log.go index 671fe45..1552a5b 100644 --- a/internal/provider/log.go +++ b/internal/provider/log.go @@ -7,7 +7,6 @@ import ( "strconv" "time" - "github.com/coreos/go-systemd/v22/sdjournal" "github.com/gorilla/mux" "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/errdefs" @@ -33,14 +32,12 @@ func (p *p) GetContainerLogsHandler(w http.ResponseWriter, r *http.Request) { r.Header.Set("Transfer-Encoding", "chunked") - // Retrieve the actual systemd journal reader given: - // * it implements io.ReadCloser, and - // * exposes other functionality, like follow mode. - logsReader, err := p.getJournalReader(namespace, pod, container, opts) + logsReader, cancel, err := journalReader(namespace, pod, container, opts) if err != nil { return errors.Wrap(err, "failed to get systemd journal logs reader") } defer logsReader.Close() + defer cancel() // ResponseWriter must be flushed after each write. if _, ok := w.(writeFlusher); !ok { @@ -48,33 +45,30 @@ func (p *p) GetContainerLogsHandler(w http.ResponseWriter, r *http.Request) { } fw := flushOnWrite(w) + if !opts.Follow { + io.Copy(fw, logsReader) + return nil + } + // If in follow mode, follow until interrupted. - if opts.Follow { - untilTime := make(chan time.Time, 1) - errChan := make(chan error, 1) - - go func(w io.Writer, errChan chan error) { - err := logsReader.Follow(untilTime, w) - if err != nil && err != sdjournal.ErrExpired { - err = errors.Wrap(err, "failed to follow systemd journal logs") - } - errChan <- err - }(fw, errChan) - - // Stop following logs if request context is completed. - select { - case err := <-errChan: - return err - case <-r.Context().Done(): - close(untilTime) + untilTime := make(chan time.Time, 1) + errChan := make(chan error, 1) + + go func(w io.Writer, errChan chan error) { + err := journalFollow(untilTime, logsReader, w) + if err != nil && err != ErrExpired { + err = errors.Wrap(err, "failed to follow systemd journal logs") } - return nil + errChan <- err + }(fw, errChan) - // Otherwise, just pipe the journal reader. - } else { - io.Copy(fw, logsReader) + // Stop following logs if request context is completed. + select { + case err := <-errChan: + return err + case <-r.Context().Done(): + close(untilTime) } - return nil })(w, r) } diff --git a/internal/provider/pod.go b/internal/provider/pod.go index dc83ba5..16e80ec 100644 --- a/internal/provider/pod.go +++ b/internal/provider/pod.go @@ -5,9 +5,7 @@ import ( "fmt" "os" "strings" - "time" - "github.com/coreos/go-systemd/v22/sdjournal" "github.com/pkg/errors" "github.com/virtual-kubelet/systemk/internal/ospkg" "github.com/virtual-kubelet/systemk/internal/unit" @@ -290,60 +288,6 @@ func (p *p) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.P return &pod.Status, nil } -// getJournalReader returns the actual journal reader. -// This is useful when an io.ReadCloser is not enough, eg we need Follow(). -// -// TODO(pires) show logs from the current Pod alone https://github.com/virtual-kubelet/systemk/issues/5#issuecomment-765278538 -func (p *p) getJournalReader(namespace, name, container string, logOpts nodeapi.ContainerLogOpts) (*sdjournal.JournalReader, error) { - fnlog := log. - WithField("podNamespace", namespace). - WithField("podName", name). - WithField("containerName", container) - - fnlog.Infof("calling for container logs with options %+v", logOpts) - - unitName := strings.Join([]string{unitPrefix(namespace, name), container, "service"}, separator) - journalConfig := sdjournal.JournalReaderConfig{ - Matches: []sdjournal.Match{ - { - // Filter by unit. - Field: sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT, - Value: unitName, - }, - }, - } - if logOpts.SinceSeconds > 0 { - // Since duration must be negative so we get logs from the past. - journalConfig.Since = -time.Second * time.Duration(logOpts.SinceSeconds) - } - // By default, SinceTime is "0001-01-01 00:00:00 +0000 UTC". - if !logOpts.SinceTime.IsZero() { - journalConfig.Since = time.Since(logOpts.SinceTime) - } - if logOpts.Tail > 0 { - journalConfig.NumFromTail = uint64(logOpts.Tail) - } - // By default, timestamps are present in journal entries. - // Kubernetes defaults to not having timestamps, so we adapt. - if !logOpts.Timestamps { - journalConfig.Formatter = func(entry *sdjournal.JournalEntry) (string, error) { - msg, ok := entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE] - if !ok { - return "", fmt.Errorf("no %q field present in journal entry", sdjournal.SD_JOURNAL_FIELD_MESSAGE) - } - - return fmt.Sprintf("%s\n", msg), nil - } - } - - journalReader, err := sdjournal.NewJournalReader(journalConfig) - if err != nil { - fnlog.Error("failed to retrieve logs from journald, for unit %q", unitName, err) - } - - return journalReader, err -} - // UpdatePod is a noop, func (p *p) UpdatePod(ctx context.Context, pod *corev1.Pod) error { log.