Skip to content

Commit 50b620e

Browse files
Fix deadlock when bundle and decision logging enabled
This commit attempts to fix the deadlock that happens when bundle and decision logging are both enabled. The opa-envoy plugin creates a new transaction during query evaluation and closes it once eval is complete. Then when it attempts to log the decision, the decision log plugin grabs mask mutex and calls the PrepareForEval function in the rego package which tries to open a new read transaction on the store since the log plugin does not provide one. This call gets blocked if concurrently the bundle plugin has a write transaction open on the store. This write invokes the decision log plugin's callback and tries to grab the mask mutex. This call gets blocked because the decision log plugin is already holding onto it for the mask query. To avoid this, we keep the transaction open in the opa-envoy plugin till we log the decision. Fixes: open-policy-agent/opa#3722 Signed-off-by: Ashutosh Narkar <[email protected]>
1 parent 8256520 commit 50b620e

File tree

5 files changed

+257
-32
lines changed

5 files changed

+257
-32
lines changed

envoyauth/evaluation.go

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,47 +29,57 @@ type EvalContext interface {
2929

3030
//Eval - Evaluates an input against a provided EvalContext and yields result
3131
func Eval(ctx context.Context, evalContext EvalContext, input ast.Value, result *EvalResult, opts ...func(*rego.Rego)) error {
32+
var err error
3233

33-
err := storage.Txn(ctx, evalContext.Store(), storage.TransactionParams{}, func(txn storage.Transaction) error {
34-
err := getRevision(ctx, evalContext.Store(), txn, result)
34+
if result.Txn == nil {
35+
var txn storage.Transaction
36+
var txnClose TransactionCloser
37+
txn, txnClose, err = result.GetTxn(ctx, evalContext.Store())
3538
if err != nil {
39+
logrus.WithField("err", err).Error("Unable to start new storage transaction.")
3640
return err
3741
}
42+
defer txnClose(ctx, err)
43+
result.Txn = txn
44+
}
3845

39-
result.TxnID = txn.ID()
46+
err = getRevision(ctx, evalContext.Store(), result.Txn, result)
47+
if err != nil {
48+
return err
49+
}
4050

41-
logrus.WithFields(logrus.Fields{
42-
"input": input,
43-
"query": evalContext.ParsedQuery().String(),
44-
"txn": result.TxnID,
45-
}).Debug("Executing policy query.")
51+
result.TxnID = result.Txn.ID()
4652

47-
err = constructPreparedQuery(evalContext, txn, result.Metrics, opts)
48-
if err != nil {
49-
return err
50-
}
53+
logrus.WithFields(logrus.Fields{
54+
"input": input,
55+
"query": evalContext.ParsedQuery().String(),
56+
"txn": result.TxnID,
57+
}).Debug("Executing policy query.")
5158

52-
rs, err := evalContext.PreparedQuery().Eval(
53-
ctx,
54-
rego.EvalParsedInput(input),
55-
rego.EvalTransaction(txn),
56-
rego.EvalMetrics(result.Metrics),
57-
rego.EvalInterQueryBuiltinCache(evalContext.InterQueryBuiltinCache()),
58-
)
59+
err = constructPreparedQuery(evalContext, result.Txn, result.Metrics, opts)
60+
if err != nil {
61+
return err
62+
}
5963

60-
if err != nil {
61-
return err
62-
} else if len(rs) == 0 {
63-
return fmt.Errorf("undefined decision")
64-
} else if len(rs) > 1 {
65-
return fmt.Errorf("multiple evaluation results")
66-
}
64+
var rs rego.ResultSet
65+
rs, err = evalContext.PreparedQuery().Eval(
66+
ctx,
67+
rego.EvalParsedInput(input),
68+
rego.EvalTransaction(result.Txn),
69+
rego.EvalMetrics(result.Metrics),
70+
rego.EvalInterQueryBuiltinCache(evalContext.InterQueryBuiltinCache()),
71+
)
6772

68-
result.Decision = rs[0].Expressions[0].Value
69-
return nil
70-
})
73+
if err != nil {
74+
return err
75+
} else if len(rs) == 0 {
76+
return fmt.Errorf("undefined decision")
77+
} else if len(rs) > 1 {
78+
return fmt.Errorf("multiple evaluation results")
79+
}
7180

72-
return err
81+
result.Decision = rs[0].Expressions[0].Value
82+
return nil
7383
}
7484

7585
func constructPreparedQuery(evalContext EvalContext, txn storage.Transaction, m metrics.Metrics, opts []func(*rego.Rego)) error {

envoyauth/evaluation_test.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,19 @@ package envoyauth
33
import (
44
"context"
55
"reflect"
6+
"strings"
7+
"sync"
68
"testing"
79

10+
"github.com/open-policy-agent/opa/plugins/logs"
11+
12+
"github.com/open-policy-agent/opa/ast"
813
"github.com/open-policy-agent/opa/bundle"
14+
"github.com/open-policy-agent/opa/plugins"
15+
"github.com/open-policy-agent/opa/rego"
916
"github.com/open-policy-agent/opa/storage"
1017
"github.com/open-policy-agent/opa/storage/inmem"
18+
iCache "github.com/open-policy-agent/opa/topdown/cache"
1119
)
1220

1321
func TestGetRevisionLegacy(t *testing.T) {
@@ -102,3 +110,167 @@ func TestGetRevisionMulti(t *testing.T) {
102110
}
103111

104112
}
113+
114+
func TestEval(t *testing.T) {
115+
ctx := context.Background()
116+
server, err := testAuthzServer()
117+
if err != nil {
118+
t.Fatal(err)
119+
}
120+
121+
parsedBody := make(map[string]interface{})
122+
parsedBody["firstname"] = "foo"
123+
parsedBody["lastname"] = "bar"
124+
125+
input := make(map[string]interface{})
126+
input["parsed_body"] = parsedBody
127+
128+
inputValue, err := ast.InterfaceToValue(input)
129+
if err != nil {
130+
t.Fatal(err)
131+
}
132+
133+
err = Eval(ctx, server, inputValue, &EvalResult{})
134+
if err != nil {
135+
t.Fatal(err)
136+
}
137+
138+
// include transaction in the result object
139+
er := &EvalResult{}
140+
var txn storage.Transaction
141+
var txnClose TransactionCloser
142+
143+
txn, txnClose, err = er.GetTxn(ctx, server.Store())
144+
if err != nil {
145+
t.Fatal(err)
146+
}
147+
148+
defer txnClose(ctx, err)
149+
er.Txn = txn
150+
151+
err = Eval(ctx, server, inputValue, er)
152+
if err != nil {
153+
t.Fatal(err)
154+
}
155+
}
156+
157+
func testAuthzServer() (*mockExtAuthzGrpcServer, error) {
158+
159+
module := `
160+
package envoy.authz
161+
162+
default allow = false
163+
164+
allow {
165+
input.parsed_body.firstname == "foo"
166+
input.parsed_body.lastname == "bar"
167+
}`
168+
169+
ctx := context.Background()
170+
store := inmem.New()
171+
txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams)
172+
store.UpsertPolicy(ctx, txn, "example.rego", []byte(module))
173+
store.Commit(ctx, txn)
174+
175+
m, err := plugins.New([]byte{}, "test", store)
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
m.Register("test_plugin", &testPlugin{})
181+
config, err := logs.ParseConfig([]byte(`{"plugin": "test_plugin"}`), nil, []string{"test_plugin"})
182+
if err != nil {
183+
return nil, err
184+
}
185+
186+
plugin := logs.New(config, m)
187+
m.Register(logs.Name, plugin)
188+
189+
if err := m.Start(ctx); err != nil {
190+
return nil, err
191+
}
192+
193+
path := "envoy/authz/allow"
194+
query := "data." + strings.Replace(path, "/", ".", -1)
195+
parsedQuery, err := ast.ParseBody(query)
196+
if err != nil {
197+
return nil, err
198+
}
199+
200+
cfg := Config{
201+
Addr: ":0",
202+
Path: path,
203+
parsedQuery: parsedQuery,
204+
}
205+
206+
return &mockExtAuthzGrpcServer{
207+
cfg: cfg,
208+
manager: m,
209+
preparedQueryDoOnce: new(sync.Once),
210+
// interQueryBuiltinCache: iCache.NewInterQueryCache(m.InterQueryBuiltinCacheConfig()),
211+
}, nil
212+
}
213+
214+
type Config struct {
215+
Addr string `json:"addr"`
216+
Path string `json:"path"`
217+
parsedQuery ast.Body
218+
}
219+
220+
type mockExtAuthzGrpcServer struct {
221+
cfg Config
222+
manager *plugins.Manager
223+
preparedQuery *rego.PreparedEvalQuery
224+
preparedQueryDoOnce *sync.Once
225+
}
226+
227+
func (m *mockExtAuthzGrpcServer) ParsedQuery() ast.Body {
228+
return m.cfg.parsedQuery
229+
}
230+
231+
func (m *mockExtAuthzGrpcServer) Store() storage.Store {
232+
return m.manager.Store
233+
}
234+
235+
func (m *mockExtAuthzGrpcServer) Compiler() *ast.Compiler {
236+
return m.manager.GetCompiler()
237+
}
238+
239+
func (m *mockExtAuthzGrpcServer) Runtime() *ast.Term {
240+
return m.manager.Info
241+
}
242+
243+
func (m *mockExtAuthzGrpcServer) PreparedQueryDoOnce() *sync.Once {
244+
return m.preparedQueryDoOnce
245+
}
246+
247+
func (m *mockExtAuthzGrpcServer) InterQueryBuiltinCache() iCache.InterQueryCache {
248+
return nil
249+
}
250+
251+
func (m *mockExtAuthzGrpcServer) PreparedQuery() *rego.PreparedEvalQuery {
252+
return m.preparedQuery
253+
}
254+
255+
func (m *mockExtAuthzGrpcServer) SetPreparedQuery(pq *rego.PreparedEvalQuery) {
256+
m.preparedQuery = pq
257+
}
258+
259+
type testPlugin struct {
260+
events []logs.EventV1
261+
}
262+
263+
func (p *testPlugin) Start(context.Context) error {
264+
return nil
265+
}
266+
267+
func (p *testPlugin) Stop(context.Context) {
268+
}
269+
270+
func (p *testPlugin) Reconfigure(context.Context, interface{}) {
271+
}
272+
273+
func (p *testPlugin) Log(_ context.Context, event logs.EventV1) error {
274+
p.events = append(p.events, event)
275+
return nil
276+
}

envoyauth/response.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package envoyauth
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"net/http"
@@ -9,6 +10,7 @@ import (
910
ext_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
1011
"github.com/open-policy-agent/opa-envoy-plugin/internal/util"
1112
"github.com/open-policy-agent/opa/metrics"
13+
"github.com/open-policy-agent/opa/storage"
1214
)
1315

1416
// EvalResult - Captures the result from evaluating a query against an input
@@ -19,11 +21,15 @@ type EvalResult struct {
1921
TxnID uint64
2022
Decision interface{}
2123
Metrics metrics.Metrics
24+
Txn storage.Transaction
2225
}
2326

2427
// StopFunc should be called as soon as the evaluation is finished
2528
type StopFunc = func()
2629

30+
// TransactionCloser should be called to abort the transaction
31+
type TransactionCloser func(ctx context.Context, err error) error
32+
2733
// NewEvalResult creates a new EvalResult and a StopFunc that is used to stop the timer for metrics
2834
func NewEvalResult() (*EvalResult, StopFunc, error) {
2935
var err error
@@ -34,18 +40,41 @@ func NewEvalResult() (*EvalResult, StopFunc, error) {
3440
er.DecisionID, err = util.UUID4()
3541

3642
if err != nil {
37-
return nil, func() {}, err
43+
return nil, nil, err
3844
}
3945

4046
er.Metrics.Timer(metrics.ServerHandler).Start()
4147

4248
stop := func() {
43-
er.Metrics.Timer(metrics.ServerHandler).Stop()
49+
_ = er.Metrics.Timer(metrics.ServerHandler).Stop()
4450
}
4551

4652
return &er, stop, nil
4753
}
4854

55+
// GetTxn creates a read transaction suitable for the configured EvalResult object
56+
func (result *EvalResult) GetTxn(ctx context.Context, store storage.Store) (storage.Transaction, TransactionCloser, error) {
57+
params := storage.TransactionParams{}
58+
59+
noopCloser := func(ctx context.Context, err error) error {
60+
return nil // no-op default
61+
}
62+
63+
txn, err := store.NewTransaction(ctx, params)
64+
if err != nil {
65+
return nil, noopCloser, err
66+
}
67+
68+
// Setup a closer function that will abort the transaction.
69+
closer := func(ctx context.Context, txnErr error) error {
70+
store.Abort(ctx, txn)
71+
result.Txn = nil
72+
return nil
73+
}
74+
75+
return txn, closer, nil
76+
}
77+
4978
func (result *EvalResult) invalidDecisionErr() error {
5079
return fmt.Errorf("illegal value for policy evaluation result: %T", result.Decision)
5180
}

internal/internal.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,23 @@ func (p *envoyExtAuthzGrpcServer) Check(ctx context.Context, req *ext_authz_v3.C
266266

267267
func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*ext_authz_v3.CheckResponse, func() *rpc_status.Status, error) {
268268
var err error
269+
var evalErr error
269270
start := time.Now()
270271

271272
result, stopeval, err := envoyauth.NewEvalResult()
272273
if err != nil {
273274
logrus.WithField("err", err).Error("Unable to start new evaluation.")
274275
return nil, func() *rpc_status.Status { return nil }, err
275276
}
277+
278+
txn, txnClose, err := result.GetTxn(ctx, p.Store())
279+
if err != nil {
280+
logrus.WithField("err", err).Error("Unable to start new storage transaction.")
281+
return nil, func() *rpc_status.Status { return nil }, err
282+
}
283+
284+
result.Txn = txn
285+
276286
logEntry := logrus.WithField("decision-id", result.DecisionID)
277287

278288
var input map[string]interface{}
@@ -281,11 +291,13 @@ func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*
281291
stopeval()
282292
logErr := p.log(ctx, input, result, err)
283293
if logErr != nil {
294+
_ = txnClose(ctx, logErr) // Ignore error
284295
return &rpc_status.Status{
285296
Code: int32(code.Code_UNKNOWN),
286297
Message: logErr.Error(),
287298
}
288299
}
300+
_ = txnClose(ctx, evalErr) // Ignore error
289301
return nil
290302
}
291303

@@ -306,6 +318,7 @@ func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*
306318

307319
err = envoyauth.Eval(ctx, p, inputValue, result)
308320
if err != nil {
321+
evalErr = err
309322
return nil, stop, err
310323
}
311324

0 commit comments

Comments
 (0)