Skip to content

Commit

Permalink
Test Integration: Hot Reload spec.ignoreErrors
Browse files Browse the repository at this point in the history
Adds integration tests that assert that the `spec.ignoreErrors` field
works as expected when hot reloading a component, namely that daprd
exits error when `spec.ignoreErrors=false` and continues running
when `spec.ignoreErrors=true`.

No implementation changes.

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL committed Mar 6, 2024
1 parent b0e40d2 commit 637d323
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 4 deletions.
4 changes: 2 additions & 2 deletions pkg/runtime/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,10 @@ func (p *Processor) processComponents(ctx context.Context) error {
if err != nil {
err = fmt.Errorf("process component %s error: %s", comp.Name, err)
if !comp.Spec.IgnoreErrors {
log.Warnf("Error processing component, daprd will exit gracefully")
log.Warnf("Error processing component, daprd will exit gracefully: %s", err)

Check warning on line 336 in pkg/runtime/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/processor/processor.go#L336

Added line #L336 was not covered by tests
return err
}
log.Error(err)
log.Errorf("Ignoring error processing component: %s", err)

Check warning on line 339 in pkg/runtime/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/processor/processor.go#L339

Added line #L339 was not covered by tests
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions tests/integration/framework/process/logline/logline.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[strin
t.Helper()

if len(expLines) == 0 {
go io.Copy(io.Discard, reader)
l.done.Add(1)
return expLines
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func RunIntegrationTests(t *testing.T) {
t.Logf("setting up test case")
options := tcase.Setup(t)

ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
t.Cleanup(cancel)

framework.Run(t, ctx, options...)
Expand All @@ -74,5 +74,5 @@ func RunIntegrationTests(t *testing.T) {
})
}

t.Logf("Total integration test execution time: %s", time.Since(startTime).Truncate(time.Millisecond*100))
t.Logf("Total integration test execution time: %s", time.Since(startTime).Truncate(time.Millisecond))
}
141 changes: 141 additions & 0 deletions tests/integration/suite/daprd/hotreload/operator/ignoreerrors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operator

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

compapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
"github.com/dapr/dapr/pkg/operator/api"
operatorv1 "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/exec"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/operator"
"github.com/dapr/dapr/tests/integration/framework/process/logline"
"github.com/dapr/dapr/tests/integration/framework/process/sentry"
"github.com/dapr/dapr/tests/integration/framework/util"
"github.com/dapr/dapr/tests/integration/suite"
)

func init() {
suite.Register(new(ignoreerrors))
}

type ignoreerrors struct {
daprd *daprd.Daprd
operator *operator.Operator
}

func (i *ignoreerrors) Setup(t *testing.T) []framework.Option {
sentry := sentry.New(t)

i.operator = operator.New(t,
operator.WithSentry(sentry),
operator.WithGetConfigurationFn(func(context.Context, *operatorv1.GetConfigurationRequest) (*operatorv1.GetConfigurationResponse, error) {
return &operatorv1.GetConfigurationResponse{
Configuration: []byte(
`{"kind":"Configuration","apiVersion":"dapr.io/v1alpha1","metadata":{"name":"hotreloading"},"spec":{"features":[{"name":"HotReload","enabled":true}]}}`,
),
}, nil
}),
)

logline := logline.New(t,
logline.WithStdoutLineContains(
"Failed to init component a (state.sqlite/v1): [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): missing connection string",
"Ignoring error processing component: process component a error: [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): missing connection string",
"Error processing component, daprd will exit gracefully: process component a error: [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): missing connection string",
),
)

i.daprd = daprd.New(t,
daprd.WithMode("kubernetes"),
daprd.WithConfigs("hotreloading"),
daprd.WithExecOptions(exec.WithEnvVars(t, "DAPR_TRUST_ANCHORS", string(sentry.CABundle().TrustAnchors))),
daprd.WithSentryAddress(sentry.Address()),
daprd.WithControlPlaneAddress(i.operator.Address(t)),
daprd.WithDisableK8sSecretStore(true),
daprd.WithExecOptions(
exec.WithRunError(func(t *testing.T, err error) {
require.ErrorContains(t, err, "exit status 1")
}),
exec.WithExitCode(1),
exec.WithStdout(logline.Stdout()),
),
)

return []framework.Option{
framework.WithProcesses(sentry, i.operator, logline, i.daprd),
}
}

func (i *ignoreerrors) Run(t *testing.T, ctx context.Context) {
i.daprd.WaitUntilRunning(t, ctx)

client := util.HTTPClient(t)
assert.Empty(t, util.GetMetaComponents(t, ctx, client, i.daprd.HTTPPort()))

t.Run("adding a component should become available", func(t *testing.T) {
newComp := compapi.Component{
ObjectMeta: metav1.ObjectMeta{Name: "a"},
Spec: compapi.ComponentSpec{Type: "state.in-memory", Version: "v1"},
}
i.operator.SetComponents(newComp)
i.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &newComp, EventType: operatorv1.ResourceEventType_CREATED})

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Len(t, util.GetMetaComponents(t, ctx, client, i.daprd.HTTPPort()), 1)
}, time.Second*5, time.Millisecond*10)
})

t.Run("Updating a component with an error should be closed and ignored if `ignoreErrors=true`", func(t *testing.T) {
upComp := compapi.Component{
ObjectMeta: metav1.ObjectMeta{Name: "a"},
Spec: compapi.ComponentSpec{Type: "state.sqlite", Version: "v1", IgnoreErrors: true},
}
i.operator.SetComponents(upComp)
i.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &upComp, EventType: operatorv1.ResourceEventType_UPDATED})
require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Empty(t, util.GetMetaComponents(t, ctx, client, i.daprd.HTTPPort()))
}, time.Second*5, time.Millisecond*10)
})

t.Run("Updating a `ignoreErrors=true` component should make it available again", func(t *testing.T) {
newComp := compapi.Component{
ObjectMeta: metav1.ObjectMeta{Name: "a"},
Spec: compapi.ComponentSpec{Type: "state.in-memory", Version: "v1"},
}
i.operator.SetComponents(newComp)
i.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &newComp, EventType: operatorv1.ResourceEventType_UPDATED})
require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Len(t, util.GetMetaComponents(t, ctx, client, i.daprd.HTTPPort()), 1)
}, time.Second*5, time.Millisecond*10)
})

t.Run("Updating a component with an error should be closed and exit 1 if `ignoreErrors=false`", func(t *testing.T) {
upComp := compapi.Component{
ObjectMeta: metav1.ObjectMeta{Name: "a"},
Spec: compapi.ComponentSpec{Type: "state.sqlite", Version: "v1", IgnoreErrors: false},
}
i.operator.SetComponents(upComp)
i.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &upComp, EventType: operatorv1.ResourceEventType_UPDATED})
})
}
148 changes: 148 additions & 0 deletions tests/integration/suite/daprd/hotreload/selfhosted/ignoreerrors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
See the License for the specific language governing permissions and
limitations under the License.
*/

package selfhosted

import (
"context"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/exec"
"github.com/dapr/dapr/tests/integration/framework/process/logline"
"github.com/dapr/dapr/tests/integration/framework/util"
"github.com/dapr/dapr/tests/integration/suite"
)

func init() {
suite.Register(new(ignoreerrors))
}

type ignoreerrors struct {
daprd *daprd.Daprd
logline *logline.LogLine
resDir string
}

func (i *ignoreerrors) Setup(t *testing.T) []framework.Option {
i.logline = logline.New(t,
logline.WithStdoutLineContains(
"Failed to init component a (state.sqlite/v1): [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): missing connection string",
"Ignoring error processing component: process component a error: [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): missing connection string",
"Error processing component, daprd will exit gracefully: process component a error: [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): [INIT_COMPONENT_FAILURE]: initialization error occurred for a (state.sqlite/v1): missing connection string",
),
)

configFile := filepath.Join(t.TempDir(), "config.yaml")
require.NoError(t, os.WriteFile(configFile, []byte(`
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: hotreloading
spec:
features:
- name: HotReload
enabled: true
`), 0o600))

i.resDir = t.TempDir()

i.daprd = daprd.New(t,
daprd.WithConfigs(configFile),
daprd.WithResourcesDir(i.resDir),
daprd.WithExecOptions(
exec.WithRunError(func(t *testing.T, err error) {
require.ErrorContains(t, err, "exit status 1")
}),
exec.WithExitCode(1),
exec.WithStdout(i.logline.Stdout()),
),
)

return []framework.Option{
framework.WithProcesses(i.logline, i.daprd),
}
}

func (i *ignoreerrors) Run(t *testing.T, ctx context.Context) {
i.daprd.WaitUntilRunning(t, ctx)

client := util.HTTPClient(t)
assert.Empty(t, util.GetMetaComponents(t, ctx, client, i.daprd.HTTPPort()))

t.Run("adding components should become available", func(t *testing.T) {
require.NoError(t, os.WriteFile(filepath.Join(i.resDir, "1.yaml"), []byte(`
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: a
spec:
type: state.in-memory
version: v1
`), 0o600))
require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Len(t, util.GetMetaComponents(t, ctx, client, i.daprd.HTTPPort()), 1)
}, time.Second*5, time.Millisecond*10)
})

t.Run("Updating a component with an error should be closed and ignored if `ignoreErrors=true`", func(t *testing.T) {
require.NoError(t, os.WriteFile(filepath.Join(i.resDir, "1.yaml"), []byte(`
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: a
spec:
type: state.sqlite
version: v1
ignoreErrors: true
`), 0o600))
require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Empty(t, util.GetMetaComponents(t, ctx, client, i.daprd.HTTPPort()))
}, time.Second*5, time.Millisecond*10)
})

t.Run("Updating a `ignoreErrors=true` component should make it available again", func(t *testing.T) {
require.NoError(t, os.WriteFile(filepath.Join(i.resDir, "1.yaml"), []byte(`
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: a
spec:
type: state.in-memory
version: v1
`), 0o600))
require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Len(t, util.GetMetaComponents(t, ctx, client, i.daprd.HTTPPort()), 1)
}, time.Second*5, time.Millisecond*10)
})

t.Run("Updating a component with an error should be closed and exit 1 if `ignoreErrors=false`", func(t *testing.T) {
require.NoError(t, os.WriteFile(filepath.Join(i.resDir, "1.yaml"), []byte(`
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: a
spec:
type: state.sqlite
version: v1
`), 0o600))
i.logline.EventuallyFoundAll(t)
})
}

0 comments on commit 637d323

Please sign in to comment.