engine: nested exec simplifications and service fixes (dagger#7213)
* engine: consolidate IDs and re-use servers for nested execs

This is an internal-only refactor, though it fixes a few bugs while also
simplifying quite a bit and setting us up for more simplifications soon.

The biggest change is that nested execs connect back to the same server
as the main client caller rather than being completely independent.
* This is required for the fix to services used in modules (separate PR)
  to fully work
* It should also fix the lack of docker auth in many of our integ tests,
  specifically those that use nested execs, which leads to dockerhub rate
  limiting
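The "connect back to the same server" mechanism above boils down to env-var plumbing: the shim checks for a nested client ID and, if present, connects back using the server ID it was handed. A minimal sketch of that discovery step (the env var names match the diff below; the `nestedClientParams` type and `paramsFromEnv` helper are illustrative, not Dagger's actual code):

```go
package main

import (
	"fmt"
	"os"
)

// nestedClientParams is a hypothetical stand-in for the connection
// parameters a nested exec needs to rejoin its caller's server.
type nestedClientParams struct {
	ClientID string
	ServerID string
}

// paramsFromEnv reports whether this process is a nested exec: if
// _DAGGER_NESTED_CLIENT_ID is unset, it runs as a normal command;
// otherwise it connects back as that client, on the given server.
func paramsFromEnv(getenv func(string) string) (nestedClientParams, bool) {
	clientID := getenv("_DAGGER_NESTED_CLIENT_ID")
	if clientID == "" {
		return nestedClientParams{}, false // not nested; run as normal
	}
	return nestedClientParams{
		ClientID: clientID,
		ServerID: getenv("_DAGGER_SERVER_ID"),
	}, true
}

func main() {
	os.Setenv("_DAGGER_NESTED_CLIENT_ID", "abc123")
	os.Setenv("_DAGGER_SERVER_ID", "srv1")
	p, ok := paramsFromEnv(os.Getenv)
	fmt.Println(ok, p.ClientID, p.ServerID) // prints: true abc123 srv1
}
```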

Along the way it also consolidates IDs, removing ModuleCallerDigest and
exclusively using ClientID. This requires that we tell module functions
and other nested execs which ID to use, but that itself sets up even
more simplifications in follow-ups (we can remove the need for the
current DaggerServer construct entirely, among other things).
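As described later in this commit, client IDs become a content hash of the function call/nested exec definition, so the same definition always maps to the same ID. A minimal sketch of that idea, assuming a simplified call struct (`FunctionCallMeta` and `clientIDFor` are illustrative names, not Dagger's actual internals):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// FunctionCallMeta is a hypothetical stand-in for the function call /
// nested exec definition the client ID is derived from.
type FunctionCallMeta struct {
	Module string
	Name   string
	Cache  bool
}

// clientIDFor derives a stable, content-addressed client ID: the same
// call definition always yields the same ID, which is why a single
// client ID can connect and disconnect multiple times per server.
func clientIDFor(call FunctionCallMeta) (string, error) {
	raw, err := json.Marshal(call)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(raw)
	return hex.EncodeToString(sum[:8]), nil
}

func main() {
	a, _ := clientIDFor(FunctionCallMeta{Module: "test", Name: "fnA", Cache: true})
	b, _ := clientIDFor(FunctionCallMeta{Module: "test", Name: "fnA", Cache: true})
	c, _ := clientIDFor(FunctionCallMeta{Module: "test", Name: "fnB", Cache: true})
	fmt.Println(a == b, a == c) // prints: true false
}
```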

Signed-off-by: Erik Sipsma <[email protected]>

* namespace services by server, not by client

Previously it was possible to start a dependent service in one module
API call and then use it again in a later call, only to have it fail
because the service address could not be resolved, even though the
service was still running. This happened because each invocation has its
own client ID, and client IDs were used to build service addresses.

This change brings service addresses into alignment with the recent
change to key them by service ID instead of client ID. The overall
effect is that services are deduplicated within a Dagger invocation,
even across module calls. With this change, the service just stays
running and is re-used by a later call, thanks to the grace period.
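In other words, a service hostname is derived from the service's own content digest, namespaced under the server (session) rather than under the per-call client. A minimal sketch of that addressing scheme, with an invented `serviceHost` helper and an invented `.dagger.local` suffix standing in for the real domain scheme:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// serviceHost builds a hostname from the service's content digest and
// the server ID. Because the client ID is not part of the address, two
// module calls in the same session resolve the same running service.
func serviceHost(serviceDigest, serverID string) string {
	sum := sha256.Sum256([]byte(serviceDigest))
	return fmt.Sprintf("%s.%s.dagger.local", hex.EncodeToString(sum[:4]), serverID)
}

func main() {
	// Same service, same server: both calls get the same address.
	fmt.Println(serviceHost("svcA", "server1") == serviceHost("svcA", "server1"))
	// Same service, different server (session): separate namespaces.
	fmt.Println(serviceHost("svcA", "server1") == serviceHost("svcA", "server2"))
}
```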

Signed-off-by: Alex Suraci <[email protected]>

* fix routing of host services to correct client

Signed-off-by: Erik Sipsma <[email protected]>

* deregister secret tokens once client disconnects

We previously never explicitly removed client ID -> secret token
mappings, because doing so theoretically opened more possibilities for
malicious attempts to register a client ID with a different token.

However, we need to deregister these now, since client IDs are a content
hash of the function call/nested exec definition, which means the same
client ID can connect and disconnect multiple times per server.

The security implications of this end up being extremely minimal.
Registering a client ID with a different secret token was, and still is,
possible *before* a client fully connects. It is now also possible
*after* a client disconnects, but this would only amount to a DoS, since
the "real" client would just be unable to connect. No information would
be leaked. It would also have to happen within the same server (i.e.
from a module or nested exec called by the main client, directly or
transitively).
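The token lifecycle described above can be sketched as a small registry: a client ID can be registered with a token, re-registration with a *different* token is rejected while the mapping exists, and disconnect removes the mapping so the same content-hashed ID can come back later. This is an illustrative model (`tokenRegistry` and its methods are invented names), not the engine's actual implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// tokenRegistry maps client IDs to secret tokens for the lifetime of a
// connection.
type tokenRegistry struct {
	mu     sync.Mutex
	tokens map[string]string
}

func newTokenRegistry() *tokenRegistry {
	return &tokenRegistry{tokens: map[string]string{}}
}

// register installs the client ID -> token mapping, rejecting attempts
// to rebind an existing ID to a different token.
func (r *tokenRegistry) register(clientID, token string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cur, ok := r.tokens[clientID]; ok && cur != token {
		return errors.New("client ID already registered with a different token")
	}
	r.tokens[clientID] = token
	return nil
}

// deregister drops the mapping on disconnect, allowing the same
// content-hashed client ID to reconnect (possibly with a new token).
func (r *tokenRegistry) deregister(clientID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.tokens, clientID)
}

func main() {
	reg := newTokenRegistry()
	fmt.Println(reg.register("client-a", "tok1"))        // prints: <nil>
	fmt.Println(reg.register("client-a", "tok2") != nil) // prints: true (conflict rejected)
	reg.deregister("client-a")
	fmt.Println(reg.register("client-a", "tok2")) // prints: <nil> (re-register after disconnect)
}
```

The worst case in this model matches the commit message: after a disconnect, another party in the same server could grab the ID first, denying the real client a reconnect, but never learning its secret.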

This issue can also be squashed by not leaking the buildkit sock to
nested execs/modules, which is possible now by migrating functionality
from our shim to our custom executor. There are no immediate plans to do
this, but the possibility is open whenever needed (or when we make that
change for other reasons).

Signed-off-by: Erik Sipsma <[email protected]>

* add integ test coverage

Signed-off-by: Erik Sipsma <[email protected]>

---------

Signed-off-by: Erik Sipsma <[email protected]>
Signed-off-by: Alex Suraci <[email protected]>
Signed-off-by: Erik Sipsma <[email protected]>
Co-authored-by: Alex Suraci <[email protected]>
2 people authored and vikram-dagger committed May 3, 2024
1 parent 5e47bd9 commit 7255176
Showing 31 changed files with 510 additions and 367 deletions.
41 changes: 16 additions & 25 deletions cmd/shim/main.go
@@ -24,7 +24,6 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/containerd/console"
"github.com/google/uuid"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/runtime-spec/specs-go"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
@@ -501,11 +500,8 @@ func setupBundle() (returnExitCode int) {
}

var searchDomains []string
for _, parentClientID := range execMetadata.ParentClientIDs {
searchDomains = append(searchDomains, network.ClientDomain(parentClientID))
}
if len(searchDomains) > 0 {
spec.Process.Env = append(spec.Process.Env, "_DAGGER_PARENT_CLIENT_IDS="+strings.Join(execMetadata.ParentClientIDs, " "))
if ns := execMetadata.ServerID; ns != "" {
searchDomains = append(searchDomains, network.ClientDomain(ns))
}

var hostsFilePath string
@@ -543,7 +539,7 @@ func setupBundle() (returnExitCode int) {
keepEnv := []string{}
for _, env := range spec.Process.Env {
switch {
case strings.HasPrefix(env, "_DAGGER_ENABLE_NESTING="):
case strings.HasPrefix(env, "_DAGGER_NESTED_CLIENT_ID="):
// keep the env var; we use it at runtime
keepEnv = append(keepEnv, env)

@@ -562,6 +558,9 @@
Source: "/run/buildkit/buildkitd.sock",
})
case strings.HasPrefix(env, "_DAGGER_SERVER_ID="):
case strings.HasPrefix(env, "_DAGGER_ENGINE_VERSION="):
// don't need this at runtime, it is just for invalidating cache, which
// has already happened by now
case strings.HasPrefix(env, aliasPrefix):
// NB: don't keep this env var, it's only for the bundling step
// keepEnv = append(keepEnv, env)
@@ -715,7 +714,8 @@ func internalEnv(name string) (string, bool) {
}

func runWithNesting(ctx context.Context, cmd *exec.Cmd) error {
if _, found := internalEnv("_DAGGER_ENABLE_NESTING"); !found {
clientID, ok := internalEnv("_DAGGER_NESTED_CLIENT_ID")
if !ok {
// no nesting; run as normal
return execProcess(cmd, true)
}
@@ -733,25 +733,16 @@ func runWithNesting(ctx context.Context, cmd *exec.Cmd) error {
}
sessionPort := l.Addr().(*net.TCPAddr).Port

parentClientIDsVal, _ := internalEnv("_DAGGER_PARENT_CLIENT_IDS")

clientParams := client.Params{
SecretToken: sessionToken.String(),
RunnerHost: "unix:///.runner.sock",
ParentClientIDs: strings.Fields(parentClientIDsVal),
}

if _, ok := internalEnv("_DAGGER_ENABLE_NESTING_IN_SAME_SESSION"); ok {
serverID, ok := internalEnv("_DAGGER_SERVER_ID")
if !ok {
return fmt.Errorf("missing _DAGGER_SERVER_ID")
}
clientParams.ServerID = serverID
serverID, ok := internalEnv("_DAGGER_SERVER_ID")
if !ok {
return errors.New("missing nested client server ID")
}

moduleCallerDigest, ok := internalEnv("_DAGGER_MODULE_CALLER_DIGEST")
if ok {
clientParams.ModuleCallerDigest = digest.Digest(moduleCallerDigest)
clientParams := client.Params{
ID: clientID,
ServerID: serverID,
SecretToken: sessionToken.String(),
RunnerHost: "unix:///.runner.sock",
}

sess, ctx, err := client.Connect(ctx, clientParams)
2 changes: 2 additions & 0 deletions core/c2h.go
@@ -20,6 +20,7 @@ type c2hTunnel struct {
upstreamHost string
tunnelServiceHost string
tunnelServicePorts []PortForward
sessionID string
}

func (d *c2hTunnel) Tunnel(ctx context.Context) (rerr error) {
@@ -56,6 +57,7 @@ func (d *c2hTunnel) Tunnel(ctx context.Context) (rerr error) {
upstream := NewHostIPSocket(
port.Protocol.Network(),
fmt.Sprintf("%s:%d", d.upstreamHost, port.Backend),
d.sessionID,
)

sockPath := fmt.Sprintf("/upstream.%d.sock", frontend)
52 changes: 22 additions & 30 deletions core/container.go
@@ -1007,16 +1007,22 @@ func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts

// this allows executed containers to communicate back to this API
if opts.ExperimentalPrivilegedNesting {
// include the engine version so that these execs get invalidated if the engine/API change
runOpts = append(runOpts, llb.AddEnv("_DAGGER_ENABLE_NESTING", engine.Version))
}

if opts.ModuleCallerDigest != "" {
runOpts = append(runOpts, llb.AddEnv("_DAGGER_MODULE_CALLER_DIGEST", opts.ModuleCallerDigest.String()))
}

if opts.NestedInSameSession {
runOpts = append(runOpts, llb.AddEnv("_DAGGER_ENABLE_NESTING_IN_SAME_SESSION", ""))
callerOpts := opts.NestedExecFunctionCall
if callerOpts == nil {
// default to caching the nested exec
callerOpts = &FunctionCall{
Cache: true,
}
}
clientID, err := container.Query.RegisterCaller(ctx, callerOpts)
if err != nil {
return nil, fmt.Errorf("register caller: %w", err)
}
runOpts = append(runOpts,
llb.AddEnv("_DAGGER_NESTED_CLIENT_ID", clientID),
// include the engine version so that these execs get invalidated if the engine/API change
llb.AddEnv("_DAGGER_ENGINE_VERSION", engine.Version),
)
}

metaSt, metaSourcePath := metaMount(opts.Stdin)
@@ -1057,13 +1063,7 @@
}

// don't pass these through to the container when manually set, they are internal only
if name == "_DAGGER_ENABLE_NESTING" && !opts.ExperimentalPrivilegedNesting {
continue
}
if name == "_DAGGER_MODULE_CALLER_DIGEST" && opts.ModuleCallerDigest == "" {
continue
}
if name == "_DAGGER_ENABLE_NESTING_IN_SAME_SESSION" && !opts.NestedInSameSession {
if name == "_DAGGER_NESTED_CLIENT_ID" && !opts.ExperimentalPrivilegedNesting {
continue
}

@@ -1188,8 +1188,7 @@ func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts
return nil, err
}
execMeta := buildkit.ContainerExecUncachedMetadata{
ParentClientIDs: clientMetadata.ClientIDs(),
ServerID: clientMetadata.ServerID,
ServerID: clientMetadata.ServerID,
}
proxyVal, err := execMeta.ToPBFtpProxyVal()
if err != nil {
@@ -1784,22 +1783,15 @@ type ContainerExecOpts struct {
// Redirect the command's standard error to a file in the container
RedirectStderr string `default:""`

// Provide dagger access to the executed command
// Do not use this option unless you trust the command being executed.
// The command being executed WILL BE GRANTED FULL ACCESS TO YOUR HOST FILESYSTEM
// Provide the executed command access back to the Dagger API
ExperimentalPrivilegedNesting bool `default:"false"`

// Grant the process all root capabilities
InsecureRootCapabilities bool `default:"false"`

// (Internal-only) If this exec is for a module function, this digest will be set in the
// grpc context metadata for any api requests back to the engine. It's used by the API
// server to determine which schema to serve and other module context metadata.
ModuleCallerDigest digest.Digest `name:"-"`

// (Internal-only) Used for module function execs to trigger the nested api client to
// be connected back to the same session.
NestedInSameSession bool `name:"-"`
// (Internal-only) If this is a nested exec for a Function call, this should be set
// with the metadata for that call
NestedExecFunctionCall *FunctionCall `name:"-"`
}

type BuildArg struct {
38 changes: 14 additions & 24 deletions core/git.go
@@ -60,14 +60,20 @@ func (*GitRef) TypeDescription() string {

func (ref *GitRef) Tree(ctx context.Context) (*Directory, error) {
bk := ref.Query.Buildkit
st := ref.getState(ctx, bk)
return NewDirectorySt(ctx, ref.Query, *st, "", ref.Repo.Platform, ref.Repo.Services)
st, err := ref.getState(ctx, bk)
if err != nil {
return nil, err
}
return NewDirectorySt(ctx, ref.Query, st, "", ref.Repo.Platform, ref.Repo.Services)
}

func (ref *GitRef) Commit(ctx context.Context) (string, error) {
bk := ref.Query.Buildkit
st := ref.getState(ctx, bk)
p, err := resolveProvenance(ctx, bk, *st)
st, err := ref.getState(ctx, bk)
if err != nil {
return "", err
}
p, err := resolveProvenance(ctx, bk, st)
if err != nil {
return "", err
}
@@ -77,7 +83,7 @@ func (ref *GitRef) Commit(ctx context.Context) (string, error) {
return p.Sources.Git[0].Commit, nil
}

func (ref *GitRef) getState(ctx context.Context, bk *buildkit.Client) *llb.State {
func (ref *GitRef) getState(ctx context.Context, bk *buildkit.Client) (llb.State, error) {
opts := []llb.GitOption{}

if ref.Repo.KeepGitDir {
@@ -96,26 +102,10 @@ func (ref *GitRef) getState(ctx context.Context, bk *buildkit.Client) *llb.State
opts = append(opts, llb.AuthHeaderSecret(ref.Repo.AuthHeader.Accessor))
}

useDNS := len(ref.Repo.Services) > 0

clientMetadata, err := engine.ClientMetadataFromContext(ctx)
if err == nil && !useDNS {
useDNS = len(clientMetadata.ParentClientIDs) > 0
if err != nil {
return llb.State{}, err
}

var st llb.State
if useDNS {
// NB: only configure search domains if we're directly using a service, or
// if we're nested beneath another search domain.
//
// we have to be a bit selective here to avoid breaking Dockerfile builds
// that use a Buildkit frontend (# syntax = ...) that doesn't have the
// networks API cap.
//
// TODO: add API cap
st = gitdns.Git(ref.Repo.URL, ref.Ref, clientMetadata.ClientIDs(), opts...)
} else {
st = llb.Git(ref.Repo.URL, ref.Ref, opts...)
}
return &st
return gitdns.Git(ref.Repo.URL, ref.Ref, clientMetadata.ServerID, opts...), nil
}
112 changes: 112 additions & 0 deletions core/integration/module_test.go
@@ -5628,6 +5628,118 @@ func TestModuleUnicodePath(t *testing.T) {
require.JSONEq(t, `{"test":{"hello":"hello"}}`, out)
}

func TestModuleStartServices(t *testing.T) {
t.Parallel()

// regression test for https://github.com/dagger/dagger/pull/6914
t.Run("use service in multiple functions", func(t *testing.T) {
t.Parallel()
c, ctx := connect(t)

out, err := c.Container().From(golangImage).
WithMountedFile(testCLIBinPath, daggerCliFile(t, c)).
WithWorkdir("/work").
With(daggerExec("init", "--source=.", "--name=test", "--sdk=go")).
WithNewFile("/work/main.go", dagger.ContainerWithNewFileOpts{
Contents: `package main
import (
"context"
"fmt"
)
type Test struct {
}
func (m *Test) FnA(ctx context.Context) (*Sub, error) {
svc := dag.Container().
From("python").
WithMountedDirectory(
"/srv/www",
dag.Directory().WithNewFile("index.html", "hey there"),
).
WithWorkdir("/srv/www").
WithExposedPort(23457).
WithExec([]string{"python", "-m", "http.server", "23457"}).
AsService()
ctr := dag.Container().
From("alpine:3.18.6").
WithServiceBinding("svc", svc).
WithExec([]string{"wget", "-O", "-", "http://svc:23457"})
out, err := ctr.Stdout(ctx)
if err != nil {
return nil, err
}
if out != "hey there" {
return nil, fmt.Errorf("unexpected output: %q", out)
}
return &Sub{Ctr: ctr}, nil
}
type Sub struct {
Ctr *Container
}
func (m *Sub) FnB(ctx context.Context) (string, error) {
return m.Ctr.
WithExec([]string{"wget", "-O", "-", "http://svc:23457"}).
Stdout(ctx)
}
`,
}).
With(daggerCall("fnA", "fnB")).
Stdout(ctx)
require.NoError(t, err)
require.Equal(t, "hey there", strings.TrimSpace(out))
})

// regression test for https://github.com/dagger/dagger/issues/6951
t.Run("service in multiple containers", func(t *testing.T) {
t.Parallel()
c, ctx := connect(t)

_, err := c.Container().From(golangImage).
WithMountedFile(testCLIBinPath, daggerCliFile(t, c)).
WithWorkdir("/work").
With(daggerExec("init", "--source=.", "--name=test", "--sdk=go")).
WithNewFile("/work/main.go", dagger.ContainerWithNewFileOpts{
Contents: `package main
import (
"context"
)
type Test struct {
}
func (m *Test) Fn(ctx context.Context) *Container {
redis := dag.Container().
From("redis").
WithExposedPort(6379).
AsService()
cli := dag.Container().
From("redis").
WithoutEntrypoint().
WithServiceBinding("redis", redis)
ctrA := cli.WithExec([]string{"sh", "-c", "redis-cli -h redis info >> /tmp/out.txt"})
file := ctrA.Directory("/tmp").File("/out.txt")
ctrB := dag.Container().
From("alpine").
WithFile("/out.txt", file)
return ctrB.WithExec([]string{"cat", "/out.txt"})
}
`,
}).
With(daggerCall("fn", "stdout")).
Sync(ctx)
require.NoError(t, err)
})
}

func daggerExec(args ...string) dagger.WithContainerFunc {
return func(c *dagger.Container) *dagger.Container {
return c.WithExec(append([]string{"dagger", "--debug"}, args...), dagger.ContainerWithExecOpts{
2 changes: 1 addition & 1 deletion core/interface.go
@@ -222,7 +222,7 @@ func (iface *InterfaceType) Install(ctx context.Context, dag *dagql.Server) erro
})
}

res, err := callable.Call(ctx, dagql.CurrentID(ctx), &CallOpts{
res, err := callable.Call(ctx, &CallOpts{
Inputs: callInputs,
ParentVal: runtimeVal.Fields,
})
