diff --git a/cli/internal/run/real_run.go b/cli/internal/run/real_run.go index 2ec48541de347..6386d5dd4195f 100644 --- a/cli/internal/run/real_run.go +++ b/cli/internal/run/real_run.go @@ -113,6 +113,8 @@ func RealRun( taskSummaries = append(taskSummaries, taskSummary) // not using defer, just release the lock mu.Unlock() + + runSummary.CloseTask(taskSummary) } // Return the error when there is one diff --git a/cli/internal/runsummary/execution_summary.go b/cli/internal/runsummary/execution_summary.go index fabb690826bbf..d60b6158c83b0 100644 --- a/cli/internal/runsummary/execution_summary.go +++ b/cli/internal/runsummary/execution_summary.go @@ -127,6 +127,15 @@ type executionSummary struct { exitCode int } +func (es *executionSummary) Duration() time.Duration { + ended := es.endedAt + if ended.IsZero() { + ended = time.Now() + } + + return ended.Sub(es.startedAt) +} + // MarshalJSON munges the executionSummary into a format we want // We'll use an anonmyous, private struct for this, so it's not confusingly duplicated. func (es *executionSummary) MarshalJSON() ([]byte, error) { diff --git a/cli/internal/runsummary/format_execution_summary.go b/cli/internal/runsummary/format_execution_summary.go index be0bcfe87e8c5..7a65ad652c9c2 100644 --- a/cli/internal/runsummary/format_execution_summary.go +++ b/cli/internal/runsummary/format_execution_summary.go @@ -21,8 +21,7 @@ func (rsm *Meta) printExecutionSummary() { successful := summary.ExecutionSummary.cached + summary.ExecutionSummary.success failed := rsm.RunSummary.getFailedTasks() // Note: ExecutionSummary.failure exists, but we need the task names cached := summary.ExecutionSummary.cached - // TODO: can we use a method on ExecutionSummary here? - duration := time.Since(summary.ExecutionSummary.startedAt).Truncate(time.Millisecond) + duration := summary.ExecutionSummary.Duration().Truncate(time.Millisecond) if cached == attempted && attempted > 0 { terminalProgram := os.Getenv("TERM_PROGRAM") diff --git a/cli/internal/runsummary/run_summary.go b/cli/internal/runsummary/run_summary.go index 51ee5c38fe0b4..e74e1b5b1ca9a 100644 --- a/cli/internal/runsummary/run_summary.go +++ b/cli/internal/runsummary/run_summary.go @@ -3,10 +3,8 @@ package runsummary import ( "context" - "encoding/json" "fmt" "path/filepath" - "sync" "time" "github.com/mitchellh/cli" @@ -32,9 +30,6 @@ const NoFrameworkDetected = "" const FrameworkDetectionSkipped = "" const runSummarySchemaVersion = "0" -const runsEndpoint = "/v0/spaces/%s/runs" -const runsPatchEndpoint = "/v0/spaces/%s/runs/%s" -const tasksEndpoint = "/v0/spaces/%s/runs/%s/tasks" type runType int @@ -53,8 +48,7 @@ type Meta struct { repoPath turbopath.RelativeSystemPath singlePackage bool shouldSave bool - apiClient *client.APIClient - spaceID string + spacesClient *spacesClient runType runType synthesizedCommand string } @@ -104,7 +98,7 @@ func NewRunSummary( executionSummary := newExecutionSummary(synthesizedCommand, repoPath, startAt, profile) envVars := env.GetEnvMap() - return Meta{ + rsm := Meta{ RunSummary: &RunSummary{ ID: ksuid.New(), Version: runSummarySchemaVersion, @@ -123,10 +117,16 @@ func NewRunSummary( repoRoot: repoRoot, singlePackage: singlePackage, shouldSave: shouldSave, - apiClient: apiClient, - spaceID: spaceID, synthesizedCommand: synthesizedCommand, } + + rsm.spacesClient = newSpacesClient(spaceID, apiClient, ui) + if rsm.spacesClient.enabled { + go rsm.spacesClient.start() + rsm.spacesClient.createRun(&rsm) + } + + return rsm } // getPath returns a path to where the runSummary is written. @@ -164,47 +164,30 @@ func (rsm *Meta) Close(ctx context.Context, exitCode int, workspaceInfos workspa } rsm.printExecutionSummary() - - // If we don't have a spaceID, we can exit now - if rsm.spaceID == "" { - return nil + if rsm.spacesClient.enabled { + rsm.sendToSpace(ctx) + } else { + // Print any errors if the client is not enabled, since it could have + // been disabled at runtime due to an issue. + rsm.spacesClient.printErrors() } - return rsm.sendToSpace(ctx) + return nil } -func (rsm *Meta) sendToSpace(ctx context.Context) error { - if !rsm.apiClient.IsLinked() { - rsm.ui.Warn("Failed to post to space because repo is not linked to a Space. Run `turbo link` first.") - return nil - } - - // Wrap the record function so we can hoist out url/errors but keep - // the function signature/type the spinner.WaitFor expects. - var url string - var errs []error - record := func() { - url, errs = rsm.record() - } - +func (rsm *Meta) sendToSpace(ctx context.Context) { + rsm.spacesClient.finishRun(rsm) func() { - _ = spinner.WaitFor(ctx, record, rsm.ui, "...sending run summary...", 1000*time.Millisecond) + _ = spinner.WaitFor(ctx, rsm.spacesClient.Close, rsm.ui, "...sending run summary...", 1000*time.Millisecond) }() - // After the spinner is done, print any errors and the url - if len(errs) > 0 { - rsm.ui.Warn("Errors recording run to Spaces") - for _, err := range errs { - rsm.ui.Warn(fmt.Sprintf("%v", err)) - } - } + rsm.spacesClient.printErrors() + url := rsm.spacesClient.run.URL if url != "" { rsm.ui.Output(fmt.Sprintf("Run: %s", url)) rsm.ui.Output("") } - - return nil } // closeDryRun wraps up the Run Summary at the end of `turbo run --dry`. @@ -259,91 +242,11 @@ func (rsm *Meta) save() error { return summaryPath.WriteFile(json, 0644) } -// record sends the summary to the API -func (rsm *Meta) record() (string, []error) { - errs := []error{} - - // Right now we'll send the POST to create the Run and the subsequent task payloads - // after all execution is done, but in the future, this first POST request - // can happen when the Run actually starts, so we can send updates to the associated Space - // as tasks complete. - createRunEndpoint := fmt.Sprintf(runsEndpoint, rsm.spaceID) - response := &spacesRunResponse{} - - payload := rsm.newSpacesRunCreatePayload() - if startPayload, err := json.Marshal(payload); err == nil { - if resp, err := rsm.apiClient.JSONPost(createRunEndpoint, startPayload); err != nil { - errs = append(errs, fmt.Errorf("POST %s: %w", createRunEndpoint, err)) - } else { - if err := json.Unmarshal(resp, response); err != nil { - errs = append(errs, fmt.Errorf("Error unmarshaling response: %w", err)) - } - } +// CloseTask posts the result of the Task to Spaces +func (rsm *Meta) CloseTask(task *TaskSummary) { + if rsm.spacesClient.enabled { + rsm.spacesClient.postTask(task) } - - if response.ID != "" { - if taskErrs := rsm.postTaskSummaries(response.ID); len(taskErrs) > 0 { - errs = append(errs, taskErrs...) - } - - if donePayload, err := json.Marshal(newSpacesDonePayload(rsm.RunSummary)); err == nil { - patchURL := fmt.Sprintf(runsPatchEndpoint, rsm.spaceID, response.ID) - if _, err := rsm.apiClient.JSONPatch(patchURL, donePayload); err != nil { - errs = append(errs, fmt.Errorf("PATCH %s: %w", patchURL, err)) - } - } - } - - if len(errs) > 0 { - return response.URL, errs - } - - return response.URL, nil -} - -func (rsm *Meta) postTaskSummaries(runID string) []error { - errs := []error{} - // We make at most 8 requests at a time. - maxParallelRequests := 8 - taskSummaries := rsm.RunSummary.Tasks - taskCount := len(taskSummaries) - taskURL := fmt.Sprintf(tasksEndpoint, rsm.spaceID, runID) - - parallelRequestCount := maxParallelRequests - if taskCount < maxParallelRequests { - parallelRequestCount = taskCount - } - - queue := make(chan int, taskCount) - - wg := &sync.WaitGroup{} - for i := 0; i < parallelRequestCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for index := range queue { - task := taskSummaries[index] - payload := newSpacesTaskPayload(task) - if taskPayload, err := json.Marshal(payload); err == nil { - if _, err := rsm.apiClient.JSONPost(taskURL, taskPayload); err != nil { - errs = append(errs, fmt.Errorf("Error sending %s summary to space: %w", task.TaskID, err)) - } - } - } - }() - } - - for index := range taskSummaries { - queue <- index - } - close(queue) - wg.Wait() - - if len(errs) > 0 { - return errs - } - - return nil } func getUser(envVars env.EnvironmentVariableMap, dir turbopath.AbsoluteSystemPath) string { diff --git a/cli/internal/runsummary/spaces.go b/cli/internal/runsummary/spaces.go index f197fa8b7242c..2026d87295a0b 100644 --- a/cli/internal/runsummary/spaces.go +++ b/cli/internal/runsummary/spaces.go @@ -1,15 +1,252 @@ package runsummary import ( + "encoding/json" + "fmt" + "sync" + + "github.com/mitchellh/cli" "github.com/vercel/turbo/cli/internal/ci" + "github.com/vercel/turbo/cli/internal/client" ) -// spacesRunResponse deserialized the response from POST Run endpoint -type spacesRunResponse struct { +const runsEndpoint = "/v0/spaces/%s/runs" +const runsPatchEndpoint = "/v0/spaces/%s/runs/%s" +const tasksEndpoint = "/v0/spaces/%s/runs/%s/tasks" + +// spaceRequest contains all the information for a single request to Spaces +type spaceRequest struct { + method string + url string + body interface{} + makeURL func(self *spaceRequest, r *spaceRun) error // Should set url on self + onDone func(self *spaceRequest, response []byte) // Handler for when request completes +} + +func (req *spaceRequest) error(msg string) error { + return fmt.Errorf("[%s] %s: %s", req.method, req.url, msg) +} + +type spacesClient struct { + requests chan *spaceRequest + errors []error + api *client.APIClient + ui cli.Ui + run *spaceRun + runCreated chan struct{} + wg sync.WaitGroup + spaceID string + enabled bool +} + +type spaceRun struct { ID string URL string } +func newSpacesClient(spaceID string, api *client.APIClient, ui cli.Ui) *spacesClient { + c := &spacesClient{ + api: api, + ui: ui, + spaceID: spaceID, + enabled: spaceID != "", + requests: make(chan *spaceRequest), // TODO: give this a size based on tasks + runCreated: make(chan struct{}, 1), + run: &spaceRun{}, + } + + if c.enabled && !c.api.IsLinked() { + c.errors = append(c.errors, fmt.Errorf("Error: experimentalSpaceId is enabled, but repo is not linked to a Space. Run `turbo link --target=spaces`")) + c.enabled = false + } + + return c +} + +// Start receiving and processing requests in 8 goroutines +// There is an additional marker (protected by a mutex) that indicates +// when the first request is done. All other requests are blocked on that one. +// This first request is the POST /run request. We need to block on it because +// the response contains the run ID from the server, which we need to construct the +// URLs of subsequent requests. +func (c *spacesClient) start() { + // Start an immediately invoked go routine that listens for requests coming in from a channel + pending := []*spaceRequest{} + firstRequestStarted := false + + // Create a labeled statement so we can break out of the for loop more easily + + // Setup a for loop that goes infinitely until we break out of it +FirstRequest: + for { + // A select statement that can listen for messages from multiple channels + select { + // listen for new requests coming in + case req, isOpen := <-c.requests: + // If we read from the channel and its already closed, it means + // something went wrong and we are done with the run, but the first + // request either never happened or didn't write to the c.runCreated channel + // to signal that its done. In this case, we need to break out of the forever loop. + if !isOpen { + break FirstRequest + } + // Make the first request right away in a goroutine, + // queue all other requests. When the first request is done, + // we'll get a message on the other channel and break out of this loop + if !firstRequestStarted { + firstRequestStarted = true + go c.dequeueRequest(req) + } else { + pending = append(pending, req) + } + // Wait for c.runCreated channel to be closed and: + case <-c.runCreated: + // 1. flush pending requests + for _, req := range pending { + go c.dequeueRequest(req) + } + + // 2. break out of the forever loop. + break FirstRequest + } + } + + // and then continue listening for more requests as they come in until the channel is closed + for req := range c.requests { + go c.dequeueRequest(req) + } +} + +func (c *spacesClient) makeRequest(req *spaceRequest) { + // The runID is required for POST task requests and PATCH run request URLS, + // so we have to construct these URLs lazily with a `makeURL` affordance. + // + // We are assuming that if makeURL is defined, this is NOT the first request. + // This is not a great assumption, and will fail if our endpoint URLs change later. + // + // Secondly, if makeURL _is_ defined, we call it, and if there are any errors, we exit early. + // We are doing this check before any of the other basic checks (e.g. the existence of a spaceID) + // becaus in the case the repo is not linked to a space, we don't want to print those errors + // for every request that fails. On the other hand, if that POST /run request fails, and N + // requests fail after that as a consequence, it is ok to print all of those errors. + if req.makeURL != nil { + if err := req.makeURL(req, c.run); err != nil { + c.errors = append(c.errors, err) + return + } + } + + // We only care about POST and PATCH right now + if req.method != "POST" && req.method != "PATCH" { + c.errors = append(c.errors, req.error(fmt.Sprintf("Unsupported method %s", req.method))) + return + } + + payload, err := json.Marshal(req.body) + if err != nil { + c.errors = append(c.errors, req.error(fmt.Sprintf("Failed to create payload: %s", err))) + return + } + + // Make the request + var resp []byte + var reqErr error + if req.method == "POST" { + resp, reqErr = c.api.JSONPost(req.url, payload) + } else if req.method == "PATCH" { + resp, reqErr = c.api.JSONPatch(req.url, payload) + } else { + c.errors = append(c.errors, req.error("Unsupported request method")) + } + + if reqErr != nil { + c.errors = append(c.errors, req.error(fmt.Sprintf("%s", reqErr))) + return + } + + // Call the onDone handler if there is one + if req.onDone != nil { + req.onDone(req, resp) + } +} + +func (c *spacesClient) createRun(rsm *Meta) { + c.queueRequest(&spaceRequest{ + method: "POST", + url: fmt.Sprintf(runsEndpoint, c.spaceID), + body: newSpacesRunCreatePayload(rsm), + + // handler for when the request finishes. We set the response into a struct on the client + // because we need the run ID and URL from the server later. + onDone: func(req *spaceRequest, response []byte) { + if err := json.Unmarshal(response, c.run); err != nil { + c.errors = append(c.errors, req.error(fmt.Sprintf("Error unmarshaling response: %s", err))) + } + + // close the run.created channel, because all other requests are blocked on it + close(c.runCreated) + }, + }) +} + +func (c *spacesClient) postTask(task *TaskSummary) { + c.queueRequest(&spaceRequest{ + method: "POST", + makeURL: func(self *spaceRequest, run *spaceRun) error { + if run.ID == "" { + return fmt.Errorf("No Run ID found to post task %s", task.TaskID) + } + self.url = fmt.Sprintf(tasksEndpoint, c.spaceID, run.ID) + return nil + }, + body: newSpacesTaskPayload(task), + }) +} + +func (c *spacesClient) finishRun(rsm *Meta) { + c.queueRequest(&spaceRequest{ + method: "PATCH", + makeURL: func(self *spaceRequest, run *spaceRun) error { + if run.ID == "" { + return fmt.Errorf("No Run ID found to send PATCH request") + } + self.url = fmt.Sprintf(runsPatchEndpoint, c.spaceID, run.ID) + return nil + }, + body: newSpacesDonePayload(rsm.RunSummary), + }) +} + +// queueRequest adds the given request to the requests channel and increments the waitGroup counter +func (c *spacesClient) queueRequest(req *spaceRequest) { + c.wg.Add(1) + c.requests <- req +} + +// dequeueRequest makes the request in a go routine and decrements the waitGroup counter +func (c *spacesClient) dequeueRequest(req *spaceRequest) { + defer c.wg.Done() + c.makeRequest(req) +} + +func (c *spacesClient) printErrors() { + // Print any errors + if len(c.errors) > 0 { + for _, err := range c.errors { + c.ui.Warn(fmt.Sprintf("%s", err)) + } + } +} + +// Cloe will wait for all requests to finish and then close the channel listening for them +func (c *spacesClient) Close() { + // wait for all requests to finish. + c.wg.Wait() + + // close out the channel, since there should be no more requests. + close(c.requests) +} + type spacesClientSummary struct { ID string `json:"id"` Name string `json:"name"` @@ -56,7 +293,7 @@ type spacesTask struct { Logs string `json:"log"` } -func (rsm *Meta) newSpacesRunCreatePayload() *spacesRunPayload { +func newSpacesRunCreatePayload(rsm *Meta) *spacesRunPayload { startTime := rsm.RunSummary.ExecutionSummary.startedAt.UnixMilli() context := "LOCAL" if name := ci.Constant(); name != "" {