Skip to content

Commit

Permalink
Expose metric boskos_acquire_duration_seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
hongkailiu committed Sep 30, 2020
1 parent c0841a5 commit ec851bc
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/bazel-*
.DS_Store
/.vscode/*
/.idea/*

_output/
/hack/tools/bin/
2 changes: 1 addition & 1 deletion cleaner/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func createFakeBoskos(objects ...runtime.Object) (*ranch.Storage, boskosClient,
}

func (fb *fakeBoskos) Acquire(rtype, state, dest string) (*common.Resource, error) {
crdRes, err := fb.ranch.Acquire(rtype, state, dest, testOwner, "")
crdRes, _, err := fb.ranch.Acquire(rtype, state, dest, testOwner, "")
if err != nil {
return nil, err
}
Expand Down
25 changes: 24 additions & 1 deletion handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"k8s.io/test-infra/prow/simplifypath"
"sigs.k8s.io/boskos/common"
Expand Down Expand Up @@ -90,6 +92,23 @@ func handleDefault(r *ranch.Ranch) http.HandlerFunc {
}
}

// acquireDurationSeconds is a histogram of the time, in seconds, that elapsed
// before a client's acquire request was served. Observations are partitioned
// by the "type", "state", "dest" and "has_request_id" labels.
var acquireDurationSeconds = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "boskos_acquire_duration_seconds",
		Help:    "Histogram of waiting in seconds for a Boskos' client to acquire a resource.",
		Buckets: []float64{1, 10, 100, 500, 1000, 1500, 2000},
	},
	[]string{"type", "state", "dest", "has_request_id"},
)

// init registers the acquireDurationSeconds histogram with the default
// Prometheus registerer. MustRegister panics if the registration fails
// (for example on a duplicate registration).
func init() {
	prometheus.MustRegister(acquireDurationSeconds)
}

// handleAcquire: Handler for /acquire
// Method: POST
// URLParams:
Expand Down Expand Up @@ -122,7 +141,7 @@ func handleAcquire(r *ranch.Ranch) http.HandlerFunc {

logrus.Infof("Request for a %v %v from %v, dest %v", state, rtype, owner, dest)

resource, err := r.Acquire(rtype, state, dest, owner, requestID)
resource, createdTime, err := r.Acquire(rtype, state, dest, owner, requestID)
if err != nil {
returnAndLogError(res, err, "Acquire failed")
return
Expand All @@ -141,6 +160,10 @@ func handleAcquire(r *ranch.Ranch) http.HandlerFunc {
}
logrus.Infof("Resource leased: %v", string(resJSON))
fmt.Fprint(res, string(resJSON))

latency := time.Since(createdTime)
labels := prometheus.Labels{"type": rtype, "state": state, "dest": dest, "has_request_id": strconv.FormatBool(requestID != "")}
acquireDurationSeconds.With(labels).Observe(latency.Seconds())
}
}

Expand Down
2 changes: 1 addition & 1 deletion mason/mason_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func createFakeBoskos(tc testConfig) (*ranch.Storage, *Client, chan releasedReso
}

func (fb *fakeBoskos) Acquire(rtype, state, dest string) (*common.Resource, error) {
crd, err := fb.ranch.Acquire(rtype, state, dest, owner, "")
crd, _, err := fb.ranch.Acquire(rtype, state, dest, owner, "")
if crd != nil {
return resourcePtr(crd.ToResource()), err
}
Expand Down
29 changes: 25 additions & 4 deletions ranch/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ranch

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -28,6 +29,8 @@ import (
// request is a single entry tracked by a request queue.
type request struct {
	// id is the request ID this entry was created for.
	id string
	// expiration is the request's expiry time.
	// NOTE(review): presumably the request is dropped once this time has
	// passed; the code that sets and checks it is outside this view — confirm.
	expiration time.Time
	// createdAt is the time the request was first added to the queue. It is
	// used to work out how long the requester waited before the resource
	// was acquired.
	createdAt time.Time
}

type requestNode struct {
Expand Down Expand Up @@ -91,13 +94,12 @@ func newRequestQueue() *requestQueue {

// update refreshes the expiration time if the request is already present;
// otherwise it appends the new requestID at the end of the queue (FIFO).
func (rq *requestQueue) update(requestID string, newExpiration time.Time) bool {

func (rq *requestQueue) update(requestID string, newExpiration, now time.Time) bool {
rq.lock.Lock()
defer rq.lock.Unlock()
req, exists := rq.requestMap[requestID]
if !exists {
req = request{id: requestID}
req = request{id: requestID, createdAt: now}
rq.requestList.Append(requestID)
logrus.Infof("request id %s added", requestID)
}
Expand Down Expand Up @@ -144,7 +146,7 @@ func (rq *requestQueue) getRank(requestID string, ttl time.Duration, now time.Ti
// not considering empty requestID as new
var new bool
if requestID != "" {
new = rq.update(requestID, now.Add(ttl))
new = rq.update(requestID, now.Add(ttl), now)
}
rank := 1
rq.lock.RLock()
Expand Down Expand Up @@ -244,6 +246,25 @@ func (rp *RequestManager) GetRank(key interface{}, id string) (int, bool) {
return rq.getRank(id, rp.ttl, rp.now())
}

// GetCreatedAt looks up the request identified by id in the queue stored under
// key and reports the creation time recorded for that request.
//
// It returns the zero time.Time and a non-nil error when no queue exists for
// key, or when that queue holds no request with the given id.
func (rp *RequestManager) GetCreatedAt(key interface{}, id string) (time.Time, error) {
	rp.lock.Lock()
	defer rp.lock.Unlock()

	queue := rp.requests[key]
	if queue == nil {
		// Not expected to happen in practice.
		return time.Time{}, fmt.Errorf("failed to get the created time because the request queue is nil for the key %v", key)
	}

	if entry, found := queue.requestMap[id]; found {
		return entry.createdAt, nil
	}
	// Not expected to happen in practice either.
	return time.Time{}, fmt.Errorf("failed to get the created time because the request does not exist for the id %s", id)
}

// Delete deletes a specific request such that it is not accounted in the next GetRank call.
// This is usually called when the request has been fulfilled.
func (rp *RequestManager) Delete(key interface{}, requestID string) {
Expand Down
52 changes: 31 additions & 21 deletions ranch/ranch.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ type acquireRequestPriorityKey struct {
// dest - destination state of the requested resource
// owner - requester of the resource
// requestID - request ID to get a priority in the queue
// Out: A valid Resource object on success, or
// Out: A valid Resource object and the time when the resource was originally requested on success, or
// ResourceNotFound error if target type resource does not exist in target state.
func (r *Ranch) Acquire(rType, state, dest, owner, requestID string) (*crds.ResourceObject, error) {
func (r *Ranch) Acquire(rType, state, dest, owner, requestID string) (*crds.ResourceObject, time.Time, error) {
logger := logrus.WithFields(logrus.Fields{
"type": rType,
"state": state,
Expand All @@ -124,6 +124,7 @@ func (r *Ranch) Acquire(rType, state, dest, owner, requestID string) (*crds.Reso
})

var returnRes *crds.ResourceObject
createdTime := r.now()
if err := retryOnConflict(retry.DefaultBackoff, func() error {
logger.Debug("Determining request priority...")
ts := acquireRequestPriorityKey{rType: rType, state: state}
Expand Down Expand Up @@ -165,6 +166,10 @@ func (r *Ranch) Acquire(rType, state, dest, owner, requestID string) (*crds.Reso
}
// Deleting this request since it has been fulfilled
if requestID != "" {
if createdTime, err = r.requestMgr.GetCreatedAt(ts, requestID); err != nil {
// It is chosen NOT to fail the function since the resource has been already updated to give ownership.
logger.WithError(err).Errorf("Error occurred when getting the created time")
}
logger.Debug("Cleaning up requests.")
r.requestMgr.Delete(ts, requestID)
}
Expand All @@ -173,23 +178,7 @@ func (r *Ranch) Acquire(rType, state, dest, owner, requestID string) (*crds.Reso
return nil
}

if new {
logger.Debug("Checking for associated dynamic resource type...")
lifeCycle, err := r.Storage.GetDynamicResourceLifeCycle(rType)
// Assuming error means no associated dynamic resource.
if err == nil {
if typeCount < lifeCycle.Spec.MaxCount {
logger.Debug("Adding new dynamic resources...")
res := newResourceFromNewDynamicResourceLifeCycle(r.Storage.generateName(), lifeCycle, r.now())
if err := r.Storage.AddResource(res); err != nil {
logger.WithError(err).Warningf("unable to add a new resource of type %s", rType)
}
logger.Infof("Added dynamic resource %s of type %s", res.Name, res.Spec.Type)
}
} else {
logrus.WithError(err).Debug("Failed listing DRLC")
}
}
addResource(new, logger, r, rType, typeCount)

if typeCount > 0 {
return &ResourceNotFound{rType}
Expand All @@ -204,10 +193,31 @@ func (r *Ranch) Acquire(rType, state, dest, owner, requestID string) (*crds.Reso
default:
logrus.WithError(err).Error("Acquire failed")
}
return nil, err
return nil, createdTime, err
}

return returnRes, nil
return returnRes, createdTime, nil
}

// addResource tries to create one additional dynamic resource of type rType.
//
// Parameters:
//   - new: nothing is done unless this is true.
//     NOTE(review): presumably true only the first time a request ID is seen — confirm against the caller.
//   - logger: contextual logger used for all messages emitted here.
//   - r: the Ranch whose Storage is queried and updated.
//   - rType: the resource type to look up a dynamic resource life cycle for.
//   - typeCount: the number of resources of rType counted by the caller; a new
//     resource is only added while this is below the life cycle's MaxCount.
//
// The function is best-effort: failures are logged and never returned.
func addResource(new bool, logger *logrus.Entry, r *Ranch, rType string, typeCount int) {
	if !new {
		return
	}

	logger.Debug("Checking for associated dynamic resource type...")
	lifeCycle, err := r.Storage.GetDynamicResourceLifeCycle(rType)
	if err != nil {
		// Assuming error means no associated dynamic resource.
		logger.WithError(err).Debug("Failed listing DRLC")
		return
	}

	if typeCount >= lifeCycle.Spec.MaxCount {
		// The type is already at its configured maximum.
		return
	}

	logger.Debug("Adding new dynamic resources...")
	res := newResourceFromNewDynamicResourceLifeCycle(r.Storage.generateName(), lifeCycle, r.now())
	if err := r.Storage.AddResource(res); err != nil {
		logger.WithError(err).Warningf("unable to add a new resource of type %s", rType)
		// Do not report success below when the resource could not be stored.
		return
	}
	logger.Infof("Added dynamic resource %s of type %s", res.Name, res.Spec.Type)
}

// AcquireByState checks out resources of a given type without an owner,
Expand Down
46 changes: 31 additions & 15 deletions ranch/ranch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ func TestAcquire(t *testing.T) {

for _, tc := range testcases {
c := makeTestRanch(tc.resources)
res, err := c.Acquire(tc.rtype, tc.state, tc.dest, tc.owner, "")
now := time.Now()
c.now = func() time.Time {
return now
}
res, createdTime, err := c.Acquire(tc.rtype, tc.state, tc.dest, tc.owner, "")
if !AreErrorsEqual(err, tc.expectErr) {
t.Errorf("%s - Got error %v, expected error %v", tc.name, err, tc.expectErr)
continue
Expand All @@ -210,7 +214,9 @@ func TestAcquire(t *testing.T) {
t.Errorf("failed to get resources")
continue
}

if !now.Equal(createdTime) {
t.Errorf("expected createdAt %s, got %s", now, createdTime)
}
if err == nil {
if res.Status.State != tc.dest {
t.Errorf("%s - Wrong final state. Got %v, expected %v", tc.name, res.Status.State, tc.dest)
Expand Down Expand Up @@ -239,38 +245,48 @@ func TestAcquirePriority(t *testing.T) {
r.requestMgr.now = func() time.Time { return now }

// Setting Priority, this request will fail
if _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "request_id_1"); err == nil {
if _, _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "request_id_1"); err == nil {
t.Errorf("should fail as there are not resource available")
}
if err := r.Storage.AddResource(res); err != nil {
t.Fatalf("failed to add resource: %v", err)
}
// Attempting to acquire this resource without priority
if _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, ""); err == nil {
if _, _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, ""); err == nil {
t.Errorf("should fail as there is only resource, and it is prioritizes to request_id_1")
}
// Attempting to acquire this resource with priority, which will set a place in the queue
if _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "request_id_2"); err == nil {
if _, _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "request_id_2"); err == nil {
t.Errorf("should fail as there is only resource, and it is prioritizes to request_id_1")
}
// Attempting with the first request
if _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "request_id_1"); err != nil {
_, createdTime, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "request_id_1")
if err != nil {
t.Fatalf("should succeed since the request priority should match its rank in the queue. got %v", err)
}
if !now.Equal(createdTime) {
t.Errorf("expected createdAt %s, got %s", now, createdTime)
}
r.Release(res.Name, common.Free, "tester")
// Attempting with the first request
if _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "request_id_1"); err == nil {
if _, _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "request_id_1"); err == nil {
t.Errorf("should not succeed since this request has already been fulfilled")
}
// Attempting to acquire this resource without priority
if _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, ""); err == nil {
if _, _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, ""); err == nil {
t.Errorf("should fail as request_id_2 has rank 1 now")
}
r.requestMgr.cleanup(expiredFuture)
now2 := time.Now()
r.now = func() time.Time { return now2 }
// Attempting to acquire this resource without priority
if _, err := r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, ""); err != nil {
_, createdTime, err = r.Acquire(res.Spec.Type, res.Status.State, common.Dirty, owner, "")
if err != nil {
t.Errorf("request_id_2 expired, this should work now, got %v", err)
}
if !now2.Equal(createdTime) {
t.Errorf("expected createdAt %s, got %s", now, createdTime)
}
}

func TestAcquireRoundRobin(t *testing.T) {
Expand All @@ -283,7 +299,7 @@ func TestAcquireRoundRobin(t *testing.T) {

c := makeTestRanch(resources)
for i := 0; i < 4; i++ {
res, err := c.Acquire("t", "s", "d", "foo", "")
res, _, err := c.Acquire("t", "s", "d", "foo", "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -318,7 +334,7 @@ func TestAcquireOnDemand(t *testing.T) {
c := makeTestRanch(dRLCs)
c.now = func() time.Time { return now }
// First acquire should trigger a creation
if _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID1); err == nil {
if _, _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID1); err == nil {
t.Errorf("should fail since there is not resource yet")
}
if resources, err := c.Storage.GetResources(); err != nil {
Expand All @@ -327,7 +343,7 @@ func TestAcquireOnDemand(t *testing.T) {
t.Fatal("A resource should have been created")
}
// Attempting to create another resource
if _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID1); err == nil {
if _, _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID1); err == nil {
t.Errorf("should succeed since the created is dirty")
}
if resources, err := c.Storage.GetResources(); err != nil {
Expand All @@ -336,7 +352,7 @@ func TestAcquireOnDemand(t *testing.T) {
t.Errorf("No new resource should have been created")
}
// Creating another
if _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID2); err == nil {
if _, _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID2); err == nil {
t.Errorf("should succeed since the created is dirty")
}
if resources, err := c.Storage.GetResources(); err != nil {
Expand All @@ -345,7 +361,7 @@ func TestAcquireOnDemand(t *testing.T) {
t.Errorf("Another resource should have been created")
}
// Attempting to create another
if _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID3); err == nil {
if _, _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID3); err == nil {
t.Errorf("should fail since there is not resource yet")
}
resources, err := c.Storage.GetResources()
Expand All @@ -357,7 +373,7 @@ func TestAcquireOnDemand(t *testing.T) {
for _, res := range resources.Items {
c.Storage.DeleteResource(res.Name)
}
if _, err := c.Acquire(rType, common.Free, common.Busy, owner, ""); err == nil {
if _, _, err := c.Acquire(rType, common.Free, common.Busy, owner, ""); err == nil {
t.Errorf("should fail since there is not resource yet")
}
if resources, err := c.Storage.GetResources(); err != nil {
Expand Down

0 comments on commit ec851bc

Please sign in to comment.