diff --git a/fw/thread.go b/fw/thread.go index d0bb4d61..9ee25b1a 100644 --- a/fw/thread.go +++ b/fw/thread.go @@ -83,7 +83,7 @@ func NewThread(id int) *Thread { t.threadID = id t.pendingInterests = make(chan *ndn.PendingPacket, fwQueueSize) t.pendingDatas = make(chan *ndn.PendingPacket, fwQueueSize) - t.pitCS = table.NewPitCS() + t.pitCS = table.NewPitCS(t.finalizeInterest) t.strategies = InstantiateStrategies(t) t.deadNonceList = table.NewDeadNonceList() t.shouldQuit = make(chan interface{}, 1) @@ -122,16 +122,17 @@ func (t *Thread) Run() { runtime.LockOSThread() } + pitUpdateTimer := t.pitCS.UpdateTimer() for !core.ShouldQuit { select { case pendingPacket := <-t.pendingInterests: t.processIncomingInterest(pendingPacket) case pendingPacket := <-t.pendingDatas: t.processIncomingData(pendingPacket) - case expiringPitEntry := <-t.pitCS.ExpiringPitEntries(): - t.finalizeInterest(expiringPitEntry) case <-t.deadNonceList.Ticker.C: t.deadNonceList.RemoveExpiredEntries() + case <-pitUpdateTimer: + t.pitCS.Update() case <-t.shouldQuit: continue } diff --git a/table/pit-cs-tree.go b/table/pit-cs-tree.go index f933ab55..fedf176d 100644 --- a/table/pit-cs-tree.go +++ b/table/pit-cs-tree.go @@ -3,7 +3,6 @@ package table import ( "bytes" "math/rand" - "sync" "time" "github.com/cespare/xxhash" @@ -14,6 +13,8 @@ import ( const expiredPitTickerInterval = 100 * time.Millisecond +type OnPitExpiration func(PitEntry) + // PitCsTree represents a PIT-CS implementation that uses a name tree type PitCsTree struct { basePitCsTable @@ -28,6 +29,8 @@ type PitCsTree struct { csMap map[uint64]*nameTreeCsEntry pitExpiryQueue priority_queue.Queue[*nameTreePitEntry, int64] + updateTimer chan struct{} + onExpiration OnPitExpiration } type nameTreePitEntry struct { @@ -47,9 +50,8 @@ type pitCsTreeNode struct { component ndn.NameComponent depth int - parent *pitCsTreeNode - children map[string]*pitCsTreeNode - childrenMutex sync.RWMutex + parent *pitCsTreeNode + children map[string]*pitCsTreeNode pitEntries []*nameTreePitEntry @@ -57,15 +59,16 @@ type pitCsTreeNode struct { } // NewPitCS creates a new combined PIT-CS for a forwarding thread. -func NewPitCS() *PitCsTree { +func NewPitCS(onExpiration OnPitExpiration) *PitCsTree { pitCs := new(PitCsTree) pitCs.root = new(pitCsTreeNode) pitCs.root.component = nil // Root component will be nil since it represents zero components pitCs.root.pitEntries = make([]*nameTreePitEntry, 0) pitCs.root.children = make(map[string]*pitCsTreeNode) - pitCs.expiringPitEntries = make(chan PitEntry, tableQueueSize) + pitCs.onExpiration = onExpiration pitCs.pitTokenMap = make(map[uint32]*nameTreePitEntry) pitCs.pitExpiryQueue = priority_queue.New[*nameTreePitEntry, int64]() + pitCs.updateTimer = make(chan struct{}) // This value has already been validated from loading the configuration, so we know it will be one of the following (or else fatal) switch csReplacementPolicy { @@ -76,31 +79,40 @@ func NewPitCS() *PitCsTree { } pitCs.csMap = make(map[uint64]*nameTreeCsEntry) - // Set up the expired PIT entries goroutine - go pitCs.expirationPitLoop() + // Schedule first signal + time.AfterFunc(expiredPitTickerInterval, func() { + pitCs.updateTimer <- struct{}{} + }) return pitCs } -func (p *PitCsTree) expirationPitLoop() { - for !core.ShouldQuit { +func (p *PitCsTree) UpdateTimer() <-chan struct{} { + return p.updateTimer +} + +func (p *PitCsTree) Update() { + for p.pitExpiryQueue.Len() > 0 && p.pitExpiryQueue.PeekPriority() <= time.Now().UnixNano() { + entry := p.pitExpiryQueue.Pop() + entry.queueIndex = -1 + p.onExpiration(entry) + p.RemoveInterest(entry) + } + if !core.ShouldQuit { + updateDuration := expiredPitTickerInterval if p.pitExpiryQueue.Len() > 0 { sleepTime := time.Duration(p.pitExpiryQueue.PeekPriority()-time.Now().UnixNano()) * time.Nanosecond if sleepTime > 0 { if sleepTime > expiredPitTickerInterval { sleepTime = expiredPitTickerInterval } - time.Sleep(sleepTime) + updateDuration = sleepTime } - } else { - time.Sleep(expiredPitTickerInterval) - } - for p.pitExpiryQueue.Len() > 0 && p.pitExpiryQueue.PeekPriority() <= time.Now().UnixNano() { - entry := p.pitExpiryQueue.Pop() - entry.queueIndex = -1 - p.expiringPitEntries <- entry - p.RemoveInterest(entry) } + // Schedule next signal + time.AfterFunc(updateDuration, func() { + p.updateTimer <- struct{}{} + }) } } @@ -279,9 +291,6 @@ func (e *nameTreePitEntry) GetOutRecords() []*PitOutRecord { func (p *pitCsTreeNode) findExactMatchEntry(name *ndn.Name) *pitCsTreeNode { if name.Size() > p.depth { - p.childrenMutex.RLock() - defer p.childrenMutex.RUnlock() - if child, ok := p.children[name.At(p.depth).String()]; ok { return child.findExactMatchEntry(name) } @@ -293,9 +302,6 @@ func (p *pitCsTreeNode) findExactMatchEntry(name *ndn.Name) *pitCsTreeNode { func (p *pitCsTreeNode) findLongestPrefixEntry(name *ndn.Name) *pitCsTreeNode { if name.Size() > p.depth { - p.childrenMutex.RLock() - defer p.childrenMutex.RUnlock() - if child, ok := p.children[name.At(p.depth).String()]; ok { return child.findLongestPrefixEntry(name) } @@ -312,26 +318,19 @@ func (p *pitCsTreeNode) fillTreeToPrefix(name *ndn.Name) *pitCsTreeNode { newNode.parent = curNode newNode.children = make(map[string]*pitCsTreeNode) - curNode.childrenMutex.Lock() curNode.children[newNode.component.String()] = newNode - curNode.childrenMutex.Unlock() - curNode = newNode } return curNode } func (p *pitCsTreeNode) getChildrenCount() int { - p.childrenMutex.RLock() - defer p.childrenMutex.RUnlock() return len(p.children) } func (p *pitCsTreeNode) pruneIfEmpty() { for curNode := p; curNode.parent != nil && curNode.getChildrenCount() == 0 && len(curNode.pitEntries) == 0 && curNode.csEntry == nil; curNode = curNode.parent { - curNode.parent.childrenMutex.Lock() delete(curNode.parent.children, curNode.component.String()) - curNode.parent.childrenMutex.Unlock() } } diff --git a/table/pit-cs-tree_test.go b/table/pit-cs-tree_test.go index 8b8c1a3d..1eb0eec3 100644 --- a/table/pit-cs-tree_test.go +++ b/table/pit-cs-tree_test.go @@ -12,7 +12,7 @@ import ( func TestNewPitCSTree(t *testing.T) { csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) // Initialization size should be 0 assert.Equal(t, pitCS.PitSize(), 0) @@ -49,11 +49,11 @@ func TestIsCsAdmitting(t *testing.T) { csAdmit = false csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) assert.Equal(t, pitCS.IsCsAdmitting(), csAdmit) csAdmit = true - pitCS = NewPitCS() + pitCS = NewPitCS(func(PitEntry) {}) assert.Equal(t, pitCS.IsCsAdmitting(), csAdmit) } @@ -61,11 +61,11 @@ func TestIsCsServing(t *testing.T) { csServe = false csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) assert.Equal(t, pitCS.IsCsServing(), csServe) csServe = true - pitCS = NewPitCS() + pitCS = NewPitCS(func(PitEntry) {}) assert.Equal(t, pitCS.IsCsServing(), csServe) } @@ -78,7 +78,7 @@ func TestInsertInterest(t *testing.T) { csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) pitEntry, duplicateNonce := pitCS.InsertInterest(interest, hint, inFace) @@ -161,7 +161,7 @@ func TestInsertInterest(t *testing.T) { assert.Equal(t, pitCS.PitSize(), 2) // PitCS with 2 interests, prefixes of each other. - pitCS = NewPitCS() + pitCS = NewPitCS(func(PitEntry) {}) hint, _ = ndn.NameFromString("/") inFace = uint64(4444) @@ -204,7 +204,7 @@ func TestInsertInterest(t *testing.T) { func TestRemoveInterest(t *testing.T) { csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) hint, _ := ndn.NameFromString("/") inFace := uint64(1111) name1, _ := ndn.NameFromString("/interest1") @@ -242,7 +242,7 @@ func TestRemoveInterest(t *testing.T) { assert.Equal(t, pitCS.PitSize(), 3) // Remove PIT entry from a node with more than 1 child - pitCS = NewPitCS() + pitCS = NewPitCS(func(PitEntry) {}) name1, _ = ndn.NameFromString("/root/1") name2, _ = ndn.NameFromString("/root/2") name3, _ := ndn.NameFromString("/root/3") @@ -261,7 +261,7 @@ func TestRemoveInterest(t *testing.T) { func TestFindInterestExactMatch(t *testing.T) { csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) hint, _ := ndn.NameFromString("/") inFace := uint64(1111) name, _ := ndn.NameFromString("/interest1") @@ -303,7 +303,7 @@ func TestFindInterestExactMatch(t *testing.T) { func TestFindInterestPrefixMatchByData(t *testing.T) { // Basically the same as FindInterestPrefixMatch, but with data instead csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) name, _ := ndn.NameFromString("/interest1") data := ndn.NewData(name, []byte("abc")) hint, _ := ndn.NameFromString("/") @@ -347,7 +347,7 @@ func TestFindInterestPrefixMatchByData(t *testing.T) { func TestInsertOutRecord(t *testing.T) { csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) name, _ := ndn.NameFromString("/interest1") hint, _ := ndn.NameFromString("/") inFace := uint64(1111) @@ -381,7 +381,7 @@ func TestInsertOutRecord(t *testing.T) { func TestGetOutRecords(t *testing.T) { csReplacementPolicy = "lru" - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) name, _ := ndn.NameFromString("/interest1") hint, _ := ndn.NameFromString("/") inFace := uint64(1111) @@ -431,7 +431,7 @@ func TestGetOutRecords(t *testing.T) { func TestInsertData(t *testing.T) { csReplacementPolicy = "lru" csCapacity = 1024 - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) // Data does not already exist name1, _ := ndn.NameFromString("/interest1") @@ -465,7 +465,7 @@ func TestInsertData(t *testing.T) { // PitCS with interest /a/b, data /a/b/v=10 // Interest /a/b is prefix allowed // Should return data with name /a/b/v=10 - pitCS = NewPitCS() + pitCS = NewPitCS(func(PitEntry) {}) name1, _ = ndn.NameFromString("/a/b") interest1 = ndn.NewInterest(name1) @@ -480,7 +480,7 @@ func TestInsertData(t *testing.T) { // Reduced CS capacity to check that eviction occurs csCapacity = 1 - pitCS = NewPitCS() + pitCS = NewPitCS(func(PitEntry) {}) data1 = ndn.NewData(name1, []byte("data1")) data2 = ndn.NewData(name2, []byte("data2")) pitCS.InsertData(data1) @@ -491,7 +491,7 @@ func TestInsertData(t *testing.T) { func FindMatchingDataFromCS(t *testing.T) { csReplacementPolicy = "lru" csCapacity = 1024 - pitCS := NewPitCS() + pitCS := NewPitCS(func(PitEntry) {}) // Insert data and then fetch it name1, _ := ndn.NameFromString("/interest1") @@ -522,7 +522,7 @@ func FindMatchingDataFromCS(t *testing.T) { // PitCS with interest /a/b, data /a/b/v=10 // Interest /a/b is prefix allowed // Should return data with name /a/b/v=10 - pitCS = NewPitCS() + pitCS = NewPitCS(func(PitEntry) {}) name1, _ = ndn.NameFromString("/a/b") interest1 = ndn.NewInterest(name1) @@ -537,7 +537,7 @@ func FindMatchingDataFromCS(t *testing.T) { // PitCS with interest /a/b // Now look for interest /a/b with prefix allowed // Should return nil since there is no data - pitCS = NewPitCS() + pitCS = NewPitCS(func(PitEntry) {}) name1, _ = ndn.NameFromString("/a/b") interest1 = ndn.NewInterest(name1) diff --git a/table/pit-cs.go b/table/pit-cs.go index 59266dda..3d67fc47 100644 --- a/table/pit-cs.go +++ b/table/pit-cs.go @@ -14,6 +14,7 @@ import ( ) // PitCsTable dictates what functionality a Pit-Cs table should implement +// Warning: All functions must be called in the same forwarding goroutine as the creation of the table. type PitCsTable interface { InsertInterest(interest *ndn.Interest, hint *ndn.Name, inFace uint64) (PitEntry, bool) RemoveInterest(pitEntry PitEntry) bool @@ -30,13 +31,16 @@ type PitCsTable interface { eraseCsDataFromReplacementStrategy(index uint64) updatePitExpiry(pitEntry PitEntry) - ExpiringPitEntries() chan PitEntry + // UpdateTimer returns the channel used to signal regular Update() calls in the forwarding thread. + // <- UpdateTimer() and Update() must be called in pairs. + UpdateTimer() <-chan struct{} + // Update() does whatever the PIT table needs to do regularly. + // It may schedule the next UpdateTimer(). + Update() } // basePitCsTable contains properties common to all PIT-CS tables -type basePitCsTable struct { - expiringPitEntries chan PitEntry -} +type basePitCsTable struct{} // PitEntry dictates what entries in a PIT-CS table should implement type PitEntry interface { @@ -230,9 +234,3 @@ func (bce *baseCsEntry) StaleTime() time.Time { func (bce *baseCsEntry) Data() *ndn.Data { return bce.data } - -// ExpiringPitEntries returns the channel to which PIT entries that are about -// to expire are sent and received. -func (p *basePitCsTable) ExpiringPitEntries() chan PitEntry { - return p.expiringPitEntries -}