Skip to content

Commit

Permalink
Send data to Spaces as tasks finish (#4913)
Browse files Browse the repository at this point in the history
This commit attempts to do:
- Make Spaces requests as soon as possible
- Ensure that first request to create the Run blocks all other requests (so we can get the runID)
- Not block task execution
  • Loading branch information
mehulkar committed May 16, 2023
1 parent 721bfcf commit 9cb5abc
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 128 deletions.
2 changes: 2 additions & 0 deletions cli/internal/run/real_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func RealRun(
taskSummaries = append(taskSummaries, taskSummary)
// not using defer, just release the lock
mu.Unlock()

runSummary.CloseTask(taskSummary)
}

// Return the error when there is one
Expand Down
9 changes: 9 additions & 0 deletions cli/internal/runsummary/execution_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ type executionSummary struct {
exitCode int
}

func (es *executionSummary) Duration() time.Duration {
ended := es.endedAt
if ended.IsZero() {
ended = time.Now()
}

return ended.Sub(es.startedAt)
}

// MarshalJSON munges the executionSummary into a format we want
// We'll use an anonmyous, private struct for this, so it's not confusingly duplicated.
func (es *executionSummary) MarshalJSON() ([]byte, error) {
Expand Down
3 changes: 1 addition & 2 deletions cli/internal/runsummary/format_execution_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func (rsm *Meta) printExecutionSummary() {
successful := summary.ExecutionSummary.cached + summary.ExecutionSummary.success
failed := rsm.RunSummary.getFailedTasks() // Note: ExecutionSummary.failure exists, but we need the task names
cached := summary.ExecutionSummary.cached
// TODO: can we use a method on ExecutionSummary here?
duration := time.Since(summary.ExecutionSummary.startedAt).Truncate(time.Millisecond)
duration := summary.ExecutionSummary.Duration().Truncate(time.Millisecond)

if cached == attempted && attempted > 0 {
terminalProgram := os.Getenv("TERM_PROGRAM")
Expand Down
149 changes: 26 additions & 123 deletions cli/internal/runsummary/run_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package runsummary

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"sync"
"time"

"github.com/mitchellh/cli"
Expand All @@ -32,9 +30,6 @@ const NoFrameworkDetected = "<NO FRAMEWORK DETECTED>"
const FrameworkDetectionSkipped = "<FRAMEWORK DETECTION SKIPPED>"

const runSummarySchemaVersion = "0"
const runsEndpoint = "/v0/spaces/%s/runs"
const runsPatchEndpoint = "/v0/spaces/%s/runs/%s"
const tasksEndpoint = "/v0/spaces/%s/runs/%s/tasks"

type runType int

Expand All @@ -53,8 +48,7 @@ type Meta struct {
repoPath turbopath.RelativeSystemPath
singlePackage bool
shouldSave bool
apiClient *client.APIClient
spaceID string
spacesClient *spacesClient
runType runType
synthesizedCommand string
}
Expand Down Expand Up @@ -104,7 +98,7 @@ func NewRunSummary(
executionSummary := newExecutionSummary(synthesizedCommand, repoPath, startAt, profile)

envVars := env.GetEnvMap()
return Meta{
rsm := Meta{
RunSummary: &RunSummary{
ID: ksuid.New(),
Version: runSummarySchemaVersion,
Expand All @@ -123,10 +117,16 @@ func NewRunSummary(
repoRoot: repoRoot,
singlePackage: singlePackage,
shouldSave: shouldSave,
apiClient: apiClient,
spaceID: spaceID,
synthesizedCommand: synthesizedCommand,
}

rsm.spacesClient = newSpacesClient(spaceID, apiClient, ui)
if rsm.spacesClient.enabled {
go rsm.spacesClient.start()
rsm.spacesClient.createRun(&rsm)
}

return rsm
}

// getPath returns a path to where the runSummary is written.
Expand Down Expand Up @@ -164,47 +164,30 @@ func (rsm *Meta) Close(ctx context.Context, exitCode int, workspaceInfos workspa
}

rsm.printExecutionSummary()

// If we don't have a spaceID, we can exit now
if rsm.spaceID == "" {
return nil
if rsm.spacesClient.enabled {
rsm.sendToSpace(ctx)
} else {
// Print any errors if the client is not enabled, since it could have
// been disabled at runtime due to an issue.
rsm.spacesClient.printErrors()
}

return rsm.sendToSpace(ctx)
return nil
}

func (rsm *Meta) sendToSpace(ctx context.Context) error {
if !rsm.apiClient.IsLinked() {
rsm.ui.Warn("Failed to post to space because repo is not linked to a Space. Run `turbo link` first.")
return nil
}

// Wrap the record function so we can hoist out url/errors but keep
// the function signature/type the spinner.WaitFor expects.
var url string
var errs []error
record := func() {
url, errs = rsm.record()
}

func (rsm *Meta) sendToSpace(ctx context.Context) {
rsm.spacesClient.finishRun(rsm)
func() {
_ = spinner.WaitFor(ctx, record, rsm.ui, "...sending run summary...", 1000*time.Millisecond)
_ = spinner.WaitFor(ctx, rsm.spacesClient.Close, rsm.ui, "...sending run summary...", 1000*time.Millisecond)
}()

// After the spinner is done, print any errors and the url
if len(errs) > 0 {
rsm.ui.Warn("Errors recording run to Spaces")
for _, err := range errs {
rsm.ui.Warn(fmt.Sprintf("%v", err))
}
}
rsm.spacesClient.printErrors()

url := rsm.spacesClient.run.URL
if url != "" {
rsm.ui.Output(fmt.Sprintf("Run: %s", url))
rsm.ui.Output("")
}

return nil
}

// closeDryRun wraps up the Run Summary at the end of `turbo run --dry`.
Expand Down Expand Up @@ -259,91 +242,11 @@ func (rsm *Meta) save() error {
return summaryPath.WriteFile(json, 0644)
}

// record sends the summary to the API
func (rsm *Meta) record() (string, []error) {
errs := []error{}

// Right now we'll send the POST to create the Run and the subsequent task payloads
// after all execution is done, but in the future, this first POST request
// can happen when the Run actually starts, so we can send updates to the associated Space
// as tasks complete.
createRunEndpoint := fmt.Sprintf(runsEndpoint, rsm.spaceID)
response := &spacesRunResponse{}

payload := rsm.newSpacesRunCreatePayload()
if startPayload, err := json.Marshal(payload); err == nil {
if resp, err := rsm.apiClient.JSONPost(createRunEndpoint, startPayload); err != nil {
errs = append(errs, fmt.Errorf("POST %s: %w", createRunEndpoint, err))
} else {
if err := json.Unmarshal(resp, response); err != nil {
errs = append(errs, fmt.Errorf("Error unmarshaling response: %w", err))
}
}
// CloseTask posts the result of the Task to Spaces
func (rsm *Meta) CloseTask(task *TaskSummary) {
if rsm.spacesClient.enabled {
rsm.spacesClient.postTask(task)
}

if response.ID != "" {
if taskErrs := rsm.postTaskSummaries(response.ID); len(taskErrs) > 0 {
errs = append(errs, taskErrs...)
}

if donePayload, err := json.Marshal(newSpacesDonePayload(rsm.RunSummary)); err == nil {
patchURL := fmt.Sprintf(runsPatchEndpoint, rsm.spaceID, response.ID)
if _, err := rsm.apiClient.JSONPatch(patchURL, donePayload); err != nil {
errs = append(errs, fmt.Errorf("PATCH %s: %w", patchURL, err))
}
}
}

if len(errs) > 0 {
return response.URL, errs
}

return response.URL, nil
}

func (rsm *Meta) postTaskSummaries(runID string) []error {
errs := []error{}
// We make at most 8 requests at a time.
maxParallelRequests := 8
taskSummaries := rsm.RunSummary.Tasks
taskCount := len(taskSummaries)
taskURL := fmt.Sprintf(tasksEndpoint, rsm.spaceID, runID)

parallelRequestCount := maxParallelRequests
if taskCount < maxParallelRequests {
parallelRequestCount = taskCount
}

queue := make(chan int, taskCount)

wg := &sync.WaitGroup{}
for i := 0; i < parallelRequestCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for index := range queue {
task := taskSummaries[index]
payload := newSpacesTaskPayload(task)
if taskPayload, err := json.Marshal(payload); err == nil {
if _, err := rsm.apiClient.JSONPost(taskURL, taskPayload); err != nil {
errs = append(errs, fmt.Errorf("Error sending %s summary to space: %w", task.TaskID, err))
}
}
}
}()
}

for index := range taskSummaries {
queue <- index
}
close(queue)
wg.Wait()

if len(errs) > 0 {
return errs
}

return nil
}

func getUser(envVars env.EnvironmentVariableMap, dir turbopath.AbsoluteSystemPath) string {
Expand Down
Loading

0 comments on commit 9cb5abc

Please sign in to comment.