Skip to content

Commit 46f5343

Browse files
committed
introducing Map.Drain API to traverse a map while also deleting entries
This commit introduces the `Map.Drain` API to traverse the map while also removing its entries. It leverages the same `MapIterator` structure, with the introduction of a new unexported method to handle the map draining. The tests make sure that the behavior is as expected, and that this API returns an error while invoked on the wrong map, such as arrays, for which `Map.Iterate` should be used instead. The `LookupAndDelete` system call support has been introduced in: 1. 5.14 for BPF_MAP_TYPE_HASH, BPF_MAP_TYPE_PERCPU_HASH, BPF_MAP_TYPE_LRU_HASH and BPF_MAP_TYPE_LRU_PERCPU_HASH. 2. 4.20 for BPF_MAP_TYPE_QUEUE, BPF_MAP_TYPE_STACK Do not expect the `Map.Drain` API to work on prior versions, according to the target map type. From the user perspective, the usage should be similar to `Map.Iterate`, as shown as follows: ```go m, err := NewMap(&MapSpec{ Type: Hash, KeySize: 4, ValueSize: 8, MaxEntries: 10, }) // populate here the map and defer close it := m.Drain() for it.Next(keyPtr, &value) { // here the entry doesn't exist anymore in the underlying map. ... } ``` Signed-off-by: Simone Magnani <[email protected]>
1 parent 9895aae commit 46f5343

File tree

2 files changed

+245
-21
lines changed

2 files changed

+245
-21
lines changed

map.go

Lines changed: 112 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,10 +1344,31 @@ func batchCount(keys, values any) (int, error) {
13441344
//
13451345
// It's not possible to guarantee that all keys in a map will be
13461346
// returned if there are concurrent modifications to the map.
1347+
//
1348+
// Iterating a hash map from which keys are being deleted is not
1349+
// safe. You may see the same key multiple times. Iteration may
1350+
// also abort with an error, see IsIterationAborted.
1351+
//
1352+
// Iterating a queue/stack map returns an error (NextKey invalid
1353+
// argument): [Map.Drain] API should be used instead.
13471354
func (m *Map) Iterate() *MapIterator {
13481355
return newMapIterator(m)
13491356
}
13501357

1358+
// Drain traverses a map while also removing entries.
1359+
//
1360+
// It's safe to create multiple drainers at the same time,
1361+
// but their respective outputs will differ.
1362+
//
1363+
// Draining a map that does not support entry removal such as
1364+
// an array return an error (LookupAndDelete not supported):
1365+
// [Map.Iterate] API should be used instead.
1366+
func (m *Map) Drain() *MapIterator {
1367+
it := newMapIterator(m)
1368+
it.drain = true
1369+
return it
1370+
}
1371+
13511372
// Close the Map's underlying file descriptor, which could unload the
13521373
// Map from the kernel if it is not pinned or in use by a loaded Program.
13531374
func (m *Map) Close() error {
@@ -1602,6 +1623,12 @@ func marshalMap(m *Map, length int) ([]byte, error) {
16021623
return buf, nil
16031624
}
16041625

1626+
// isKeyValueMap returns true if map supports key-value pairs (ex. hash)
1627+
// and false in case of value-only maps (ex. queue).
1628+
func isKeyValueMap(m *Map) bool {
1629+
return m.keySize != 0
1630+
}
1631+
16051632
// MapIterator iterates a Map.
16061633
//
16071634
// See Map.Iterate.
@@ -1611,7 +1638,7 @@ type MapIterator struct {
16111638
// of []byte to avoid allocations.
16121639
cursor any
16131640
count, maxEntries uint32
1614-
done bool
1641+
done, drain bool
16151642
err error
16161643
}
16171644

@@ -1622,12 +1649,56 @@ func newMapIterator(target *Map) *MapIterator {
16221649
}
16231650
}
16241651

1652+
// cursorToKeyOut copies the current value held in the cursor to the
1653+
// provided argument. In case of errors, returns false and sets a
1654+
// non-nil error in the MapIterator.
1655+
func (mi *MapIterator) cursorToKeyOut(keyOut interface{}) bool {
1656+
buf := mi.cursor.([]byte)
1657+
if ptr, ok := keyOut.(unsafe.Pointer); ok {
1658+
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
1659+
} else {
1660+
mi.err = sysenc.Unmarshal(keyOut, buf)
1661+
}
1662+
return mi.err == nil
1663+
}
1664+
1665+
// fetchNextKey loads into the cursor the key following the provided one.
1666+
func (mi *MapIterator) fetchNextKey(key interface{}) bool {
1667+
mi.err = mi.target.NextKey(key, mi.cursor)
1668+
if mi.err == nil {
1669+
return true
1670+
}
1671+
1672+
if errors.Is(mi.err, ErrKeyNotExist) {
1673+
mi.done = true
1674+
mi.err = nil
1675+
} else {
1676+
mi.err = fmt.Errorf("get next key: %w", mi.err)
1677+
}
1678+
1679+
return false
1680+
}
1681+
1682+
// drainMapEntry removes and returns the key held in the cursor
1683+
// from the underlying map.
1684+
func (mi *MapIterator) drainMapEntry(valueOut interface{}) bool {
1685+
mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
1686+
if mi.err == nil {
1687+
mi.count++
1688+
return true
1689+
}
1690+
1691+
if errors.Is(mi.err, ErrKeyNotExist) {
1692+
mi.err = nil
1693+
} else {
1694+
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
1695+
}
1696+
1697+
return false
1698+
}
1699+
16251700
// Next decodes the next key and value.
16261701
//
1627-
// Iterating a hash map from which keys are being deleted is not
1628-
// safe. You may see the same key multiple times. Iteration may
1629-
// also abort with an error, see IsIterationAborted.
1630-
//
16311702
// Returns false if there are no more entries. You must check
16321703
// the result of Err afterwards.
16331704
//
@@ -1636,6 +1707,38 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
16361707
if mi.err != nil || mi.done {
16371708
return false
16381709
}
1710+
if mi.drain {
1711+
return mi.nextDrain(keyOut, valueOut)
1712+
}
1713+
return mi.nextIterate(keyOut, valueOut)
1714+
}
1715+
1716+
func (mi *MapIterator) nextDrain(keyOut, valueOut interface{}) bool {
1717+
// Handle value-only maps (ex. queue).
1718+
if !isKeyValueMap(mi.target) {
1719+
if keyOut != nil {
1720+
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
1721+
return false
1722+
}
1723+
return mi.drainMapEntry(valueOut)
1724+
}
1725+
1726+
if mi.cursor == nil {
1727+
mi.cursor = make([]byte, mi.target.keySize)
1728+
}
1729+
1730+
// Always retrieve first key in the map. This should ensure that the whole map
1731+
// is traversed, despite concurrent operations (ordering of items might differ).
1732+
for mi.err == nil && mi.fetchNextKey(nil) {
1733+
if mi.drainMapEntry(valueOut) {
1734+
return mi.cursorToKeyOut(keyOut)
1735+
}
1736+
}
1737+
return false
1738+
}
1739+
1740+
func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
1741+
var key interface{}
16391742

16401743
// For array-like maps NextKey returns nil only after maxEntries
16411744
// iterations.
@@ -1645,17 +1748,12 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
16451748
// is returned. If we pass an uninitialized []byte instead, it'll see a
16461749
// non-nil interface and try to marshal it.
16471750
mi.cursor = make([]byte, mi.target.keySize)
1648-
mi.err = mi.target.NextKey(nil, mi.cursor)
1751+
key = nil
16491752
} else {
1650-
mi.err = mi.target.NextKey(mi.cursor, mi.cursor)
1753+
key = mi.cursor
16511754
}
16521755

1653-
if errors.Is(mi.err, ErrKeyNotExist) {
1654-
mi.done = true
1655-
mi.err = nil
1656-
return false
1657-
} else if mi.err != nil {
1658-
mi.err = fmt.Errorf("get next key: %w", mi.err)
1756+
if !mi.fetchNextKey(key) {
16591757
return false
16601758
}
16611759

@@ -1677,14 +1775,7 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
16771775
return false
16781776
}
16791777

1680-
buf := mi.cursor.([]byte)
1681-
if ptr, ok := keyOut.(unsafe.Pointer); ok {
1682-
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
1683-
} else {
1684-
mi.err = sysenc.Unmarshal(keyOut, buf)
1685-
}
1686-
1687-
return mi.err == nil
1778+
return mi.cursorToKeyOut(keyOut)
16881779
}
16891780

16901781
mi.err = fmt.Errorf("%w", ErrIterationAborted)

map_test.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,6 +1174,139 @@ func TestMapIteratorAllocations(t *testing.T) {
11741174
qt.Assert(t, qt.Equals(allocs, float64(0)))
11751175
}
11761176

1177+
func TestMapDrain(t *testing.T) {
1178+
for _, mapType := range []MapType{
1179+
Hash,
1180+
Queue,
1181+
} {
1182+
t.Run(mapType.String(), func(t *testing.T) {
1183+
var (
1184+
keySize, value uint32
1185+
keyPtr interface{}
1186+
values = []uint32{}
1187+
data = []uint32{0, 1}
1188+
)
1189+
1190+
if mapType == Queue {
1191+
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
1192+
keyPtr = nil
1193+
keySize = 0
1194+
}
1195+
1196+
if mapType == Hash {
1197+
testutils.SkipOnOldKernel(t, "5.14", "map type hash")
1198+
keyPtr = new(uint32)
1199+
keySize = 4
1200+
}
1201+
1202+
m, err := NewMap(&MapSpec{
1203+
Type: mapType,
1204+
KeySize: keySize,
1205+
ValueSize: 4,
1206+
MaxEntries: 2,
1207+
})
1208+
qt.Assert(t, qt.IsNil(err))
1209+
defer m.Close()
1210+
1211+
// Assert drain empty map.
1212+
entries := m.Drain()
1213+
qt.Assert(t, qt.IsFalse(entries.Next(keyPtr, &value)))
1214+
qt.Assert(t, qt.IsNil(entries.Err()))
1215+
1216+
for _, v := range data {
1217+
if keySize == 0 {
1218+
err = m.Put(nil, uint32(v))
1219+
} else {
1220+
err = m.Put(uint32(v), uint32(v))
1221+
}
1222+
qt.Assert(t, qt.IsNil(err))
1223+
}
1224+
1225+
entries = m.Drain()
1226+
for entries.Next(keyPtr, &value) {
1227+
values = append(values, value)
1228+
}
1229+
qt.Assert(t, qt.IsNil(entries.Err()))
1230+
1231+
sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })
1232+
qt.Assert(t, qt.DeepEquals(values, data))
1233+
})
1234+
}
1235+
}
1236+
1237+
func TestDrainWrongMap(t *testing.T) {
1238+
arr, err := NewMap(&MapSpec{
1239+
Type: Array,
1240+
KeySize: 4,
1241+
ValueSize: 4,
1242+
MaxEntries: 10,
1243+
})
1244+
qt.Assert(t, qt.IsNil(err))
1245+
defer arr.Close()
1246+
1247+
var key, value uint32
1248+
entries := arr.Drain()
1249+
1250+
qt.Assert(t, qt.IsFalse(entries.Next(&key, &value)))
1251+
qt.Assert(t, qt.IsNotNil(entries.Err()))
1252+
fmt.Println(entries.Err())
1253+
}
1254+
1255+
func TestMapDrainerAllocations(t *testing.T) {
1256+
for _, mapType := range []MapType{
1257+
Hash,
1258+
Queue,
1259+
} {
1260+
t.Run(mapType.String(), func(t *testing.T) {
1261+
var (
1262+
keySize, value uint32
1263+
keyPtr interface{}
1264+
)
1265+
1266+
if mapType == Queue {
1267+
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
1268+
keyPtr = nil
1269+
keySize = 0
1270+
}
1271+
1272+
if mapType == Hash {
1273+
testutils.SkipOnOldKernel(t, "5.14", "map type hash")
1274+
keyPtr = new(uint32)
1275+
keySize = 4
1276+
}
1277+
1278+
m, err := NewMap(&MapSpec{
1279+
Type: mapType,
1280+
KeySize: keySize,
1281+
ValueSize: 4,
1282+
MaxEntries: 10,
1283+
})
1284+
qt.Assert(t, qt.ErrorIs(err, nil))
1285+
defer m.Close()
1286+
1287+
for i := 0; i < int(m.MaxEntries()); i++ {
1288+
if keySize == 0 {
1289+
err = m.Put(nil, uint32(i))
1290+
} else {
1291+
err = m.Put(uint32(i), uint32(i))
1292+
}
1293+
if err != nil {
1294+
t.Fatal(err)
1295+
}
1296+
}
1297+
1298+
iter := m.Drain()
1299+
allocs := testing.AllocsPerRun(int(m.MaxEntries()-1), func() {
1300+
if !iter.Next(keyPtr, &value) {
1301+
t.Fatal("Next failed while draining: %w", iter.Err())
1302+
}
1303+
})
1304+
1305+
qt.Assert(t, qt.Equals(allocs, float64(0)))
1306+
})
1307+
}
1308+
}
1309+
11771310
func TestMapBatchLookupAllocations(t *testing.T) {
11781311
testutils.SkipIfNotSupported(t, haveBatchAPI())
11791312

0 commit comments

Comments
 (0)