From 7139716fbb459db3b537f3f891d036bc39bdf74d Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Mon, 3 Feb 2025 20:33:25 +0000 Subject: [PATCH] arc wip --- fw/face/multicast-udp-transport.go | 6 ++--- fw/face/unicast-tcp-transport.go | 6 ++--- fw/face/unicast-udp-transport.go | 6 ++--- fw/face/unix-stream-transport.go | 6 ++--- std/engine/face/stream_face.go | 4 ++-- std/types/arc/arc.go | 5 ++++- std/utils/io/stream_pool.go | 35 ++++++++++++++++++++++++++++++ std/utils/io/stream_read.go | 28 ++++++++++++++++++++---- 8 files changed, 77 insertions(+), 19 deletions(-) create mode 100644 std/utils/io/stream_pool.go diff --git a/fw/face/multicast-udp-transport.go b/fw/face/multicast-udp-transport.go index 17290472..47a644df 100644 --- a/fw/face/multicast-udp-transport.go +++ b/fw/face/multicast-udp-transport.go @@ -161,9 +161,9 @@ func (t *MulticastUDPTransport) runReceive() { defer t.Close() for t.running.Load() { - err := ndn_io.ReadTlvStream(t.recvConn, func(b []byte) bool { - t.nInBytes += uint64(len(b)) - t.linkService.handleIncomingFrame(b) + err := ndn_io.ReadTlvStream(t.recvConn, func(b ndn_io.BufT) bool { + t.nInBytes += uint64(len(b.Buf)) + t.linkService.handleIncomingFrame(b.Buf) return true }, func(err error) bool { // Same as unicast UDP transport diff --git a/fw/face/unicast-tcp-transport.go b/fw/face/unicast-tcp-transport.go index fe712ba4..548bd725 100644 --- a/fw/face/unicast-tcp-transport.go +++ b/fw/face/unicast-tcp-transport.go @@ -238,10 +238,10 @@ func (t *UnicastTCPTransport) runReceive() { // The connection can be nil if the initial connection attempt // failed for a persistent face. In that case we will reconnect. if t.conn != nil { - err := ndn_io.ReadTlvStream(t.conn, func(b []byte) bool { - t.nInBytes += uint64(len(b)) + err := ndn_io.ReadTlvStream(t.conn, func(b ndn_io.BufT) bool { + t.nInBytes += uint64(len(b.Buf)) *t.expirationTime = time.Now().Add(CfgTCPLifetime()) - t.linkService.handleIncomingFrame(b) + t.linkService.handleIncomingFrame(b.Buf) return true }, nil) if err == nil && t.Persistency() != spec_mgmt.PersistencyPermanent { diff --git a/fw/face/unicast-udp-transport.go b/fw/face/unicast-udp-transport.go index 66830b03..730d8c26 100644 --- a/fw/face/unicast-udp-transport.go +++ b/fw/face/unicast-udp-transport.go @@ -137,10 +137,10 @@ func (t *UnicastUDPTransport) sendFrame(frame []byte) { func (t *UnicastUDPTransport) runReceive() { defer t.Close() - err := ndn_io.ReadTlvStream(t.conn, func(b []byte) bool { - t.nInBytes += uint64(len(b)) + err := ndn_io.ReadTlvStream(t.conn, func(b ndn_io.BufT) bool { + t.nInBytes += uint64(len(b.Buf)) *t.expirationTime = time.Now().Add(CfgUDPLifetime()) - t.linkService.handleIncomingFrame(b) + t.linkService.handleIncomingFrame(b.Buf) return true }, func(err error) bool { // Ignore since UDP is a connectionless protocol diff --git a/fw/face/unix-stream-transport.go b/fw/face/unix-stream-transport.go index 5a8dfac5..398f899e 100644 --- a/fw/face/unix-stream-transport.go +++ b/fw/face/unix-stream-transport.go @@ -91,9 +91,9 @@ func (t *UnixStreamTransport) sendFrame(frame []byte) { func (t *UnixStreamTransport) runReceive() { defer t.Close() - err := ndn_io.ReadTlvStream(t.conn, func(b []byte) bool { - t.nInBytes += uint64(len(b)) - t.linkService.handleIncomingFrame(b) + err := ndn_io.ReadTlvStream(t.conn, func(b ndn_io.BufT) bool { + t.nInBytes += uint64(len(b.Buf)) + t.linkService.handleIncomingFrame(b.Buf) return true }, nil) if err != nil && t.running.Load() { diff --git a/std/engine/face/stream_face.go b/std/engine/face/stream_face.go index 83cffb47..283e6ac5 100644 --- a/std/engine/face/stream_face.go +++ b/std/engine/face/stream_face.go @@ -90,8 +90,8 @@ func (f *StreamFace) Send(pkt enc.Wire) error { func (f *StreamFace) receive() { defer f.setStateDown() - err := ndn_io.ReadTlvStream(f.conn, func(b []byte) bool { - if err := f.onPkt(b); err != nil { + err := ndn_io.ReadTlvStream(f.conn, func(b ndn_io.BufT) bool { + if err := f.onPkt(b.Buf); err != nil { f.Close() // engine error return false // break } diff --git a/std/types/arc/arc.go b/std/types/arc/arc.go index 3f274ad9..446558b9 100644 --- a/std/types/arc/arc.go +++ b/std/types/arc/arc.go @@ -1,7 +1,9 @@ // Arc is an atomically reference counted generic type. package arc -import "sync/atomic" +import ( + "sync/atomic" +) // Arc is an atomically reference counted generic type. // It is specifically designed to be used in a pool. @@ -30,6 +32,7 @@ func (a *Arc[T]) Inc() { func (a *Arc[T]) Dec() int32 { c := a.c.Add(-1) if c == 0 && a.p != nil { + // fmt.Println("Arc: returning to pool") a.p.Put(a) } return c diff --git a/std/utils/io/stream_pool.go b/std/utils/io/stream_pool.go new file mode 100644 index 00000000..1b1a3096 --- /dev/null +++ b/std/utils/io/stream_pool.go @@ -0,0 +1,35 @@ +package io + +import ( + "bytes" + "fmt" + + "github.com/named-data/ndnd/std/ndn" + "github.com/named-data/ndnd/std/types/arc" +) + +var count = 1 + +// streamBufferPool is the pool of buffers used for reading streams. +// When passed an Arc, downstreams must either increment it or copy the buffer. +var streamBufferPool = arc.NewArcPool( + func() *bytes.Buffer { + count++ + if count%100 == 0 { + fmt.Println("StreamBufferPool: creating new buffer", count) + } + return &bytes.Buffer{} + }, + func(buf *bytes.Buffer) { + buf.Reset() + buf.Grow(8 * ndn.MaxNDNPacketSize) + }) + +// streamBuffer returns a buffer from the pool. +func streamBuffer() (*arc.Arc[*bytes.Buffer], []byte) { + bufArc := streamBufferPool.Get() + bufArc.Inc() + recvBuf := bufArc.Load().AvailableBuffer() + recvBuf = recvBuf[:cap(recvBuf)] + return bufArc, recvBuf +} diff --git a/std/utils/io/stream_read.go b/std/utils/io/stream_read.go index 9ae6c2b2..b68462d7 100644 --- a/std/utils/io/stream_read.go +++ b/std/utils/io/stream_read.go @@ -1,28 +1,48 @@ package io import ( + "bytes" "errors" "io" enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/ndn" + "github.com/named-data/ndnd/std/types/arc" ) +// BufT is a temporary buffer with an associated Arc. +// A receiver must either manage lifetime of the buffer or copy it. +type BufT struct { + Buf []byte + *arc.Arc[*bytes.Buffer] +} + +// ReadTlvStream reads a stream of TLV-encoded packets from the given reader. func ReadTlvStream( reader io.Reader, - onFrame func([]byte) bool, + onFrame func(BufT) bool, ignoreError func(error) bool, ) error { - recvBuf := make([]byte, ndn.MaxNDNPacketSize*8) + bufArc, recvBuf := streamBuffer() + defer func() { bufArc.Dec() }() + recvOff := 0 tlvOff := 0 for { // If less than one packet space remains in buffer, shift to beginning if len(recvBuf)-recvOff < ndn.MaxNDNPacketSize { - copy(recvBuf, recvBuf[tlvOff:recvOff]) + // Get a new buffer + oldBufArc, oldRecvBuf := bufArc, recvBuf + bufArc, recvBuf = streamBuffer() + + // Copy unparsed data to new buffer + copy(recvBuf, oldRecvBuf[tlvOff:recvOff]) recvOff -= tlvOff tlvOff = 0 + + // Release old buffer + oldBufArc.Dec() } // Read multiple packets at once @@ -58,7 +78,7 @@ func ReadTlvStream( if recvOff-tlvOff >= tlvSize { // Packet was successfully received, send up to link service - shouldContinue := onFrame(recvBuf[tlvOff : tlvOff+tlvSize]) + shouldContinue := onFrame(BufT{recvBuf[tlvOff : tlvOff+tlvSize], bufArc}) if !shouldContinue { return nil }