|
1 | 1 | package main
|
2 | 2 |
|
3 | 3 | import (
|
| 4 | + "bytes" |
4 | 5 | "context"
|
| 6 | + "encoding/json" |
| 7 | + "errors" |
5 | 8 | "flag"
|
6 |
| - "io" |
| 9 | + "fmt" |
| 10 | + "log" |
7 | 11 | "log/slog"
|
8 | 12 | "net/http"
|
9 | 13 | "os"
|
| 14 | + "strconv" |
| 15 | + "time" |
10 | 16 |
|
11 | 17 | "dagger.io/dagger"
|
12 | 18 | "github.com/franela/pocketci/pocketci"
|
13 | 19 | )
|
14 | 20 |
|
15 |
| -var verbose = flag.Bool("verbose", false, "whether to enable verbose output") |
| 21 | +var ( |
| 22 | + controlPlane = flag.String("control-plane", "", "url to control plane host") |
| 23 | + interval = flag.Duration("interval", 5*time.Second, "interval between pipeline polls") |
| 24 | + runnerName = flag.String("runner-name", "", "name of the runner that identifies it") |
| 25 | + parallelism = flag.Int("parallelism", 10, "max number of dagger calls to run in parallel") |
| 26 | + |
| 27 | + ErrNoPipeline = errors.New("no pipeline to run") |
| 28 | +) |
16 | 29 |
|
17 | 30 | func main() {
|
18 | 31 | flag.Parse()
|
19 | 32 |
|
| 33 | + if *controlPlane == "" { |
| 34 | + log.Fatalf("control-plane must be specified and be a valid url") |
| 35 | + } |
| 36 | + if *runnerName == "" { |
| 37 | + log.Fatalf("runner-name must be specified") |
| 38 | + } |
| 39 | + |
20 | 40 | ctx := context.Background()
|
21 |
| - out := io.Discard |
22 |
| - if *verbose { |
23 |
| - out = os.Stderr |
| 41 | + |
| 42 | + client, err := dagger.Connect(ctx, dagger.WithLogOutput(os.Stderr)) |
| 43 | + if err != nil { |
| 44 | + log.Fatalf("failed to connect to dagger client: %s", err) |
| 45 | + } |
| 46 | + |
| 47 | + mu := make(chan bool, *parallelism) |
| 48 | + for i := 0; i < *parallelism; i++ { |
| 49 | + mu <- true |
24 | 50 | }
|
25 |
| - client, err := dagger.Connect(ctx, dagger.WithLogOutput(out)) |
| 51 | + |
| 52 | + githubUser := os.Getenv("GITHUB_USERNAME") |
| 53 | + githubPass := os.Getenv("GITHUB_TOKEN") |
| 54 | + netrc := client.SetSecret("github_auth", fmt.Sprintf("machine github.com login %s password %s", githubUser, githubPass)) |
| 55 | + |
| 56 | + for { |
| 57 | + pipeline, err := getPipeline(ctx) |
| 58 | + if err != nil && !errors.Is(err, ErrNoPipeline) { |
| 59 | + log.Fatalf("failed to fetch pipeline: %s", err) |
| 60 | + } |
| 61 | + |
| 62 | + if errors.Is(err, ErrNoPipeline) { |
| 63 | + slog.Info("no pipeline to run") |
| 64 | + time.Sleep(*interval) |
| 65 | + continue |
| 66 | + } |
| 67 | + |
| 68 | + go func() { |
| 69 | + // wait for parallelism |
| 70 | + <-mu |
| 71 | + defer func() { |
| 72 | + mu <- true |
| 73 | + }() |
| 74 | + |
| 75 | + run(ctx, client, netrc, pipeline) |
| 76 | + pipelineDone(pipeline) |
| 77 | + }() |
| 78 | + |
| 79 | + time.Sleep(*interval) |
| 80 | + } |
| 81 | +} |
| 82 | + |
| 83 | +func pipelineDone(pipeline *pocketci.PocketciPipeline) { |
| 84 | + res, err := http.Post(*controlPlane+"/pipelines/"+strconv.Itoa(pipeline.ID), "application/json", nil) |
26 | 85 | if err != nil {
|
27 |
| - slog.Error("failed to connect to dagger", slog.String("error", err.Error())) |
| 86 | + slog.Error("could not mark pipeline as done", slog.String("error", err.Error())) |
| 87 | + return |
28 | 88 | }
|
29 |
| - defer client.Close() |
| 89 | + defer res.Body.Close() |
30 | 90 |
|
31 |
| - server, err := pocketci.NewServer(client, pocketci.ServerOptions{ |
32 |
| - GithubUsername: os.Getenv("GITHUB_USERNAME"), |
33 |
| - GithubPassword: os.Getenv("GITHUB_TOKEN"), |
34 |
| - GithubSignature: os.Getenv("X_HUB_SIGNATURE"), |
35 |
| - }) |
| 91 | + if res.StatusCode != http.StatusNoContent { |
| 92 | + slog.Error("could not mark pipeline as done", slog.Int("status_code", res.StatusCode)) |
| 93 | + } |
| 94 | + slog.Info("pipeline is done", slog.Int("pipeline", pipeline.ID)) |
| 95 | +} |
| 96 | + |
| 97 | +func getPipeline(ctx context.Context) (*pocketci.PocketciPipeline, error) { |
| 98 | + buf := bytes.NewBuffer([]byte{}) |
| 99 | + if err := json.NewEncoder(buf).Encode(pocketci.PipelineClaimRequest{RunnerName: *runnerName}); err != nil { |
| 100 | + return nil, err |
| 101 | + } |
| 102 | + |
| 103 | + res, err := http.Post(*controlPlane+"/pipelines/claim", "application/json", buf) |
36 | 104 | if err != nil {
|
37 |
| - slog.Error("failed to create pocketci server", slog.String("error", err.Error())) |
| 105 | + return nil, err |
| 106 | + } |
| 107 | + if res.StatusCode == http.StatusNoContent { |
| 108 | + return nil, ErrNoPipeline |
38 | 109 | }
|
39 | 110 |
|
40 |
| - mux := http.NewServeMux() |
41 |
| - mux.Handle("/", server) |
42 |
| - srv := &http.Server{ |
43 |
| - Addr: ":8080", |
44 |
| - Handler: mux, |
| 111 | + pipeline := &pocketci.PocketciPipeline{} |
| 112 | + if err := json.NewDecoder(res.Body).Decode(pipeline); err != nil { |
| 113 | + return nil, err |
45 | 114 | }
|
46 | 115 |
|
47 |
| - slog.Info("starting pocketci at 8080") |
48 |
| - if err = srv.ListenAndServe(); err != nil { |
49 |
| - slog.Error("server exited", slog.String("error", err.Error())) |
| 116 | + return pipeline, nil |
| 117 | +} |
| 118 | + |
| 119 | +func run(ctx context.Context, dag *dagger.Client, netrc *dagger.Secret, req *pocketci.PocketciPipeline) { |
| 120 | + repoUrl := "https://github.com/" + req.Repository |
| 121 | + slog.Info("cloning repository", slog.String("repository", repoUrl), |
| 122 | + slog.String("ref", req.GitInfo.Branch), slog.String("sha", req.GitInfo.SHA)) |
| 123 | + |
| 124 | + repo, err := pocketci.BaseContainer(dag). |
| 125 | + WithEnvVariable("CACHE_BUST", time.Now().String()). |
| 126 | + WithMountedSecret("/root/.netrc", netrc). |
| 127 | + WithExec([]string{"git", "clone", "--single-branch", "--branch", req.GitInfo.Branch, "--depth", "1", repoUrl, "/app"}). |
| 128 | + WithWorkdir("/app"). |
| 129 | + WithExec([]string{"git", "checkout", req.GitInfo.SHA}). |
| 130 | + Directory("/app"). |
| 131 | + Sync(ctx) |
| 132 | + if err != nil { |
| 133 | + slog.Error("failed to clonse github repository", slog.String("error", err.Error()), |
| 134 | + slog.String("repository", repoUrl), slog.String("ref", req.GitInfo.Branch), slog.String("sha", req.GitInfo.SHA)) |
| 135 | + return |
| 136 | + } |
| 137 | + |
| 138 | + vars := map[string]string{ |
| 139 | + "GITHUB_SHA": req.GitInfo.SHA, |
| 140 | + "GITHUB_ACTIONS": "true", |
| 141 | + } |
| 142 | + |
| 143 | + slog.Info("launching pocketci agent container", |
| 144 | + slog.String("repository_name", req.Repository), slog.String("pipeline", req.Name), |
| 145 | + slog.String("ref", req.GitInfo.Branch), slog.String("sha", req.GitInfo.SHA), |
| 146 | + slog.String("module", req.Module), slog.String("exec", req.Call), |
| 147 | + slog.String("runs_on", req.Runner)) |
| 148 | + |
| 149 | + call := fmt.Sprintf("dagger call --progress plain %s", req.Call) |
| 150 | + if req.Module != "" && req.Module != "." { |
| 151 | + call = fmt.Sprintf("dagger call -m %s --progress plain %s", req.Module, req.Call) |
| 152 | + } |
| 153 | + stdout, err := pocketci.AgentContainer(dag). |
| 154 | + WithEnvVariable("CACHE_BUST", time.Now().String()). |
| 155 | + WithEnvVariable("DAGGER_CLOUD_TOKEN", os.Getenv("DAGGER_CLOUD_TOKEN")). |
| 156 | + WithDirectory("/app", repo). |
| 157 | + WithWorkdir("/app"). |
| 158 | + WithEnvVariable("CI", "pocketci"). |
| 159 | + With(func(c *dagger.Container) *dagger.Container { |
| 160 | + for key, val := range vars { |
| 161 | + c = c.WithEnvVariable(key, val) |
| 162 | + } |
| 163 | + script := fmt.Sprintf("unset TRACEPARENT;unset OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf;unset OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:38015;unset OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf;unset OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://127.0.0.1:38015/v1/traces;unset OTEL_EXPORTER_OTLP_TRACES_LIVE=1;unset OTEL_EXPORTER_OTLP_LOGS_PROTOCOL=http/protobuf;unset OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=http://127.0.0.1:38015/v1/logs;unset OTEL_EXPORTER_OTLP_METRICS_PROTOCOL=http/protobuf;unset OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://127.0.0.1:38015/v1/metrics; %s", call) |
| 164 | + return c.WithExec([]string{"sh", "-c", script}, dagger.ContainerWithExecOpts{ |
| 165 | + ExperimentalPrivilegedNesting: true, |
| 166 | + }) |
| 167 | + }). |
| 168 | + Stdout(ctx) |
| 169 | + if err != nil { |
| 170 | + return |
50 | 171 | }
|
| 172 | + fmt.Println(stdout) |
51 | 173 | }
|
0 commit comments