-
Notifications
You must be signed in to change notification settings - Fork 360
OPA: Fail fast on discovery or bundle download errors #3120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b25cae4
ed756e6
23df108
0b857aa
8b57401
b504ce0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"github.com/open-policy-agent/opa/config" | ||
"github.com/open-policy-agent/opa/logging" | ||
"github.com/open-policy-agent/opa/plugins" | ||
"github.com/open-policy-agent/opa/plugins/bundle" | ||
"github.com/open-policy-agent/opa/plugins/discovery" | ||
"github.com/open-policy-agent/opa/rego" | ||
"github.com/open-policy-agent/opa/runtime" | ||
|
@@ -50,6 +51,11 @@ const ( | |
DefaultRequestBodyBufferSize = 8 * 1024 // 8 KB | ||
|
||
spanNameEval = "open-policy-agent" | ||
|
||
GeneralPluginStatusStartupListener = "general-plugin-status-startup" | ||
DiscoveryPluginStartupListener = "skipper-instance-startup-discovery" | ||
PluginStatusStartupListener = "skipper-instance-startup-plugin" | ||
BundlePluginStartupListener = "skipper-instance-startup-bundle" | ||
) | ||
|
||
type OpenPolicyAgentRegistry struct { | ||
|
@@ -363,16 +369,18 @@ func (registry *OpenPolicyAgentRegistry) newOpenPolicyAgentInstance(bundleName s | |
} | ||
|
||
type OpenPolicyAgentInstance struct { | ||
manager *plugins.Manager | ||
instanceConfig OpenPolicyAgentInstanceConfig | ||
opaConfig *config.Config | ||
bundleName string | ||
preparedQuery *rego.PreparedEvalQuery | ||
preparedQueryDoOnce *sync.Once | ||
interQueryBuiltinCache iCache.InterQueryCache | ||
once sync.Once | ||
stopped bool | ||
registry *OpenPolicyAgentRegistry | ||
manager *plugins.Manager | ||
instanceConfig OpenPolicyAgentInstanceConfig | ||
opaConfig *config.Config | ||
bundleName string | ||
preparedQuery *rego.PreparedEvalQuery | ||
preparedQueryDoOnce *sync.Once | ||
interQueryBuiltinCache iCache.InterQueryCache | ||
once sync.Once | ||
stopped bool | ||
registry *OpenPolicyAgentRegistry | ||
registerBundleListenerOnce *sync.Once | ||
registerDiscoveryListenerOnce *sync.Once | ||
|
||
maxBodyBytes int64 | ||
bodyReadBufferSize int64 | ||
|
@@ -469,7 +477,9 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes [] | |
preparedQueryDoOnce: new(sync.Once), | ||
interQueryBuiltinCache: iCache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig()), | ||
|
||
idGenerator: uniqueIDGenerator, | ||
idGenerator: uniqueIDGenerator, | ||
registerDiscoveryListenerOnce: new(sync.Once), | ||
registerBundleListenerOnce: new(sync.Once), | ||
} | ||
|
||
manager.RegisterCompilerTrigger(opa.compilerUpdated) | ||
|
@@ -480,38 +490,77 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes [] | |
// Start asynchronously starts the policy engine's plugins that download | ||
// policies, report status, etc. | ||
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error { | ||
err := opa.manager.Start(ctx) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
discoveryPlugin := discovery.Lookup(opa.manager) | ||
|
||
// check readiness of all plugins | ||
pluginsReady := func() bool { | ||
for _, status := range opa.manager.PluginStatus() { | ||
if status != nil && status.State != plugins.StateOK { | ||
return false | ||
done := make(chan struct{}) | ||
failed := make(chan error, 1) | ||
|
||
opa.registerDiscoveryListenerOnce.Do(func() { | ||
|
||
discoveryPlugin.RegisterListener(DiscoveryPluginStartupListener, func(status bundle.Status) { | ||
handleStatusErrors(status, failed, "discovery plugin") | ||
}) | ||
//defer discoveryPlugin.Unregister(DiscoveryPluginStartupListener) //ToDo | ||
}) | ||
|
||
opa.manager.RegisterPluginStatusListener(PluginStatusStartupListener, func(status map[string]*plugins.Status) { | ||
if _, exists := status["bundle"]; exists { | ||
bundlePlugin := bundle.Lookup(opa.manager) | ||
if bundlePlugin != nil { | ||
opa.registerBundleListenerOnce.Do(func() { | ||
bundlePlugin.Register(BundlePluginStartupListener, func(status bundle.Status) { | ||
handleStatusErrors(status, failed, "bundle plugin") | ||
//defer bundlePlugin.Unregister(BundlePluginStartupListener) //ToDo | ||
}) | ||
}) | ||
} | ||
} | ||
return true | ||
} | ||
}) | ||
defer opa.manager.UnregisterPluginStatusListener(PluginStatusStartupListener) | ||
|
||
err = waitFunc(ctx, pluginsReady, 100*time.Millisecond) | ||
// Register listener for general plugin status checks | ||
opa.manager.RegisterPluginStatusListener(GeneralPluginStatusStartupListener, func(status map[string]*plugins.Status) { | ||
for _, pluginStatus := range status { | ||
if pluginStatus != nil && pluginStatus.State != plugins.StateOK { | ||
return | ||
} | ||
} | ||
close(done) | ||
}) | ||
defer opa.manager.UnregisterPluginStatusListener(GeneralPluginStatusStartupListener) | ||
|
||
err := opa.manager.Start(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
select { | ||
case <-ctx.Done(): | ||
timeoutErr := ctx.Err() | ||
|
||
for pluginName, status := range opa.manager.PluginStatus() { | ||
if status != nil && status.State != plugins.StateOK { | ||
opa.Logger().WithFields(map[string]interface{}{ | ||
"plugin_name": pluginName, | ||
"plugin_state": status.State, | ||
"error_message": status.Message, | ||
}).Error("Open policy agent plugin did not start in time") | ||
}).Error("Open policy agent plugin: %v did not start in time", pluginName) | ||
} | ||
} | ||
opa.Close(ctx) | ||
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err) | ||
if timeoutErr != nil { | ||
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, timeoutErr) | ||
} | ||
return fmt.Errorf("one or more open policy agent plugins failed to start in %v", timeout) | ||
|
||
case <-done: | ||
return nil | ||
case err := <-failed: | ||
opa.Close(ctx) | ||
|
||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { | ||
|
@@ -521,25 +570,6 @@ func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { | |
}) | ||
} | ||
|
||
func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error { | ||
if fun() { | ||
return nil | ||
} | ||
ticker := time.NewTicker(interval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return fmt.Errorf("timed out while starting: %w", ctx.Err()) | ||
case <-ticker.C: | ||
if fun() { | ||
return nil | ||
} | ||
} | ||
} | ||
} | ||
|
||
func configLabelsInfo(opaConfig config.Config) func(*plugins.Manager) { | ||
info := ast.NewObject() | ||
labels := ast.NewObject() | ||
|
@@ -796,3 +826,46 @@ func (l *QuietLogger) Error(fmt string, a ...interface{}) { | |
func (l *QuietLogger) Warn(fmt string, a ...interface{}) { | ||
l.target.Warn(fmt, a) | ||
} | ||
|
||
var temporaryClientErrorHTTPCodes = map[int64]struct{}{ | ||
429: {}, // too many requests | ||
408: {}, // request timeout | ||
} | ||
|
||
func isTemporaryError(code int64) bool { | ||
_, exists := temporaryClientErrorHTTPCodes[code] | ||
return exists | ||
} | ||
|
||
func handleStatusErrors( | ||
status bundle.Status, | ||
failed chan error, | ||
prefix string, | ||
) { | ||
if status.Code == "bundle_error" { | ||
Pushpalanka marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This still feels like we are only handling a subset of errors, is this really reliable? My understanding was that the Code only gives information about what went wrong and that length(status.Errors) essentially gives yoj if anything goes wrong… There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That indeed is more intuitive, but had below concerns. When looking at the OPA implementation below, I also understand if the Status Error code segment, /plugins/bundle/status.go#L64-L97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like we want to reach out to styra to make a proper error handling possible. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will raise this in the OPA community and get back. PS: open-policy-agent/opa#6983 raised with minimum changes that will help us here. Will take it forward based on review. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Went ahead with the |
||
if status.HTTPCode == "" { | ||
failed <- formatStatusError(prefix, status) | ||
return | ||
} | ||
code, err := status.HTTPCode.Int64() | ||
if err == nil { | ||
if code >= 400 && code < 500 && !isTemporaryError(code) { | ||
// Fail for error codes in the range 400-500 excluding temporary errors | ||
failed <- formatStatusError(prefix, status) | ||
return | ||
} else if code >= 500 { | ||
// Do not fail for 5xx errors and keep retrying | ||
return | ||
} | ||
} | ||
if err != nil { | ||
failed <- formatStatusError(prefix, status) | ||
return | ||
} | ||
} | ||
} | ||
|
||
func formatStatusError(prefix string, status bundle.Status) error { | ||
return fmt.Errorf("%s failed: Name: %s, Code: %s, Message: %s, HTTPCode: %s, Errors: %v", | ||
prefix, status.Name, status.Code, status.Message, status.HTTPCode, status.Errors) | ||
} |
Uh oh!
There was an error while loading. Please reload this page.