Skip to content

Commit

Permalink
arc wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 3, 2025
1 parent 7938a4b commit 7139716
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 19 deletions.
6 changes: 3 additions & 3 deletions fw/face/multicast-udp-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func (t *MulticastUDPTransport) runReceive() {
defer t.Close()

for t.running.Load() {
err := ndn_io.ReadTlvStream(t.recvConn, func(b []byte) bool {
t.nInBytes += uint64(len(b))
t.linkService.handleIncomingFrame(b)
err := ndn_io.ReadTlvStream(t.recvConn, func(b ndn_io.BufT) bool {
t.nInBytes += uint64(len(b.Buf))
t.linkService.handleIncomingFrame(b.Buf)
return true
}, func(err error) bool {
// Same as unicast UDP transport
Expand Down
6 changes: 3 additions & 3 deletions fw/face/unicast-tcp-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ func (t *UnicastTCPTransport) runReceive() {
// The connection can be nil if the initial connection attempt
// failed for a persistent face. In that case we will reconnect.
if t.conn != nil {
err := ndn_io.ReadTlvStream(t.conn, func(b []byte) bool {
t.nInBytes += uint64(len(b))
err := ndn_io.ReadTlvStream(t.conn, func(b ndn_io.BufT) bool {
t.nInBytes += uint64(len(b.Buf))
*t.expirationTime = time.Now().Add(CfgTCPLifetime())
t.linkService.handleIncomingFrame(b)
t.linkService.handleIncomingFrame(b.Buf)
return true
}, nil)
if err == nil && t.Persistency() != spec_mgmt.PersistencyPermanent {
Expand Down
6 changes: 3 additions & 3 deletions fw/face/unicast-udp-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ func (t *UnicastUDPTransport) sendFrame(frame []byte) {
func (t *UnicastUDPTransport) runReceive() {
defer t.Close()

err := ndn_io.ReadTlvStream(t.conn, func(b []byte) bool {
t.nInBytes += uint64(len(b))
err := ndn_io.ReadTlvStream(t.conn, func(b ndn_io.BufT) bool {
t.nInBytes += uint64(len(b.Buf))
*t.expirationTime = time.Now().Add(CfgUDPLifetime())
t.linkService.handleIncomingFrame(b)
t.linkService.handleIncomingFrame(b.Buf)
return true
}, func(err error) bool {
// Ignore since UDP is a connectionless protocol
Expand Down
6 changes: 3 additions & 3 deletions fw/face/unix-stream-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (t *UnixStreamTransport) sendFrame(frame []byte) {
func (t *UnixStreamTransport) runReceive() {
defer t.Close()

err := ndn_io.ReadTlvStream(t.conn, func(b []byte) bool {
t.nInBytes += uint64(len(b))
t.linkService.handleIncomingFrame(b)
err := ndn_io.ReadTlvStream(t.conn, func(b ndn_io.BufT) bool {
t.nInBytes += uint64(len(b.Buf))
t.linkService.handleIncomingFrame(b.Buf)
return true
}, nil)
if err != nil && t.running.Load() {
Expand Down
4 changes: 2 additions & 2 deletions std/engine/face/stream_face.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (f *StreamFace) Send(pkt enc.Wire) error {
func (f *StreamFace) receive() {
defer f.setStateDown()

err := ndn_io.ReadTlvStream(f.conn, func(b []byte) bool {
if err := f.onPkt(b); err != nil {
err := ndn_io.ReadTlvStream(f.conn, func(b ndn_io.BufT) bool {
if err := f.onPkt(b.Buf); err != nil {
f.Close() // engine error
return false // break
}
Expand Down
5 changes: 4 additions & 1 deletion std/types/arc/arc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Arc is an atomically reference counted generic type.
package arc

import "sync/atomic"
import (
"sync/atomic"
)

// Arc is an atomically reference counted generic type.
// It is specifically designed to be used in a pool.
Expand Down Expand Up @@ -30,6 +32,7 @@ func (a *Arc[T]) Inc() {
func (a *Arc[T]) Dec() int32 {
c := a.c.Add(-1)
if c == 0 && a.p != nil {
// fmt.Println("Arc: returning to pool")
a.p.Put(a)
}
return c
Expand Down
35 changes: 35 additions & 0 deletions std/utils/io/stream_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io

import (
"bytes"
"fmt"

"github.com/named-data/ndnd/std/ndn"
"github.com/named-data/ndnd/std/types/arc"
)

var count = 1

// streamBufferPool is the pool of buffers used for reading streams.
// When passed an Arc, downstreams must either increment it or copy the buffer.
var streamBufferPool = arc.NewArcPool(
func() *bytes.Buffer {
count++
if count%100 == 0 {
fmt.Println("StreamBufferPool: creating new buffer", count)
}
return &bytes.Buffer{}
},
func(buf *bytes.Buffer) {
buf.Reset()
buf.Grow(8 * ndn.MaxNDNPacketSize)
})

// streamBuffer returns a buffer from the pool.
func streamBuffer() (*arc.Arc[*bytes.Buffer], []byte) {
bufArc := streamBufferPool.Get()
bufArc.Inc()
recvBuf := bufArc.Load().AvailableBuffer()
recvBuf = recvBuf[:cap(recvBuf)]
return bufArc, recvBuf
}
28 changes: 24 additions & 4 deletions std/utils/io/stream_read.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,48 @@
package io

import (
"bytes"
"errors"
"io"

enc "github.com/named-data/ndnd/std/encoding"
"github.com/named-data/ndnd/std/ndn"
"github.com/named-data/ndnd/std/types/arc"
)

// BufT is a temporary buffer with an associated Arc.
// A receiver must either manage lifetime of the buffer or copy it.
type BufT struct {
Buf []byte
*arc.Arc[*bytes.Buffer]
}

// ReadTlvStream reads a stream of TLV-encoded packets from the given reader.
func ReadTlvStream(
reader io.Reader,
onFrame func([]byte) bool,
onFrame func(BufT) bool,
ignoreError func(error) bool,
) error {
recvBuf := make([]byte, ndn.MaxNDNPacketSize*8)
bufArc, recvBuf := streamBuffer()
defer func() { bufArc.Dec() }()

recvOff := 0
tlvOff := 0

for {
// If less than one packet space remains in buffer, shift to beginning
if len(recvBuf)-recvOff < ndn.MaxNDNPacketSize {
copy(recvBuf, recvBuf[tlvOff:recvOff])
// Get a new buffer
oldBufArc, oldRecvBuf := bufArc, recvBuf
bufArc, recvBuf = streamBuffer()

// Copy unparsed data to new buffer
copy(recvBuf, oldRecvBuf[tlvOff:recvOff])
recvOff -= tlvOff
tlvOff = 0

// Release old buffer
oldBufArc.Dec()
}

// Read multiple packets at once
Expand Down Expand Up @@ -58,7 +78,7 @@ func ReadTlvStream(

if recvOff-tlvOff >= tlvSize {
// Packet was successfully received, send up to link service
shouldContinue := onFrame(recvBuf[tlvOff : tlvOff+tlvSize])
shouldContinue := onFrame(BufT{recvBuf[tlvOff : tlvOff+tlvSize], bufArc})
if !shouldContinue {
return nil
}
Expand Down

0 comments on commit 7139716

Please sign in to comment.