Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

engine: nested exec simplifications and service fixes #7213

Merged
merged 5 commits into from Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 16 additions & 25 deletions cmd/shim/main.go
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/containerd/console"
"github.com/google/uuid"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/runtime-spec/specs-go"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -501,11 +500,8 @@ func setupBundle() (returnExitCode int) {
}

var searchDomains []string
for _, parentClientID := range execMetadata.ParentClientIDs {
searchDomains = append(searchDomains, network.ClientDomain(parentClientID))
}
if len(searchDomains) > 0 {
spec.Process.Env = append(spec.Process.Env, "_DAGGER_PARENT_CLIENT_IDS="+strings.Join(execMetadata.ParentClientIDs, " "))
if ns := execMetadata.ServerID; ns != "" {
searchDomains = append(searchDomains, network.ClientDomain(ns))
}

var hostsFilePath string
Expand Down Expand Up @@ -543,7 +539,7 @@ func setupBundle() (returnExitCode int) {
keepEnv := []string{}
for _, env := range spec.Process.Env {
switch {
case strings.HasPrefix(env, "_DAGGER_ENABLE_NESTING="):
case strings.HasPrefix(env, "_DAGGER_NESTED_CLIENT_ID="):
// keep the env var; we use it at runtime
keepEnv = append(keepEnv, env)

Expand All @@ -562,6 +558,9 @@ func setupBundle() (returnExitCode int) {
Source: "/run/buildkit/buildkitd.sock",
})
case strings.HasPrefix(env, "_DAGGER_SERVER_ID="):
case strings.HasPrefix(env, "_DAGGER_ENGINE_VERSION="):
// don't need this at runtime, it is just for invalidating cache, which
// has already happened by now
case strings.HasPrefix(env, aliasPrefix):
// NB: don't keep this env var, it's only for the bundling step
// keepEnv = append(keepEnv, env)
Expand Down Expand Up @@ -715,7 +714,8 @@ func internalEnv(name string) (string, bool) {
}

func runWithNesting(ctx context.Context, cmd *exec.Cmd) error {
if _, found := internalEnv("_DAGGER_ENABLE_NESTING"); !found {
clientID, ok := internalEnv("_DAGGER_NESTED_CLIENT_ID")
if !ok {
// no nesting; run as normal
return execProcess(cmd, true)
}
Expand All @@ -733,25 +733,16 @@ func runWithNesting(ctx context.Context, cmd *exec.Cmd) error {
}
sessionPort := l.Addr().(*net.TCPAddr).Port

parentClientIDsVal, _ := internalEnv("_DAGGER_PARENT_CLIENT_IDS")

clientParams := client.Params{
SecretToken: sessionToken.String(),
RunnerHost: "unix:///.runner.sock",
ParentClientIDs: strings.Fields(parentClientIDsVal),
}

if _, ok := internalEnv("_DAGGER_ENABLE_NESTING_IN_SAME_SESSION"); ok {
serverID, ok := internalEnv("_DAGGER_SERVER_ID")
if !ok {
return fmt.Errorf("missing _DAGGER_SERVER_ID")
}
clientParams.ServerID = serverID
serverID, ok := internalEnv("_DAGGER_SERVER_ID")
if !ok {
return errors.New("missing nested client server ID")
}

moduleCallerDigest, ok := internalEnv("_DAGGER_MODULE_CALLER_DIGEST")
if ok {
clientParams.ModuleCallerDigest = digest.Digest(moduleCallerDigest)
clientParams := client.Params{
ID: clientID,
ServerID: serverID,
SecretToken: sessionToken.String(),
RunnerHost: "unix:///.runner.sock",
}

sess, ctx, err := client.Connect(ctx, clientParams)
Expand Down
2 changes: 2 additions & 0 deletions core/c2h.go
Expand Up @@ -20,6 +20,7 @@ type c2hTunnel struct {
upstreamHost string
tunnelServiceHost string
tunnelServicePorts []PortForward
sessionID string
}

func (d *c2hTunnel) Tunnel(ctx context.Context) (rerr error) {
Expand Down Expand Up @@ -56,6 +57,7 @@ func (d *c2hTunnel) Tunnel(ctx context.Context) (rerr error) {
upstream := NewHostIPSocket(
port.Protocol.Network(),
fmt.Sprintf("%s:%d", d.upstreamHost, port.Backend),
d.sessionID,
)

sockPath := fmt.Sprintf("/upstream.%d.sock", frontend)
Expand Down
52 changes: 22 additions & 30 deletions core/container.go
Expand Up @@ -1007,16 +1007,22 @@ func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts

// this allows executed containers to communicate back to this API
if opts.ExperimentalPrivilegedNesting {
// include the engine version so that these execs get invalidated if the engine/API change
runOpts = append(runOpts, llb.AddEnv("_DAGGER_ENABLE_NESTING", engine.Version))
}

if opts.ModuleCallerDigest != "" {
runOpts = append(runOpts, llb.AddEnv("_DAGGER_MODULE_CALLER_DIGEST", opts.ModuleCallerDigest.String()))
}

if opts.NestedInSameSession {
runOpts = append(runOpts, llb.AddEnv("_DAGGER_ENABLE_NESTING_IN_SAME_SESSION", ""))
callerOpts := opts.NestedExecFunctionCall
if callerOpts == nil {
// default to caching the nested exec
callerOpts = &FunctionCall{
Cache: true,
}
}
clientID, err := container.Query.RegisterCaller(ctx, callerOpts)
if err != nil {
return nil, fmt.Errorf("register caller: %w", err)
}
runOpts = append(runOpts,
llb.AddEnv("_DAGGER_NESTED_CLIENT_ID", clientID),
// include the engine version so that these execs get invalidated if the engine/API change
llb.AddEnv("_DAGGER_ENGINE_VERSION", engine.Version),
)
}

metaSt, metaSourcePath := metaMount(opts.Stdin)
Expand Down Expand Up @@ -1057,13 +1063,7 @@ func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts
}

// don't pass these through to the container when manually set, they are internal only
if name == "_DAGGER_ENABLE_NESTING" && !opts.ExperimentalPrivilegedNesting {
continue
}
if name == "_DAGGER_MODULE_CALLER_DIGEST" && opts.ModuleCallerDigest == "" {
continue
}
if name == "_DAGGER_ENABLE_NESTING_IN_SAME_SESSION" && !opts.NestedInSameSession {
if name == "_DAGGER_NESTED_CLIENT_ID" && !opts.ExperimentalPrivilegedNesting {
continue
}

Expand Down Expand Up @@ -1188,8 +1188,7 @@ func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts
return nil, err
}
execMeta := buildkit.ContainerExecUncachedMetadata{
ParentClientIDs: clientMetadata.ClientIDs(),
ServerID: clientMetadata.ServerID,
ServerID: clientMetadata.ServerID,
}
proxyVal, err := execMeta.ToPBFtpProxyVal()
if err != nil {
Expand Down Expand Up @@ -1784,22 +1783,15 @@ type ContainerExecOpts struct {
// Redirect the command's standard error to a file in the container
RedirectStderr string `default:""`

// Provide dagger access to the executed command
// Do not use this option unless you trust the command being executed.
// The command being executed WILL BE GRANTED FULL ACCESS TO YOUR HOST FILESYSTEM
// Provide the executed command access back to the Dagger API
ExperimentalPrivilegedNesting bool `default:"false"`

// Grant the process all root capabilities
InsecureRootCapabilities bool `default:"false"`

// (Internal-only) If this exec is for a module function, this digest will be set in the
// grpc context metadata for any api requests back to the engine. It's used by the API
// server to determine which schema to serve and other module context metadata.
ModuleCallerDigest digest.Digest `name:"-"`

// (Internal-only) Used for module function execs to trigger the nested api client to
// be connected back to the same session.
NestedInSameSession bool `name:"-"`
// (Internal-only) If this is a nested exec for a Function call, this should be set
// with the metadata for that call
NestedExecFunctionCall *FunctionCall `name:"-"`
}

type BuildArg struct {
Expand Down
38 changes: 14 additions & 24 deletions core/git.go
Expand Up @@ -60,14 +60,20 @@ func (*GitRef) TypeDescription() string {

func (ref *GitRef) Tree(ctx context.Context) (*Directory, error) {
bk := ref.Query.Buildkit
st := ref.getState(ctx, bk)
return NewDirectorySt(ctx, ref.Query, *st, "", ref.Repo.Platform, ref.Repo.Services)
st, err := ref.getState(ctx, bk)
if err != nil {
return nil, err
}
return NewDirectorySt(ctx, ref.Query, st, "", ref.Repo.Platform, ref.Repo.Services)
}

func (ref *GitRef) Commit(ctx context.Context) (string, error) {
bk := ref.Query.Buildkit
st := ref.getState(ctx, bk)
p, err := resolveProvenance(ctx, bk, *st)
st, err := ref.getState(ctx, bk)
if err != nil {
return "", err
}
p, err := resolveProvenance(ctx, bk, st)
if err != nil {
return "", err
}
Expand All @@ -77,7 +83,7 @@ func (ref *GitRef) Commit(ctx context.Context) (string, error) {
return p.Sources.Git[0].Commit, nil
}

func (ref *GitRef) getState(ctx context.Context, bk *buildkit.Client) *llb.State {
func (ref *GitRef) getState(ctx context.Context, bk *buildkit.Client) (llb.State, error) {
opts := []llb.GitOption{}

if ref.Repo.KeepGitDir {
Expand All @@ -96,26 +102,10 @@ func (ref *GitRef) getState(ctx context.Context, bk *buildkit.Client) *llb.State
opts = append(opts, llb.AuthHeaderSecret(ref.Repo.AuthHeader.Accessor))
}

useDNS := len(ref.Repo.Services) > 0

clientMetadata, err := engine.ClientMetadataFromContext(ctx)
if err == nil && !useDNS {
useDNS = len(clientMetadata.ParentClientIDs) > 0
if err != nil {
return llb.State{}, err
}

var st llb.State
if useDNS {
// NB: only configure search domains if we're directly using a service, or
// if we're nested beneath another search domain.
//
// we have to be a bit selective here to avoid breaking Dockerfile builds
// that use a Buildkit frontend (# syntax = ...) that doesn't have the
// networks API cap.
//
// TODO: add API cap
Comment on lines -108 to -115
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 - I think this comment has been outdated for a while, should be fine to simplify. I might have only even noticed it while testing Bass's Dockerfile # syntax support, which has always been janky because of weird Docker <-> Buildkit <-> buildx versioning weirdness anyway (oci-layout:// source doesn't work for example, so no Container.import). Either way, the c2c networking code doesn't use a networks API cap anymore.

edit: oh wait, this is my own change lol. well I APPROVE

st = gitdns.Git(ref.Repo.URL, ref.Ref, clientMetadata.ClientIDs(), opts...)
} else {
st = llb.Git(ref.Repo.URL, ref.Ref, opts...)
}
return &st
return gitdns.Git(ref.Repo.URL, ref.Ref, clientMetadata.ServerID, opts...), nil
}
112 changes: 112 additions & 0 deletions core/integration/module_test.go
Expand Up @@ -5628,6 +5628,118 @@ func TestModuleUnicodePath(t *testing.T) {
require.JSONEq(t, `{"test":{"hello":"hello"}}`, out)
}

func TestModuleStartServices(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😍

t.Parallel()

// regression test for https://github.com/dagger/dagger/pull/6914
t.Run("use service in multiple functions", func(t *testing.T) {
t.Parallel()
c, ctx := connect(t)

out, err := c.Container().From(golangImage).
WithMountedFile(testCLIBinPath, daggerCliFile(t, c)).
WithWorkdir("/work").
With(daggerExec("init", "--source=.", "--name=test", "--sdk=go")).
WithNewFile("/work/main.go", dagger.ContainerWithNewFileOpts{
Contents: `package main
import (
"context"
"fmt"
)
type Test struct {
}
func (m *Test) FnA(ctx context.Context) (*Sub, error) {
svc := dag.Container().
From("python").
WithMountedDirectory(
"/srv/www",
dag.Directory().WithNewFile("index.html", "hey there"),
).
WithWorkdir("/srv/www").
WithExposedPort(23457).
WithExec([]string{"python", "-m", "http.server", "23457"}).
AsService()
ctr := dag.Container().
From("alpine:3.18.6").
WithServiceBinding("svc", svc).
WithExec([]string{"wget", "-O", "-", "http://svc:23457"})
out, err := ctr.Stdout(ctx)
if err != nil {
return nil, err
}
if out != "hey there" {
return nil, fmt.Errorf("unexpected output: %q", out)
}
return &Sub{Ctr: ctr}, nil
}
type Sub struct {
Ctr *Container
}
func (m *Sub) FnB(ctx context.Context) (string, error) {
return m.Ctr.
WithExec([]string{"wget", "-O", "-", "http://svc:23457"}).
Stdout(ctx)
}
`,
}).
With(daggerCall("fnA", "fnB")).
Stdout(ctx)
require.NoError(t, err)
require.Equal(t, "hey there", strings.TrimSpace(out))
})

// regression test for https://github.com/dagger/dagger/issues/6951
t.Run("service in multiple containers", func(t *testing.T) {
t.Parallel()
c, ctx := connect(t)

_, err := c.Container().From(golangImage).
WithMountedFile(testCLIBinPath, daggerCliFile(t, c)).
WithWorkdir("/work").
With(daggerExec("init", "--source=.", "--name=test", "--sdk=go")).
WithNewFile("/work/main.go", dagger.ContainerWithNewFileOpts{
Contents: `package main
import (
"context"
)
type Test struct {
}
func (m *Test) Fn(ctx context.Context) *Container {
redis := dag.Container().
From("redis").
WithExposedPort(6379).
AsService()
cli := dag.Container().
From("redis").
WithoutEntrypoint().
WithServiceBinding("redis", redis)
ctrA := cli.WithExec([]string{"sh", "-c", "redis-cli -h redis info >> /tmp/out.txt"})
file := ctrA.Directory("/tmp").File("/out.txt")
ctrB := dag.Container().
From("alpine").
WithFile("/out.txt", file)
return ctrB.WithExec([]string{"cat", "/out.txt"})
}
`,
}).
With(daggerCall("fn", "stdout")).
Sync(ctx)
require.NoError(t, err)
})
}

func daggerExec(args ...string) dagger.WithContainerFunc {
return func(c *dagger.Container) *dagger.Container {
return c.WithExec(append([]string{"dagger", "--debug"}, args...), dagger.ContainerWithExecOpts{
Expand Down
2 changes: 1 addition & 1 deletion core/interface.go
Expand Up @@ -222,7 +222,7 @@ func (iface *InterfaceType) Install(ctx context.Context, dag *dagql.Server) erro
})
}

res, err := callable.Call(ctx, dagql.CurrentID(ctx), &CallOpts{
res, err := callable.Call(ctx, &CallOpts{
Inputs: callInputs,
ParentVal: runtimeVal.Fields,
})
Expand Down