Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Logs API apporach to fix race condition due to pruning in results watcher #713

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/api/server/v1alpha2/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *Server) GetLog(req *pb.GetLogRequest, srv pb.Logs_GetLogServer) error {
s.logger.Error(err)
return status.Error(codes.Internal, "Error streaming log")
}
if object.Status.Size == 0 {
if !object.Status.IsStored {
s.logger.Errorf("no logs exist for %s", req.GetName())
return status.Error(codes.NotFound, "Log doesn't exist")
}
Expand Down Expand Up @@ -169,9 +169,10 @@ func (s *Server) handleReturn(srv pb.Logs_UpdateLogServer, rec *db.Record, log *
}
apiRec := record.ToAPI(rec)
apiRec.UpdateTime = timestamppb.Now()
if written > 0 {
log.Status.Size = written
}

log.Status.Size = written
log.Status.IsStored = returnErr == io.EOF

data, err := json.Marshal(log)
if err != nil {
if !isNilOrEOF(returnErr) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/server/v1alpha2/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ func TestGetLog(t *testing.T) {
},
// To avoid defaulting behavior, explicitly set the file path in status
Status: v1alpha2.LogStatus{
Path: logFile.Name(),
Size: 1024,
Path: logFile.Name(),
Size: 1024,
IsStored: true,
},
}),
},
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ const (

// LogStatus defines the current status of the log resource.
type LogStatus struct {
Path string `json:"path,omitempty"`
Size int64 `json:"size"`
Path string `json:"path,omitempty"`
Size int64 `json:"size"`
IsStored bool `json:"isStored"`
}

// Default sets up default values for Log TypeMeta, such as API version and kind.
Expand Down
18 changes: 18 additions & 0 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,23 @@ func (r *Reconciler) deleteUponCompletion(ctx context.Context, o results.Object)
return controller.NewRequeueAfter(r.cfg.RequeueInterval)
}

if r.resultsClient.LogsClient != nil {
logStored, err := r.resultsClient.LogStatus(ctx, o)
if err != nil {
logger.Errorw("Error confirming object's logs were stored - requeuing to prune later",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
return controller.NewRequeueAfter(r.cfg.RequeueInterval)
}
if !logStored {
logger.Debugw("Object is streaming logs - requeuing to prune later", zap.Duration("results.tekton.dev/requeueAfter", r.cfg.RequeueInterval))
return controller.NewRequeueAfter(r.cfg.RequeueInterval)
}
}

logger.Infow("Deleting object", zap.String("results.tekton.dev/uid", string(o.GetUID())),
zap.Int64("results.tekton.dev/time-taken-seconds", int64(time.Since(*completionTime).Seconds())))

Expand Down Expand Up @@ -341,6 +358,7 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
zap.String("name", o.GetName()),
zap.Error(err),
)
return
}
logger.Debugw("Streaming log completed",
zap.String("namespace", o.GetNamespace()),
Expand Down
10 changes: 10 additions & 0 deletions pkg/watcher/reconciler/dynamic/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@ func TestReconcile_TaskRun(t *testing.T) {
})

t.Run("delete object once grace period elapses", func(t *testing.T) {
// Recreate the object to retest the deletion
if err := trclient.Delete(ctx, taskrun.GetName(), metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
if _, err := trclient.Create(ctx, taskrun, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
// disable logs client so that requeuing due to missing logs
// won't interfere with this test
r.resultsClient.LogsClient = nil
// Enable object deletion, re-reconcile
cfg.CompletedResourceGracePeriod = 1 * time.Second

Expand Down
28 changes: 28 additions & 0 deletions pkg/watcher/results/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package results

import (
"context"
"encoding/json"
"fmt"

"github.com/google/uuid"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/log"
Expand Down Expand Up @@ -86,3 +88,29 @@ func (c *Client) GetLogRecord(ctx context.Context, o Object) (*pb.Record, error)
}
return rec, err
}

// LogStatus checks if logs related to the given object have been successfully stored.
func (c *Client) LogStatus(ctx context.Context, o Object) (bool, error) {
rec, err := c.GetLogRecord(ctx, o)
if err != nil {
return false, err
}

var logStatus LogStatus

err = json.Unmarshal(rec.GetData().GetValue(), &logStatus)
if err != nil {
return false, fmt.Errorf("error unmarshalling : %w", err)
}

return logStatus.Status.IsStored, nil
}

// LogStatus is a struct to match the JSON structure of the log status.
type LogStatus struct {
Status `json:"status"`
}

type Status struct {
IsStored bool `json:"isStored"`
}