Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pit-cs: Remove extra goroutine and Mutex #40

Merged
merged 2 commits into from
May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions fw/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewThread(id int) *Thread {
t.threadID = id
t.pendingInterests = make(chan *ndn.PendingPacket, fwQueueSize)
t.pendingDatas = make(chan *ndn.PendingPacket, fwQueueSize)
t.pitCS = table.NewPitCS()
t.pitCS = table.NewPitCS(t.finalizeInterest)
t.strategies = InstantiateStrategies(t)
t.deadNonceList = table.NewDeadNonceList()
t.shouldQuit = make(chan interface{}, 1)
Expand Down Expand Up @@ -122,16 +122,17 @@ func (t *Thread) Run() {
runtime.LockOSThread()
}

pitUpdateTimer := t.pitCS.UpdateTimer()
for !core.ShouldQuit {
select {
case pendingPacket := <-t.pendingInterests:
t.processIncomingInterest(pendingPacket)
case pendingPacket := <-t.pendingDatas:
t.processIncomingData(pendingPacket)
case expiringPitEntry := <-t.pitCS.ExpiringPitEntries():
t.finalizeInterest(expiringPitEntry)
case <-t.deadNonceList.Ticker.C:
t.deadNonceList.RemoveExpiredEntries()
case <-pitUpdateTimer:
t.pitCS.Update()
case <-t.shouldQuit:
continue
}
Expand Down
63 changes: 31 additions & 32 deletions table/pit-cs-tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package table
import (
"bytes"
"math/rand"
"sync"
"time"

"github.com/cespare/xxhash"
Expand All @@ -14,6 +13,8 @@ import (

const expiredPitTickerInterval = 100 * time.Millisecond

type OnPitExpiration func(PitEntry)

// PitCsTree represents a PIT-CS implementation that uses a name tree
type PitCsTree struct {
basePitCsTable
Expand All @@ -28,6 +29,8 @@ type PitCsTree struct {
csMap map[uint64]*nameTreeCsEntry

pitExpiryQueue priority_queue.Queue[*nameTreePitEntry, int64]
updateTimer chan struct{}
onExpiration OnPitExpiration
}

type nameTreePitEntry struct {
Expand All @@ -47,25 +50,25 @@ type pitCsTreeNode struct {
component ndn.NameComponent
depth int

parent *pitCsTreeNode
children map[string]*pitCsTreeNode
childrenMutex sync.RWMutex
parent *pitCsTreeNode
children map[string]*pitCsTreeNode

pitEntries []*nameTreePitEntry

csEntry *nameTreeCsEntry
}

// NewPitCS creates a new combined PIT-CS for a forwarding thread.
func NewPitCS() *PitCsTree {
func NewPitCS(onExpiration OnPitExpiration) *PitCsTree {
pitCs := new(PitCsTree)
pitCs.root = new(pitCsTreeNode)
pitCs.root.component = nil // Root component will be nil since it represents zero components
pitCs.root.pitEntries = make([]*nameTreePitEntry, 0)
pitCs.root.children = make(map[string]*pitCsTreeNode)
pitCs.expiringPitEntries = make(chan PitEntry, tableQueueSize)
pitCs.onExpiration = onExpiration
pitCs.pitTokenMap = make(map[uint32]*nameTreePitEntry)
pitCs.pitExpiryQueue = priority_queue.New[*nameTreePitEntry, int64]()
pitCs.updateTimer = make(chan struct{})

// This value has already been validated from loading the configuration, so we know it will be one of the following (or else fatal)
switch csReplacementPolicy {
Expand All @@ -76,31 +79,40 @@ func NewPitCS() *PitCsTree {
}
pitCs.csMap = make(map[uint64]*nameTreeCsEntry)

// Set up the expired PIT entries goroutine
go pitCs.expirationPitLoop()
// Schedule first signal
time.AfterFunc(expiredPitTickerInterval, func() {
pitCs.updateTimer <- struct{}{}
})

return pitCs
}

func (p *PitCsTree) expirationPitLoop() {
for !core.ShouldQuit {
func (p *PitCsTree) UpdateTimer() <-chan struct{} {
return p.updateTimer
}

func (p *PitCsTree) Update() {
for p.pitExpiryQueue.Len() > 0 && p.pitExpiryQueue.PeekPriority() <= time.Now().UnixNano() {
entry := p.pitExpiryQueue.Pop()
entry.queueIndex = -1
p.onExpiration(entry)
p.RemoveInterest(entry)
}
if !core.ShouldQuit {
updateDuration := expiredPitTickerInterval
if p.pitExpiryQueue.Len() > 0 {
sleepTime := time.Duration(p.pitExpiryQueue.PeekPriority()-time.Now().UnixNano()) * time.Nanosecond
if sleepTime > 0 {
if sleepTime > expiredPitTickerInterval {
sleepTime = expiredPitTickerInterval
}
time.Sleep(sleepTime)
updateDuration = sleepTime
}
} else {
time.Sleep(expiredPitTickerInterval)
}
for p.pitExpiryQueue.Len() > 0 && p.pitExpiryQueue.PeekPriority() <= time.Now().UnixNano() {
entry := p.pitExpiryQueue.Pop()
entry.queueIndex = -1
p.expiringPitEntries <- entry
p.RemoveInterest(entry)
}
// Schedule next signal
time.AfterFunc(updateDuration, func() {
p.updateTimer <- struct{}{}
})
}
}

Expand Down Expand Up @@ -279,9 +291,6 @@ func (e *nameTreePitEntry) GetOutRecords() []*PitOutRecord {

func (p *pitCsTreeNode) findExactMatchEntry(name *ndn.Name) *pitCsTreeNode {
if name.Size() > p.depth {
p.childrenMutex.RLock()
defer p.childrenMutex.RUnlock()

if child, ok := p.children[name.At(p.depth).String()]; ok {
return child.findExactMatchEntry(name)
}
Expand All @@ -293,9 +302,6 @@ func (p *pitCsTreeNode) findExactMatchEntry(name *ndn.Name) *pitCsTreeNode {

func (p *pitCsTreeNode) findLongestPrefixEntry(name *ndn.Name) *pitCsTreeNode {
if name.Size() > p.depth {
p.childrenMutex.RLock()
defer p.childrenMutex.RUnlock()

if child, ok := p.children[name.At(p.depth).String()]; ok {
return child.findLongestPrefixEntry(name)
}
Expand All @@ -312,26 +318,19 @@ func (p *pitCsTreeNode) fillTreeToPrefix(name *ndn.Name) *pitCsTreeNode {
newNode.parent = curNode
newNode.children = make(map[string]*pitCsTreeNode)

curNode.childrenMutex.Lock()
curNode.children[newNode.component.String()] = newNode
curNode.childrenMutex.Unlock()

curNode = newNode
}
return curNode
}

func (p *pitCsTreeNode) getChildrenCount() int {
p.childrenMutex.RLock()
defer p.childrenMutex.RUnlock()
return len(p.children)
}

func (p *pitCsTreeNode) pruneIfEmpty() {
for curNode := p; curNode.parent != nil && curNode.getChildrenCount() == 0 && len(curNode.pitEntries) == 0 && curNode.csEntry == nil; curNode = curNode.parent {
curNode.parent.childrenMutex.Lock()
delete(curNode.parent.children, curNode.component.String())
curNode.parent.childrenMutex.Unlock()
}
}

Expand Down
38 changes: 19 additions & 19 deletions table/pit-cs-tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func TestNewPitCSTree(t *testing.T) {
csReplacementPolicy = "lru"
pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})

// Initialization size should be 0
assert.Equal(t, pitCS.PitSize(), 0)
Expand Down Expand Up @@ -49,23 +49,23 @@ func TestIsCsAdmitting(t *testing.T) {
csAdmit = false
csReplacementPolicy = "lru"

pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})
assert.Equal(t, pitCS.IsCsAdmitting(), csAdmit)

csAdmit = true
pitCS = NewPitCS()
pitCS = NewPitCS(func(PitEntry) {})
assert.Equal(t, pitCS.IsCsAdmitting(), csAdmit)
}

func TestIsCsServing(t *testing.T) {
csServe = false
csReplacementPolicy = "lru"

pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})
assert.Equal(t, pitCS.IsCsServing(), csServe)

csServe = true
pitCS = NewPitCS()
pitCS = NewPitCS(func(PitEntry) {})
assert.Equal(t, pitCS.IsCsServing(), csServe)
}

Expand All @@ -78,7 +78,7 @@ func TestInsertInterest(t *testing.T) {

csReplacementPolicy = "lru"

pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})

pitEntry, duplicateNonce := pitCS.InsertInterest(interest, hint, inFace)

Expand Down Expand Up @@ -161,7 +161,7 @@ func TestInsertInterest(t *testing.T) {
assert.Equal(t, pitCS.PitSize(), 2)

// PitCS with 2 interests, prefixes of each other.
pitCS = NewPitCS()
pitCS = NewPitCS(func(PitEntry) {})

hint, _ = ndn.NameFromString("/")
inFace = uint64(4444)
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestInsertInterest(t *testing.T) {

func TestRemoveInterest(t *testing.T) {
csReplacementPolicy = "lru"
pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})
hint, _ := ndn.NameFromString("/")
inFace := uint64(1111)
name1, _ := ndn.NameFromString("/interest1")
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestRemoveInterest(t *testing.T) {
assert.Equal(t, pitCS.PitSize(), 3)

// Remove PIT entry from a node with more than 1 child
pitCS = NewPitCS()
pitCS = NewPitCS(func(PitEntry) {})
name1, _ = ndn.NameFromString("/root/1")
name2, _ = ndn.NameFromString("/root/2")
name3, _ := ndn.NameFromString("/root/3")
Expand All @@ -261,7 +261,7 @@ func TestRemoveInterest(t *testing.T) {

func TestFindInterestExactMatch(t *testing.T) {
csReplacementPolicy = "lru"
pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})
hint, _ := ndn.NameFromString("/")
inFace := uint64(1111)
name, _ := ndn.NameFromString("/interest1")
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestFindInterestExactMatch(t *testing.T) {
func TestFindInterestPrefixMatchByData(t *testing.T) {
// Basically the same as FindInterestPrefixMatch, but with data instead
csReplacementPolicy = "lru"
pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})
name, _ := ndn.NameFromString("/interest1")
data := ndn.NewData(name, []byte("abc"))
hint, _ := ndn.NameFromString("/")
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestFindInterestPrefixMatchByData(t *testing.T) {

func TestInsertOutRecord(t *testing.T) {
csReplacementPolicy = "lru"
pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})
name, _ := ndn.NameFromString("/interest1")
hint, _ := ndn.NameFromString("/")
inFace := uint64(1111)
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestInsertOutRecord(t *testing.T) {

func TestGetOutRecords(t *testing.T) {
csReplacementPolicy = "lru"
pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})
name, _ := ndn.NameFromString("/interest1")
hint, _ := ndn.NameFromString("/")
inFace := uint64(1111)
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestGetOutRecords(t *testing.T) {
func TestInsertData(t *testing.T) {
csReplacementPolicy = "lru"
csCapacity = 1024
pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})

// Data does not already exist
name1, _ := ndn.NameFromString("/interest1")
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestInsertData(t *testing.T) {
// PitCS with interest /a/b, data /a/b/v=10
// Interest /a/b is prefix allowed
// Should return data with name /a/b/v=10
pitCS = NewPitCS()
pitCS = NewPitCS(func(PitEntry) {})

name1, _ = ndn.NameFromString("/a/b")
interest1 = ndn.NewInterest(name1)
Expand All @@ -480,7 +480,7 @@ func TestInsertData(t *testing.T) {

// Reduced CS capacity to check that eviction occurs
csCapacity = 1
pitCS = NewPitCS()
pitCS = NewPitCS(func(PitEntry) {})
data1 = ndn.NewData(name1, []byte("data1"))
data2 = ndn.NewData(name2, []byte("data2"))
pitCS.InsertData(data1)
Expand All @@ -491,7 +491,7 @@ func TestInsertData(t *testing.T) {
func FindMatchingDataFromCS(t *testing.T) {
csReplacementPolicy = "lru"
csCapacity = 1024
pitCS := NewPitCS()
pitCS := NewPitCS(func(PitEntry) {})

// Insert data and then fetch it
name1, _ := ndn.NameFromString("/interest1")
Expand Down Expand Up @@ -522,7 +522,7 @@ func FindMatchingDataFromCS(t *testing.T) {
// PitCS with interest /a/b, data /a/b/v=10
// Interest /a/b is prefix allowed
// Should return data with name /a/b/v=10
pitCS = NewPitCS()
pitCS = NewPitCS(func(PitEntry) {})

name1, _ = ndn.NameFromString("/a/b")
interest1 = ndn.NewInterest(name1)
Expand All @@ -537,7 +537,7 @@ func FindMatchingDataFromCS(t *testing.T) {
// PitCS with interest /a/b
// Now look for interest /a/b with prefix allowed
// Should return nil since there is no data
pitCS = NewPitCS()
pitCS = NewPitCS(func(PitEntry) {})

name1, _ = ndn.NameFromString("/a/b")
interest1 = ndn.NewInterest(name1)
Expand Down
18 changes: 8 additions & 10 deletions table/pit-cs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

// PitCsTable dictates what functionality a Pit-Cs table should implement
// Warning: All functions must be called in the same forwarding goroutine as the creation of the table.
type PitCsTable interface {
InsertInterest(interest *ndn.Interest, hint *ndn.Name, inFace uint64) (PitEntry, bool)
RemoveInterest(pitEntry PitEntry) bool
Expand All @@ -30,13 +31,16 @@ type PitCsTable interface {
eraseCsDataFromReplacementStrategy(index uint64)
updatePitExpiry(pitEntry PitEntry)

ExpiringPitEntries() chan PitEntry
// UpdateTimer returns the channel used to signal regular Update() calls in the forwarding thread.
// <- UpdateTimer() and Update() must be called in pairs.
UpdateTimer() <-chan struct{}
// Update() does whatever the PIT table needs to do regularly.
// It may schedule the next UpdateTimer().
Update()
}

// basePitCsTable contains properties common to all PIT-CS tables
type basePitCsTable struct {
expiringPitEntries chan PitEntry
}
type basePitCsTable struct{}

// PitEntry dictates what entries in a PIT-CS table should implement
type PitEntry interface {
Expand Down Expand Up @@ -230,9 +234,3 @@ func (bce *baseCsEntry) StaleTime() time.Time {
func (bce *baseCsEntry) Data() *ndn.Data {
return bce.data
}

// ExpiringPitEntries returns the channel to which PIT entries that are about
// to expire are sent and received.
func (p *basePitCsTable) ExpiringPitEntries() chan PitEntry {
return p.expiringPitEntries
}