From 7f5cc483049195074e259e05739b78877c417e4b Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Wed, 6 Nov 2024 22:03:17 +0100 Subject: [PATCH] actually migrate code for new structure --- pkg/roles/dhcp/api_leases.go | 50 ++++++++-- pkg/roles/dhcp/api_leases_test.go | 6 +- pkg/roles/dhcp/ipam_internal.go | 10 +- pkg/roles/dhcp/ipam_internal_test.go | 2 +- pkg/roles/dhcp/leases.go | 58 +++--------- pkg/roles/dhcp/leases_watch.go | 84 ----------------- pkg/roles/dhcp/metrics.go | 16 +--- pkg/roles/dhcp/role.go | 12 --- pkg/roles/dhcp/scope_selector.go | 53 +++++++++++ pkg/roles/dhcp/scopes.go | 135 +++++++++++++++------------ pkg/roles/dhcp/scopes_watch.go | 12 ++- pkg/tests/utils.go | 23 ----- 12 files changed, 202 insertions(+), 259 deletions(-) delete mode 100644 pkg/roles/dhcp/leases_watch.go create mode 100644 pkg/roles/dhcp/scope_selector.go diff --git a/pkg/roles/dhcp/api_leases.go b/pkg/roles/dhcp/api_leases.go index 679dd2ec9..4cb676a86 100644 --- a/pkg/roles/dhcp/api_leases.go +++ b/pkg/roles/dhcp/api_leases.go @@ -47,11 +47,16 @@ func (r *Role) APILeasesGet() usecase.Interactor { r.log.Warn("failed to get scope", zap.Error(err)) return status.Wrap(errors.New("failed to get scope"), status.Internal) } + s, err := r.scopeFromKV(rawScope.Kvs[0]) + if err != nil { + r.log.Warn("failed to parse scope", zap.Error(err)) + return status.Wrap(err, status.Internal) + } leaseKey := r.i.KV().Key( types.KeyRole, types.KeyScopes, - input.ScopeName, + s.Name, ) if input.Identifier == "" { leaseKey = leaseKey.Prefix(true) @@ -63,7 +68,7 @@ func (r *Role) APILeasesGet() usecase.Interactor { return status.Wrap(err, status.Internal) } for _, lease := range rawLeases.Kvs { - l, err := r.leaseFromKV(lease) + l, err := s.leaseFromKV(lease) if err != nil { r.log.Warn("failed to parse lease", zap.Error(err)) continue @@ -127,7 +132,7 @@ func (r *Role) APILeasesPut() usecase.Interactor { return status.Wrap(errors.New("failed to construct scope"), status.Internal) } - l := r.NewLease(input.Identifier) + l := scope.NewLease(input.Identifier) l.Address = input.Address l.Hostname = input.Hostname l.AddressLeaseTime = input.AddressLeaseTime @@ -156,13 +161,40 @@ type APILeasesWOLInput struct { func (r *Role) APILeasesWOL() usecase.Interactor { u := usecase.NewInteractor(func(ctx context.Context, input APILeasesWOLInput, output *struct{}) error { - r.leasesM.RLock() - l, ok := r.leases[input.Identifier] - r.leasesM.RUnlock() - if !ok { - return status.InvalidArgument + rawScope, err := r.i.KV().Get( + ctx, + r.i.KV().Key( + types.KeyRole, + types.KeyScopes, + input.Scope, + ).String(), + ) + if err != nil || len(rawScope.Kvs) < 1 { + r.log.Warn("failed to get scope", zap.Error(err)) + return status.Wrap(errors.New("failed to get scope"), status.Internal) } - err := l.sendWOL() + scope, err := r.scopeFromKV(rawScope.Kvs[0]) + if err != nil { + r.log.Warn("failed to construct scope", zap.Error(err)) + return status.Wrap(errors.New("failed to construct scope"), status.Internal) + } + + leaseKey := r.i.KV().Key( + types.KeyRole, + types.KeyScopes, + scope.Name, + input.Identifier, + ) + rawLeases, err := r.i.KV().Get(ctx, leaseKey.String(), clientv3.WithPrefix()) + if err != nil || len(rawLeases.Kvs) < 1 { + return status.Wrap(err, status.InvalidArgument) + } + l, err := scope.leaseFromKV(rawLeases.Kvs[0]) + if err != nil { + return status.Wrap(err, status.Internal) + } + + err = l.sendWOL() if err != nil { return status.Wrap(err, status.Internal) } diff --git a/pkg/roles/dhcp/api_leases_test.go b/pkg/roles/dhcp/api_leases_test.go index 06d36742a..0eba9e75c 100644 --- a/pkg/roles/dhcp/api_leases_test.go +++ b/pkg/roles/dhcp/api_leases_test.go @@ -33,7 +33,7 @@ func TestAPILeasesGet(t *testing.T) { types.KeyScopes, scope.Name, ).String(), - tests.MustJSON(scope), + tests.MustJSON(&scope), )) lease := testLease() tests.PanicIfError(inst.KV().Put( @@ -70,7 +70,7 @@ func TestAPILeasesPut(t *testing.T) { types.KeyScopes, scope.Name, ).String(), - tests.MustJSON(scope), + tests.MustJSON(&scope), )) assert.NoError(t, role.APILeasesPut().Interact(ctx, dhcp.APILeasesPutInput{ Identifier: name, @@ -111,7 +111,7 @@ func TestAPILeasesDelete(t *testing.T) { types.KeyScopes, scope.Name, ).String(), - tests.MustJSON(scope), + tests.MustJSON(&scope), )) lease := testLease() tests.PanicIfError(inst.KV().Put( diff --git a/pkg/roles/dhcp/ipam_internal.go b/pkg/roles/dhcp/ipam_internal.go index bb612a976..8e313169f 100644 --- a/pkg/roles/dhcp/ipam_internal.go +++ b/pkg/roles/dhcp/ipam_internal.go @@ -147,13 +147,9 @@ func (i *InternalIPAM) IsIPFree(ip netip.Addr, identifier *string) bool { return false } // check for existing leases - i.role.leasesM.RLock() - defer i.role.leasesM.RUnlock() - for _, l := range i.role.leases { - // Ignore leases from other scopes - if l.ScopeKey != i.scope.Name { - continue - } + i.scope.leasesSync.RLock() + defer i.scope.leasesSync.RUnlock() + for _, l := range i.scope.leases { if l.Address != ip.String() { continue } diff --git a/pkg/roles/dhcp/ipam_internal_test.go b/pkg/roles/dhcp/ipam_internal_test.go index f916fc369..e1cb18576 100644 --- a/pkg/roles/dhcp/ipam_internal_test.go +++ b/pkg/roles/dhcp/ipam_internal_test.go @@ -58,7 +58,7 @@ func TestIPAMInternal_NextFreeAddress_UniqueParallel(t *testing.T) { types.KeyScopes, scope.Name, ).String(), - tests.MustJSON(scope), + tests.MustJSON(&scope), )) // Create fake leases to test against for i := 0; i < iter-10; i++ { diff --git a/pkg/roles/dhcp/leases.go b/pkg/roles/dhcp/leases.go index c6732332e..5e257e209 100644 --- a/pkg/roles/dhcp/leases.go +++ b/pkg/roles/dhcp/leases.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "fmt" "math" "net" "net/netip" @@ -42,47 +41,24 @@ type Lease struct { } func (r *Role) FindLease(req *Request4) *Lease { - r.leasesM.RLock() - defer r.leasesM.RUnlock() - lease, ok := r.leases[r.DeviceIdentifier(req.DHCPv4)] + expectedScope := r.findScopeForRequest(req) + expectedScope.leasesSync.RLock() + defer expectedScope.leasesSync.RUnlock() + lease, ok := expectedScope.leases[r.DeviceIdentifier(req.DHCPv4)] if !ok { return nil } - // Check if the leases's scope matches the expected scope to handle this request - expectedScope := r.findScopeForRequest(req, func(scope *Scope) int { - // Consider the existing lease for finding the scope - // Check how many bits of the leases address match the scope - sm := scope.match(net.ParseIP(lease.Address)) - // If the matching bits match how many bits are in the CIDR, and - // the scope of the lease matches the scope we're filtering, we've - // got a match - if sm == lease.scope.cidr.Bits() && lease.ScopeKey == scope.Name { - return 99 - } - return -1 - }) - if expectedScope != nil && lease.scope != expectedScope { - // We have a specific scope to handle this request but it doesn't match the lease - lease.scope = expectedScope - lease.ScopeKey = expectedScope.Name - lease.setLeaseIP(req) - lease.log.Info("Re-assigning address for lease due to changed request scope", zap.String("newIP", lease.Address)) - go func() { - err := lease.Put(req.Context, lease.scope.TTL) - if err != nil { - r.log.Warn("failed to update lease", zap.Error(err)) - } - }() - } return lease } -func (r *Role) NewLease(identifier string) *Lease { +func (s *Scope) NewLease(identifier string) *Lease { return &Lease{ - inst: r.i, + inst: s.inst, Identifier: identifier, - log: r.log.With(zap.String("identifier", identifier)), + log: s.log.With(zap.String("identifier", identifier)), Expiry: 0, + scope: s, + ScopeKey: s.Name, } } @@ -107,27 +83,21 @@ func (l *Lease) setLeaseIP(req *Request4) { l.scope.ipam.UseIP(*ip, l.Identifier) } -func (r *Role) leaseFromKV(raw *mvccpb.KeyValue) (*Lease, error) { - prefix := r.i.KV().Key( +func (s *Scope) leaseFromKV(raw *mvccpb.KeyValue) (*Lease, error) { + prefix := s.inst.KV().Key( types.KeyRole, types.KeyScopes, ).Prefix(true).String() keyParts := strings.SplitN(prefix, "/", 2) identifier := strings.TrimPrefix(string(raw.Key), prefix+"/"+keyParts[0]) - l := r.NewLease(identifier) + l := s.NewLease(identifier) err := json.Unmarshal(raw.Value, &l) if err != nil { return l, err } l.etcdKey = string(raw.Key) - - r.scopesM.RLock() - scope, ok := r.scopes[l.ScopeKey] - r.scopesM.RUnlock() - if !ok { - return l, fmt.Errorf("DHCP lease with invalid scope key: %s", l.ScopeKey) - } - l.scope = scope + l.scope = s + l.ScopeKey = s.Name return l, nil } diff --git a/pkg/roles/dhcp/leases_watch.go b/pkg/roles/dhcp/leases_watch.go deleted file mode 100644 index 5b591308c..000000000 --- a/pkg/roles/dhcp/leases_watch.go +++ /dev/null @@ -1,84 +0,0 @@ -package dhcp - -import ( - "context" - "errors" - "strings" - "time" - - "beryju.io/gravity/pkg/roles/dhcp/types" - "go.etcd.io/etcd/api/v3/mvccpb" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" -) - -func (r *Role) handleLeaseOp(ev *clientv3.Event) { - rec, err := r.leaseFromKV(ev.Kv) - if ev.Type == clientv3.EventTypeDelete { - r.leasesM.Lock() - defer r.leasesM.Unlock() - delete(r.leases, rec.Identifier) - } else { - // Check if the lease parsed above actually was parsed correctly, - // we don't care for that when removing, but prevent adding - // empty leases - if err != nil { - r.log.Warn("failed to parse lease", zap.Error(err)) - return - } - r.leasesM.Lock() - defer r.leasesM.Unlock() - r.leases[rec.Identifier] = rec - } -} - -func (r *Role) loadInitialLeases(ctx context.Context) { - prefix := r.i.KV().Key( - types.KeyRole, - types.KeyScopes, - ).Prefix(true).String() - leases, err := r.i.KV().Get( - ctx, - prefix, - clientv3.WithPrefix(), - ) - if err != nil { - r.log.Warn("failed to list initial leases", zap.Error(err)) - if !errors.Is(err, context.Canceled) { - time.Sleep(5 * time.Second) - r.loadInitialLeases(ctx) - } - return - } - for _, lease := range leases.Kvs { - relKey := strings.ReplaceAll(string(lease.Key), prefix, "") - if !strings.Contains("/", relKey) { - continue - } - r.handleLeaseOp(&clientv3.Event{ - Type: mvccpb.PUT, - Kv: lease, - }) - } -} - -func (r *Role) startWatchLeases() { - prefix := r.i.KV().Key( - types.KeyRole, - types.KeyScopes, - ).Prefix(true).String() - watchChan := r.i.KV().Watch( - r.ctx, - prefix, - clientv3.WithPrefix(), - ) - for watchResp := range watchChan { - for _, event := range watchResp.Events { - relKey := strings.ReplaceAll(string(event.Kv.Key), prefix, "") - if !strings.Contains("/", relKey) { - continue - } - r.handleLeaseOp(event) - } - } -} diff --git a/pkg/roles/dhcp/metrics.go b/pkg/roles/dhcp/metrics.go index b5cc41d7c..25dbb22b5 100644 --- a/pkg/roles/dhcp/metrics.go +++ b/pkg/roles/dhcp/metrics.go @@ -1,8 +1,6 @@ package dhcp import ( - "math/big" - "beryju.io/gravity/pkg/extconfig" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -36,14 +34,8 @@ var ( func (s *Scope) calculateUsage() { usable := s.ipam.UsableSize() dhcpScopeSize.WithLabelValues(s.Name).Set(float64(usable.Uint64())) - used := big.NewInt(0) - s.role.leasesM.RLock() - defer s.role.leasesM.RUnlock() - for _, lease := range s.role.leases { - if lease.ScopeKey != s.Name { - continue - } - used = used.Add(used, big.NewInt(1)) - } - dhcpScopeUsage.WithLabelValues(s.Name).Set(float64(used.Uint64())) + s.leasesSync.RLock() + defer s.leasesSync.RUnlock() + used := len(s.leases) + dhcpScopeUsage.WithLabelValues(s.Name).Set(float64(used)) } diff --git a/pkg/roles/dhcp/role.go b/pkg/roles/dhcp/role.go index 5c6ba40f6..1aae51945 100644 --- a/pkg/roles/dhcp/role.go +++ b/pkg/roles/dhcp/role.go @@ -26,7 +26,6 @@ type Role struct { ctx context.Context scopes map[string]*Scope - leases map[string]*Lease cfg *RoleConfig @@ -35,7 +34,6 @@ type Role struct { oui *oui.OuiDb scopesM sync.RWMutex - leasesM sync.RWMutex } func New(instance roles.Instance) *Role { @@ -44,8 +42,6 @@ func New(instance roles.Instance) *Role { i: instance, scopes: make(map[string]*Scope), scopesM: sync.RWMutex{}, - leases: make(map[string]*Lease), - leasesM: sync.RWMutex{}, ctx: instance.Context(), } r.s4 = &handler4{ @@ -78,16 +74,8 @@ func (r *Role) Start(ctx context.Context, config []byte) error { start := sentry.TransactionFromContext(ctx).StartChild("gravity.dhcp.start") defer start.Finish() r.loadInitialScopes(start.Context()) - r.loadInitialLeases(start.Context()) - - // Since scope usage relies on r.leases, but r.leases is loaded after the scopes, - // manually update the usage - for _, s := range r.scopes { - s.calculateUsage() - } go r.startWatchScopes() - go r.startWatchLeases() err := r.initServer4() if err != nil { diff --git a/pkg/roles/dhcp/scope_selector.go b/pkg/roles/dhcp/scope_selector.go new file mode 100644 index 000000000..367ceb7f7 --- /dev/null +++ b/pkg/roles/dhcp/scope_selector.go @@ -0,0 +1,53 @@ +package dhcp + +import ( + "net" + + "go.uber.org/zap" +) + +type scopeSelector func(scope *Scope) int + +func (r *Role) findScopeForRequest(req *Request4, additionalSelectors ...scopeSelector) *Scope { + var match *Scope + longestBits := 0 + r.scopesM.RLock() + defer r.scopesM.RUnlock() + // To prioritise requests from a DHCP relay being matched correctly, give their subnet + // match a 1 bit more priority + const dhcpRelayBias = 1 + for _, scope := range r.scopes { + // Check additional selectors (highest priority) + for _, sel := range additionalSelectors { + m := sel(scope) + if m > -1 && m > longestBits { + match = scope + longestBits = m + } + } + // Check based on gateway IP (next highest priority) + gatewayMatchBits := scope.match(req.GatewayIPAddr) + if gatewayMatchBits > -1 && gatewayMatchBits+dhcpRelayBias > longestBits { + req.log.Debug("selected scope based on cidr match (gateway IP)", zap.String("scope", scope.Name)) + match = scope + longestBits = gatewayMatchBits + dhcpRelayBias + } + // Handle local broadcast, check with the instance's listening IP + // Only consider local scopes if we don't have a match already + localMatchBits := scope.match(net.ParseIP(req.LocalIP())) + if localMatchBits > -1 && localMatchBits > longestBits { + req.log.Debug("selected scope based on cidr match (instance/interface IP)", zap.String("scope", scope.Name)) + match = scope + longestBits = localMatchBits + } + // Fallback to default scope if we don't already have a match + if match == nil && scope.Default { + req.log.Debug("selected scope based on default flag", zap.String("scope", scope.Name)) + match = scope + } + } + if match != nil { + req.log.Debug("final scope selection", zap.String("scope", match.Name)) + } + return match +} diff --git a/pkg/roles/dhcp/scopes.go b/pkg/roles/dhcp/scopes.go index 843894662..59d5b7bca 100644 --- a/pkg/roles/dhcp/scopes.go +++ b/pkg/roles/dhcp/scopes.go @@ -7,6 +7,7 @@ import ( "net" "net/netip" "strings" + "sync" "time" "beryju.io/gravity/pkg/roles" @@ -26,17 +27,19 @@ type ScopeDNS struct { type Scope struct { ipam IPAM inst roles.Instance - DNS *ScopeDNS `json:"dns"` - - IPAM map[string]string `json:"ipam"` role *Role log *zap.Logger cidr netip.Prefix Name string `json:"-"` - etcdKey string + etcdKey string + leases map[string]*Lease + leasesWatchCtx context.CancelFunc + leasesSync sync.RWMutex + DNS *ScopeDNS `json:"dns"` + IPAM map[string]string `json:"ipam"` SubnetCIDR string `json:"subnetCidr"` Options []*types.DHCPOption `json:"options"` TTL int64 `json:"ttl"` @@ -46,13 +49,15 @@ type Scope struct { func (r *Role) NewScope(name string) *Scope { return &Scope{ - Name: name, - inst: r.i, - role: r, - TTL: int64((7 * 24 * time.Hour).Seconds()), - log: r.log.With(zap.String("scope", name)), - DNS: &ScopeDNS{}, - IPAM: make(map[string]string), + Name: name, + inst: r.i, + role: r, + TTL: int64((7 * 24 * time.Hour).Seconds()), + log: r.log.With(zap.String("scope", name)), + DNS: &ScopeDNS{}, + IPAM: make(map[string]string), + leases: make(map[string]*Lease), + leasesSync: sync.RWMutex{}, } } @@ -96,52 +101,6 @@ func (s *Scope) ipamType(previous *Scope) (IPAM, error) { } } -type scopeSelector func(scope *Scope) int - -func (r *Role) findScopeForRequest(req *Request4, additionalSelectors ...scopeSelector) *Scope { - var match *Scope - longestBits := 0 - r.scopesM.RLock() - defer r.scopesM.RUnlock() - // To prioritise requests from a DHCP relay being matched correctly, give their subnet - // match a 1 bit more priority - const dhcpRelayBias = 1 - for _, scope := range r.scopes { - // Check additional selectors (highest priority) - for _, sel := range additionalSelectors { - m := sel(scope) - if m > -1 && m > longestBits { - match = scope - longestBits = m - } - } - // Check based on gateway IP (next highest priority) - gatewayMatchBits := scope.match(req.GatewayIPAddr) - if gatewayMatchBits > -1 && gatewayMatchBits+dhcpRelayBias > longestBits { - req.log.Debug("selected scope based on cidr match (gateway IP)", zap.String("scope", scope.Name)) - match = scope - longestBits = gatewayMatchBits + dhcpRelayBias - } - // Handle local broadcast, check with the instance's listening IP - // Only consider local scopes if we don't have a match already - localMatchBits := scope.match(net.ParseIP(req.LocalIP())) - if localMatchBits > -1 && localMatchBits > longestBits { - req.log.Debug("selected scope based on cidr match (instance/interface IP)", zap.String("scope", scope.Name)) - match = scope - longestBits = localMatchBits - } - // Fallback to default scope if we don't already have a match - if match == nil && scope.Default { - req.log.Debug("selected scope based on default flag", zap.String("scope", scope.Name)) - match = scope - } - } - if match != nil { - req.log.Debug("final scope selection", zap.String("scope", match.Name)) - } - return match -} - func (s *Scope) match(peer net.IP) int { ip, err := netip.ParseAddr(peer.String()) if err != nil { @@ -156,11 +115,8 @@ func (s *Scope) match(peer net.IP) int { func (s *Scope) createLeaseFor(req *Request4) *Lease { ident := s.role.DeviceIdentifier(req.DHCPv4) - lease := s.role.NewLease(ident) + lease := s.NewLease(ident) lease.Hostname = req.HostName() - - lease.scope = s - lease.ScopeKey = s.Name lease.setLeaseIP(req) req.log.Info("creating new DHCP lease", zap.String("ip", lease.Address), zap.String("identifier", ident)) return lease @@ -210,3 +166,60 @@ func (s *Scope) executeHook(method string, args ...interface{}) { }, }, args...) } + +func (s *Scope) watchScopeLeases(ctx context.Context) { + evtHandler := func(ev *clientv3.Event) { + lease, err := s.leaseFromKV(ev.Kv) + defer s.calculateUsage() + if ev.Type == clientv3.EventTypeDelete { + delete(s.leases, lease.Identifier) + } else { + // Check if the record parsed above actually was parsed correctly, + // we don't care for that when removing, but prevent adding + // empty leases + if err != nil { + return + } + s.leasesSync.Lock() + defer s.leasesSync.Unlock() + s.leases[lease.Identifier] = lease + } + } + ctx, canc := context.WithCancel(ctx) + s.leasesWatchCtx = canc + + prefix := s.inst.KV().Key(s.etcdKey).Prefix(true).String() + + leases, err := s.inst.KV().Get(ctx, prefix, clientv3.WithPrefix()) + if err != nil { + s.log.Warn("failed to list initial leases", zap.Error(err)) + time.Sleep(5 * time.Second) + s.watchScopeLeases(ctx) + return + } + for _, lease := range leases.Kvs { + evtHandler(&clientv3.Event{ + Type: mvccpb.PUT, + Kv: lease, + }) + } + + watchChan := s.inst.KV().Watch( + ctx, + prefix, + clientv3.WithPrefix(), + ) + go func() { + for watchResp := range watchChan { + for _, event := range watchResp.Events { + go evtHandler(event) + } + } + }() +} + +func (s *Scope) StopWatchingLeases() { + if s != nil && s.leasesWatchCtx != nil { + s.leasesWatchCtx() + } +} diff --git a/pkg/roles/dhcp/scopes_watch.go b/pkg/roles/dhcp/scopes_watch.go index 46c0ac184..417fd1cfc 100644 --- a/pkg/roles/dhcp/scopes_watch.go +++ b/pkg/roles/dhcp/scopes_watch.go @@ -12,7 +12,7 @@ import ( "go.uber.org/zap" ) -func (r *Role) handleScopeOp(t mvccpb.Event_EventType, kv *mvccpb.KeyValue) bool { +func (r *Role) handleScopeOp(t mvccpb.Event_EventType, kv *mvccpb.KeyValue, ctx context.Context) bool { prefix := r.i.KV().Key(types.KeyRole, types.KeyScopes).Prefix(true) relKey := strings.TrimPrefix(string(kv.Key), prefix.String()) // we only care about scope-level updates, everything underneath doesn't matter @@ -23,14 +23,20 @@ func (r *Role) handleScopeOp(t mvccpb.Event_EventType, kv *mvccpb.KeyValue) bool r.log.Debug("removed scope", zap.String("key", relKey)) r.scopesM.Lock() defer r.scopesM.Unlock() + sc := r.scopes[relKey] + sc.StopWatchingLeases() delete(r.scopes, relKey) } else if t == mvccpb.PUT { s, err := r.scopeFromKV(kv) if err != nil { r.log.Warn("failed to convert scope from event", zap.Error(err)) } else { + s.watchScopeLeases(ctx) s.calculateUsage() r.scopesM.Lock() + if oldScope, ok := r.scopes[s.Name]; ok { + oldScope.StopWatchingLeases() + } r.scopes[s.Name] = s r.scopesM.Unlock() r.log.Debug("added scope", zap.String("name", s.Name)) @@ -57,7 +63,7 @@ func (r *Role) loadInitialScopes(ctx context.Context) { return } for _, scope := range scopes.Kvs { - r.handleScopeOp(mvccpb.PUT, scope) + r.handleScopeOp(mvccpb.PUT, scope, ctx) } } @@ -69,7 +75,7 @@ func (r *Role) startWatchScopes() { ) for watchResp := range watchChan { for _, event := range watchResp.Events { - if r.handleScopeOp(event.Type, event.Kv) { + if r.handleScopeOp(event.Type, event.Kv, r.ctx) { r.log.Debug("scope watch update", zap.String("key", string(event.Kv.Key))) } } diff --git a/pkg/tests/utils.go b/pkg/tests/utils.go index 647281175..35546d543 100644 --- a/pkg/tests/utils.go +++ b/pkg/tests/utils.go @@ -4,12 +4,10 @@ import ( "context" "encoding/json" "fmt" - "net" "net/netip" "runtime" "strings" "testing" - "time" "beryju.io/gravity/pkg/extconfig" "beryju.io/gravity/pkg/storage" @@ -108,24 +106,3 @@ func Listen(port int32) string { } return extconfig.Get().Listen(port) } - -func WaitForPort(port int32) { - max := 30 - try := 0 - listen := Listen(port) - time.Sleep(500 * time.Millisecond) - for { - ln, err := net.Listen("tcp", listen) - if ln != nil { - _ = ln.Close() - } - if err != nil { - return - } - try += 1 - if try >= max { - panic(fmt.Errorf("failed to wait for port '%s' to be listening", listen)) - } - time.Sleep(1 * time.Millisecond) - } -}