Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send task summaries as tasks finish #4913

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
5855a25
Create spacesClient struct to make requests to spaces
May 4, 2023
d3efd08
Post run start at the beginning of the run
mehulkar May 12, 2023
9c04987
Close/POST tasks as they finish
mehulkar May 12, 2023
f8bdc26
Remove wrapper method for PATCH request
mehulkar May 12, 2023
87647e0
Remove api client from runsummary struct, only used by spaces client
mehulkar May 12, 2023
d9e7775
Move constants to the file they are used in
mehulkar May 12, 2023
02bef6a
Put all errors in the same place, no need to distinguish
mehulkar May 12, 2023
dcf3001
Make requests via a goroutine and wait on all requests to be done
mehulkar May 12, 2023
195116e
Add another error condition
mehulkar May 12, 2023
ea92a70
Make unlinked error generic and consolidate to one place
mehulkar May 12, 2023
0d17035
Always decrement waitgroup counter if we're throwing away request
mehulkar May 12, 2023
a1cc4da
Make error/help message for linking better
mehulkar May 12, 2023
8e60c59
Better error method
mehulkar May 12, 2023
26480f3
Remove extranneous check
mehulkar May 12, 2023
dedb97e
Move erorr making to req struct instance
mehulkar May 12, 2023
5f59f5d
Consistent error and debug messages
mehulkar May 12, 2023
1601f99
Lazily compute req urls and set response on onDone handler
mehulkar May 12, 2023
39d7e89
Block all requests on the first run POST request
mehulkar May 12, 2023
d05ea23
Shuffle methods around for readability
mehulkar May 12, 2023
b67511b
give spacesClient access to rsm
mehulkar May 12, 2023
37a183c
Consolidate error msg when there is no spaceId
mehulkar May 12, 2023
c7ee5b3
Remove redundant check for runID when posting tasks
mehulkar May 12, 2023
d96d4b1
Better comments
mehulkar May 12, 2023
50d1adf
Standardized error message
mehulkar May 12, 2023
d24bec7
check for run id first so error messages wont be duplicated
mehulkar May 12, 2023
be6c9cd
Parallelize requests into 8 goroutines
mehulkar May 12, 2023
e3cb144
Use a single wait group
mehulkar May 12, 2023
c98a04f
reduce diff
mehulkar May 12, 2023
d74acd8
remove unnecessary wrapper method
mehulkar May 12, 2023
9722fe8
Move the signal internal to spaceRun
mehulkar May 12, 2023
c6c4439
TODO notes
mehulkar May 12, 2023
46382dc
wait for run create channel to close as new requests come in, not before
mehulkar May 12, 2023
ff223fe
Use req.error for unmarshal error
mehulkar May 12, 2023
2e91229
Remove error return, we just log errors
mehulkar May 12, 2023
03f4d78
rearrange
mehulkar May 12, 2023
2260f6d
make explicit start()
mehulkar May 12, 2023
867e4c4
Make requests in goroutine so they are not blocking when they do star…
mehulkar May 12, 2023
95a9c63
comments
mehulkar May 12, 2023
73e354a
Use Duration method instead of manually calculating duration
mehulkar May 12, 2023
abeab8f
move spaceID into spacesClient struct
mehulkar May 12, 2023
0d41fea
Skip all requests if there is no spaceId
mehulkar May 12, 2023
72d1457
Merge branch 'main' into mehulkar/turbo-916-runs-send-task-summaries-…
mehulkar May 12, 2023
580255e
Remove bidirectional relationship
mehulkar May 12, 2023
c1bb83a
better comments
mehulkar May 12, 2023
0159de9
Fix goroutine setup for async task requests (#4924)
mehulkar May 15, 2023
01a3860
Merge branch 'main' into mehulkar/turbo-916-runs-send-task-summaries-…
mehulkar May 15, 2023
62041c0
Wait for requests to complete before closing channel
mehulkar May 15, 2023
6a454db
separate data from resources
mehulkar May 15, 2023
7b3b663
Improve code comments
mehulkar May 15, 2023
2f6521a
Merge branch 'main' into mehulkar/turbo-916-runs-send-task-summaries-…
mehulkar May 16, 2023
f771e8b
Remove unused field
mehulkar May 16, 2023
c9174ec
Merge branch 'main' into mehulkar/turbo-916-runs-send-task-summaries-…
mehulkar May 16, 2023
eb30ac3
Check whether spaces is enabled earlier and fewer times
mehulkar May 16, 2023
682630f
remove extranneous check for spaceId
mehulkar May 16, 2023
cf4e598
use .enabled consistently
mehulkar May 16, 2023
1937654
Check for linked repo once and disable client if no link
mehulkar May 16, 2023
8e8085c
Print errors when disabled
mehulkar May 16, 2023
ae3d8d6
nothing
mehulkar May 16, 2023
1393df5
Handle edge case of channel being closed before the reuqest is done
mehulkar May 16, 2023
110bcd8
only add error for missing link if spaces is enabled
mehulkar May 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cli/internal/run/real_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func RealRun(
taskSummaries = append(taskSummaries, taskSummary)
// not using defer, just release the lock
mu.Unlock()

runSummary.CloseTask(taskSummary)
}

// Return the error when there is one
Expand Down
9 changes: 9 additions & 0 deletions cli/internal/runsummary/execution_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ type executionSummary struct {
exitCode int
}

func (es *executionSummary) Duration() time.Duration {
ended := es.endedAt
if ended.IsZero() {
ended = time.Now()
}

return ended.Sub(es.startedAt)
}

// MarshalJSON munges the executionSummary into a format we want
// We'll use an anonmyous, private struct for this, so it's not confusingly duplicated.
func (es *executionSummary) MarshalJSON() ([]byte, error) {
Expand Down
3 changes: 1 addition & 2 deletions cli/internal/runsummary/format_execution_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ func (rsm *Meta) printExecutionSummary() {
attempted := summary.ExecutionSummary.attempted
successful := summary.ExecutionSummary.cached + summary.ExecutionSummary.success
cached := summary.ExecutionSummary.cached
// TODO: can we use a method on ExecutionSummary here?
duration := time.Since(summary.ExecutionSummary.startedAt).Truncate(time.Millisecond)
duration := summary.ExecutionSummary.Duration().Truncate(time.Millisecond)

if cached == attempted && attempted > 0 {
terminalProgram := os.Getenv("TERM_PROGRAM")
Expand Down
149 changes: 26 additions & 123 deletions cli/internal/runsummary/run_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package runsummary

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"sync"
"time"

"github.com/mitchellh/cli"
Expand All @@ -32,9 +30,6 @@ const NoFrameworkDetected = "<NO FRAMEWORK DETECTED>"
const FrameworkDetectionSkipped = "<FRAMEWORK DETECTION SKIPPED>"

const runSummarySchemaVersion = "0"
const runsEndpoint = "/v0/spaces/%s/runs"
const runsPatchEndpoint = "/v0/spaces/%s/runs/%s"
const tasksEndpoint = "/v0/spaces/%s/runs/%s/tasks"

type runType int

Expand All @@ -53,8 +48,7 @@ type Meta struct {
repoPath turbopath.RelativeSystemPath
singlePackage bool
shouldSave bool
apiClient *client.APIClient
spaceID string
spacesClient *spacesClient
runType runType
synthesizedCommand string
}
Expand Down Expand Up @@ -104,7 +98,7 @@ func NewRunSummary(
executionSummary := newExecutionSummary(synthesizedCommand, repoPath, startAt, profile)

envVars := env.GetEnvMap()
return Meta{
rsm := Meta{
RunSummary: &RunSummary{
ID: ksuid.New(),
Version: runSummarySchemaVersion,
Expand All @@ -123,10 +117,16 @@ func NewRunSummary(
repoRoot: repoRoot,
singlePackage: singlePackage,
shouldSave: shouldSave,
apiClient: apiClient,
spaceID: spaceID,
synthesizedCommand: synthesizedCommand,
}

rsm.spacesClient = newSpacesClient(spaceID, apiClient, ui)
if rsm.spacesClient.enabled {
go rsm.spacesClient.start()
rsm.spacesClient.createRun(&rsm)
}

return rsm
}

// getPath returns a path to where the runSummary is written.
Expand Down Expand Up @@ -164,47 +164,30 @@ func (rsm *Meta) Close(ctx context.Context, exitCode int, workspaceInfos workspa
}

rsm.printExecutionSummary()

// If we don't have a spaceID, we can exit now
if rsm.spaceID == "" {
return nil
if rsm.spacesClient.enabled {
rsm.sendToSpace(ctx)
} else {
// Print any errors if the client is not enabled, since it could have
// been disabled at runtime due to an issue.
rsm.spacesClient.printErrors()
}

return rsm.sendToSpace(ctx)
return nil
}

func (rsm *Meta) sendToSpace(ctx context.Context) error {
if !rsm.apiClient.IsLinked() {
rsm.ui.Warn("Failed to post to space because repo is not linked to a Space. Run `turbo link` first.")
return nil
}

// Wrap the record function so we can hoist out url/errors but keep
// the function signature/type the spinner.WaitFor expects.
var url string
var errs []error
record := func() {
url, errs = rsm.record()
}

func (rsm *Meta) sendToSpace(ctx context.Context) {
rsm.spacesClient.finishRun(rsm)
func() {
_ = spinner.WaitFor(ctx, record, rsm.ui, "...sending run summary...", 1000*time.Millisecond)
_ = spinner.WaitFor(ctx, rsm.spacesClient.Close, rsm.ui, "...sending run summary...", 1000*time.Millisecond)
}()

// After the spinner is done, print any errors and the url
if len(errs) > 0 {
rsm.ui.Warn("Errors recording run to Spaces")
for _, err := range errs {
rsm.ui.Warn(fmt.Sprintf("%v", err))
}
}
rsm.spacesClient.printErrors()

url := rsm.spacesClient.run.URL
if url != "" {
rsm.ui.Output(fmt.Sprintf("Run: %s", url))
rsm.ui.Output("")
}

return nil
}

// closeDryRun wraps up the Run Summary at the end of `turbo run --dry`.
Expand Down Expand Up @@ -248,91 +231,11 @@ func (rsm *Meta) save() error {
return summaryPath.WriteFile(json, 0644)
}

// record sends the summary to the API
func (rsm *Meta) record() (string, []error) {
errs := []error{}

// Right now we'll send the POST to create the Run and the subsequent task payloads
// after all execution is done, but in the future, this first POST request
// can happen when the Run actually starts, so we can send updates to the associated Space
// as tasks complete.
createRunEndpoint := fmt.Sprintf(runsEndpoint, rsm.spaceID)
response := &spacesRunResponse{}

payload := rsm.newSpacesRunCreatePayload()
if startPayload, err := json.Marshal(payload); err == nil {
if resp, err := rsm.apiClient.JSONPost(createRunEndpoint, startPayload); err != nil {
errs = append(errs, fmt.Errorf("POST %s: %w", createRunEndpoint, err))
} else {
if err := json.Unmarshal(resp, response); err != nil {
errs = append(errs, fmt.Errorf("Error unmarshaling response: %w", err))
}
}
// CloseTask posts the result of the Task to Spaces
func (rsm *Meta) CloseTask(task *TaskSummary) {
if rsm.spacesClient.enabled {
rsm.spacesClient.postTask(task)
}

if response.ID != "" {
if taskErrs := rsm.postTaskSummaries(response.ID); len(taskErrs) > 0 {
errs = append(errs, taskErrs...)
}

if donePayload, err := json.Marshal(newSpacesDonePayload(rsm.RunSummary)); err == nil {
patchURL := fmt.Sprintf(runsPatchEndpoint, rsm.spaceID, response.ID)
if _, err := rsm.apiClient.JSONPatch(patchURL, donePayload); err != nil {
errs = append(errs, fmt.Errorf("PATCH %s: %w", patchURL, err))
}
}
}

if len(errs) > 0 {
return response.URL, errs
}

return response.URL, nil
}

func (rsm *Meta) postTaskSummaries(runID string) []error {
errs := []error{}
// We make at most 8 requests at a time.
maxParallelRequests := 8
taskSummaries := rsm.RunSummary.Tasks
taskCount := len(taskSummaries)
taskURL := fmt.Sprintf(tasksEndpoint, rsm.spaceID, runID)

parallelRequestCount := maxParallelRequests
if taskCount < maxParallelRequests {
parallelRequestCount = taskCount
}

queue := make(chan int, taskCount)

wg := &sync.WaitGroup{}
for i := 0; i < parallelRequestCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for index := range queue {
task := taskSummaries[index]
payload := newSpacesTaskPayload(task)
if taskPayload, err := json.Marshal(payload); err == nil {
if _, err := rsm.apiClient.JSONPost(taskURL, taskPayload); err != nil {
errs = append(errs, fmt.Errorf("Error sending %s summary to space: %w", task.TaskID, err))
}
}
}
}()
}

for index := range taskSummaries {
queue <- index
}
close(queue)
wg.Wait()

if len(errs) > 0 {
return errs
}

return nil
}

func getUser(envVars env.EnvironmentVariableMap, dir turbopath.AbsoluteSystemPath) string {
Expand Down
Loading