Skip to content

Commit 3d9f4e0

Browse files
committed
introducing Map.Drain API to traverse a map while also deleting entries
This commit introduces the `Map.Drain` API to traverse the map while also removing its entries. It leverages the same `MapIterator` structure, with the introduction of a new unexported method to handle the map draining. The tests make sure that the behavior is as expected, and that this API returns an error while invoked on the wrong map, such as arrays, for which `Map.Iterate` should be used instead. To support the adoption of the `LookupAndDelete` system call at different kernel releases (e.g., queues and hash support this operation from different kernel versions), this implementation introduces a fallback method to iterate through the map using the sequential `Lookup` -> `Delete` operations. As for the `MapIterate.Next`, concurrent operations might modify/remove entries, but the method would try to check whether there is more data to lookup in the map and proceed without failing (e.g., key removed in the meantime doesn't necessarily mean that it should fail, instead try with `nextKey`). From the user perspective, the usage should be similar to `Map.Iterate`, as shown as follows: ```go m, err := NewMap(&MapSpec{ Type: Hash, KeySize: 4, ValueSize: 8, MaxEntries: 10, }) // populate here the map and defer close it := m.Drain() for it.Next(keyPtr, &value) { // here the entry doesn't exist anymore in the underlying map. ... } ``` Signed-off-by: Simone Magnani <simone.magnani@isovalent.com>
1 parent 2613a2c commit 3d9f4e0

File tree

2 files changed

+312
-22
lines changed

2 files changed

+312
-22
lines changed

map.go

Lines changed: 157 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1286,10 +1286,31 @@ func batchCount(keys, values any) (int, error) {
12861286
//
12871287
// It's not possible to guarantee that all keys in a map will be
12881288
// returned if there are concurrent modifications to the map.
1289+
//
1290+
// Iterating a hash map from which keys are being deleted is not
1291+
// safe. You may see the same key multiple times. Iteration may
1292+
// also abort with an error, see IsIterationAborted.
1293+
//
1294+
// Iterating a queue/stack map returns an error (NextKey) as the
1295+
// Map.Drain API should be used instead.
12891296
func (m *Map) Iterate() *MapIterator {
12901297
return newMapIterator(m)
12911298
}
12921299

1300+
// Drain traverses a map while also removing entries.
1301+
//
1302+
// It's safe to create multiple drainers at the same time,
1303+
// but their respective outputs will differ.
1304+
//
1305+
// Iterating a map that does not support entry removal such as
1306+
// an array return an error (Delete/LookupAndDelete) as the
1307+
// Map.Iterate API should be used instead.
1308+
func (m *Map) Drain() *MapIterator {
1309+
it := newMapIterator(m)
1310+
it.drain = true
1311+
return it
1312+
}
1313+
12931314
// Close the Map's underlying file descriptor, which could unload the
12941315
// Map from the kernel if it is not pinned or in use by a loaded Program.
12951316
func (m *Map) Close() error {
@@ -1548,8 +1569,10 @@ type MapIterator struct {
15481569
// of []byte to avoid allocations.
15491570
cursor any
15501571
count, maxEntries uint32
1551-
done bool
1552-
err error
1572+
done, drain bool
1573+
// Used in Map.Drain when LookupAndDelete is not supported.
1574+
fallback bool
1575+
err error
15531576
}
15541577

15551578
func newMapIterator(target *Map) *MapIterator {
@@ -1561,10 +1584,6 @@ func newMapIterator(target *Map) *MapIterator {
15611584

15621585
// Next decodes the next key and value.
15631586
//
1564-
// Iterating a hash map from which keys are being deleted is not
1565-
// safe. You may see the same key multiple times. Iteration may
1566-
// also abort with an error, see IsIterationAborted.
1567-
//
15681587
// Returns false if there are no more entries. You must check
15691588
// the result of Err afterwards.
15701589
//
@@ -1573,26 +1592,28 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
15731592
if mi.err != nil || mi.done {
15741593
return false
15751594
}
1595+
if mi.drain {
1596+
return mi.nextDrain(keyOut, valueOut)
1597+
}
1598+
return mi.nextIterate(keyOut, valueOut)
1599+
}
15761600

1577-
// For array-like maps NextKey returns nil only after maxEntries
1578-
// iterations.
1601+
func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
1602+
var key interface{}
1603+
1604+
// For array-like maps NextKey returns nil only after maxEntries iterations.
15791605
for mi.count <= mi.maxEntries {
15801606
if mi.cursor == nil {
15811607
// Pass nil interface to NextKey to make sure the Map's first key
15821608
// is returned. If we pass an uninitialized []byte instead, it'll see a
15831609
// non-nil interface and try to marshal it.
15841610
mi.cursor = make([]byte, mi.target.keySize)
1585-
mi.err = mi.target.NextKey(nil, mi.cursor)
1611+
key = nil
15861612
} else {
1587-
mi.err = mi.target.NextKey(mi.cursor, mi.cursor)
1613+
key = mi.cursor
15881614
}
15891615

1590-
if errors.Is(mi.err, ErrKeyNotExist) {
1591-
mi.done = true
1592-
mi.err = nil
1593-
return false
1594-
} else if mi.err != nil {
1595-
mi.err = fmt.Errorf("get next key: %w", mi.err)
1616+
if !mi.fetchNextKey(key) {
15961617
return false
15971618
}
15981619

@@ -1614,20 +1635,134 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
16141635
return false
16151636
}
16161637

1617-
buf := mi.cursor.([]byte)
1618-
if ptr, ok := keyOut.(unsafe.Pointer); ok {
1619-
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
1638+
return mi.copyCursorToKeyOut(keyOut)
1639+
}
1640+
1641+
mi.err = fmt.Errorf("%w", ErrIterationAborted)
1642+
return false
1643+
}
1644+
1645+
func (mi *MapIterator) nextDrain(keyOut, valueOut interface{}) bool {
1646+
if mi.isKeylessMap() {
1647+
return mi.handleDrainKeylessMap(keyOut, valueOut)
1648+
}
1649+
1650+
// Allocate only once data for retrieving the next key in the map.
1651+
if mi.cursor == nil {
1652+
mi.cursor = make([]byte, mi.target.keySize)
1653+
}
1654+
1655+
// Always retrieve first key in the map. This should ensure that the whole map
1656+
// is traversed, despite concurrent operations (ordering of items might differ).
1657+
for mi.err == nil && mi.fetchNextKey(nil) {
1658+
if mi.tryLookupAndDelete(valueOut) {
1659+
return mi.copyCursorToKeyOut(keyOut)
1660+
}
1661+
}
1662+
return false
1663+
}
1664+
1665+
func (mi *MapIterator) tryLookupAndDelete(valueOut interface{}) bool {
1666+
// Default try using the updated `Map.LookupAndDelete` API.
1667+
if !mi.fallback {
1668+
mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
1669+
if mi.err == nil {
1670+
mi.count++
1671+
return true
1672+
}
1673+
1674+
switch {
1675+
case errors.Is(mi.err, ErrNotSupported) || errors.Is(mi.err, unix.EINVAL):
1676+
mi.fallback = true
1677+
case errors.Is(mi.err, ErrKeyNotExist):
1678+
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
1679+
mi.err = nil
1680+
return false
1681+
default:
1682+
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
1683+
return false
1684+
}
1685+
}
1686+
1687+
// Fallback to sequential `Map.Lookup` -> `Map.Delete` when `Map.LookupAndDelete` is not supported.
1688+
mi.err = mi.target.Lookup(mi.cursor, valueOut)
1689+
if mi.err != nil {
1690+
if errors.Is(mi.err, ErrKeyNotExist) {
1691+
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
1692+
mi.err = nil
16201693
} else {
1621-
mi.err = sysenc.Unmarshal(keyOut, buf)
1694+
mi.err = fmt.Errorf("look up next key: %w", mi.err)
16221695
}
1696+
return false
1697+
}
16231698

1624-
return mi.err == nil
1699+
mi.err = mi.target.Delete(mi.cursor)
1700+
if mi.err != nil {
1701+
if errors.Is(mi.err, ErrKeyNotExist) {
1702+
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
1703+
mi.err = nil
1704+
} else {
1705+
mi.err = fmt.Errorf("delete key: %w", mi.err)
1706+
}
1707+
return false
1708+
}
1709+
1710+
mi.count++
1711+
return true
1712+
}
1713+
1714+
func (mi *MapIterator) isKeylessMap() bool {
1715+
return mi.target.keySize == 0
1716+
}
1717+
1718+
func (mi *MapIterator) handleDrainKeylessMap(keyOut, valueOut interface{}) bool {
1719+
if keyOut != nil {
1720+
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
1721+
return false
1722+
}
1723+
1724+
mi.err = mi.target.LookupAndDelete(nil, valueOut)
1725+
if mi.err == nil {
1726+
mi.count++
1727+
return true
1728+
}
1729+
1730+
if errors.Is(mi.err, ErrKeyNotExist) {
1731+
mi.done = true
1732+
mi.err = nil
1733+
} else {
1734+
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
16251735
}
16261736

1627-
mi.err = fmt.Errorf("%w", ErrIterationAborted)
16281737
return false
16291738
}
16301739

1740+
func (mi *MapIterator) fetchNextKey(key interface{}) bool {
1741+
mi.err = mi.target.NextKey(key, mi.cursor)
1742+
if mi.err == nil {
1743+
return true
1744+
}
1745+
1746+
if errors.Is(mi.err, ErrKeyNotExist) {
1747+
mi.done = true
1748+
mi.err = nil
1749+
} else {
1750+
mi.err = fmt.Errorf("get next key: %w", mi.err)
1751+
}
1752+
1753+
return false
1754+
}
1755+
1756+
func (mi *MapIterator) copyCursorToKeyOut(keyOut interface{}) bool {
1757+
buf := mi.cursor.([]byte)
1758+
if ptr, ok := keyOut.(unsafe.Pointer); ok {
1759+
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
1760+
} else {
1761+
mi.err = sysenc.Unmarshal(keyOut, buf)
1762+
}
1763+
return mi.err == nil
1764+
}
1765+
16311766
// Err returns any encountered error.
16321767
//
16331768
// The method must be called after Next returns nil.

map_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,6 +1174,161 @@ func TestMapIteratorAllocations(t *testing.T) {
11741174
qt.Assert(t, qt.Equals(allocs, float64(0)))
11751175
}
11761176

1177+
func TestDrainEmptyMap(t *testing.T) {
1178+
for _, mapType := range []MapType{
1179+
Hash,
1180+
Queue,
1181+
} {
1182+
t.Run(mapType.String(), func(t *testing.T) {
1183+
var (
1184+
keySize = uint32(4)
1185+
key string
1186+
value uint64
1187+
keyPtr interface{} = &key
1188+
)
1189+
1190+
if mapType == Queue {
1191+
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
1192+
keySize = 0
1193+
keyPtr = nil
1194+
}
1195+
1196+
m, err := NewMap(&MapSpec{
1197+
Type: mapType,
1198+
KeySize: keySize,
1199+
ValueSize: 8,
1200+
MaxEntries: 2,
1201+
})
1202+
qt.Assert(t, qt.IsNil(err))
1203+
defer m.Close()
1204+
1205+
entries := m.Drain()
1206+
if entries.Next(keyPtr, &value) {
1207+
t.Errorf("Empty %v should not be drainable", mapType)
1208+
}
1209+
1210+
qt.Assert(t, qt.IsNil(entries.Err()))
1211+
})
1212+
}
1213+
}
1214+
1215+
func TestMapDrain(t *testing.T) {
1216+
for _, mapType := range []MapType{
1217+
Hash,
1218+
Queue,
1219+
} {
1220+
t.Run(Hash.String(), func(t *testing.T) {
1221+
var (
1222+
key, value uint32
1223+
values []uint32
1224+
anyKey interface{}
1225+
keyPtr interface{} = &key
1226+
keySize uint32 = 4
1227+
data = []uint32{0, 1}
1228+
)
1229+
1230+
if mapType == Queue {
1231+
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
1232+
keySize = 0
1233+
keyPtr = nil
1234+
}
1235+
1236+
m, err := NewMap(&MapSpec{
1237+
Type: mapType,
1238+
KeySize: keySize,
1239+
ValueSize: 4,
1240+
MaxEntries: 2,
1241+
})
1242+
qt.Assert(t, qt.IsNil(err))
1243+
defer m.Close()
1244+
1245+
for _, v := range data {
1246+
if keySize != 0 {
1247+
anyKey = uint32(v)
1248+
}
1249+
err := m.Put(anyKey, uint32(v))
1250+
qt.Assert(t, qt.IsNil(err))
1251+
}
1252+
1253+
entries := m.Drain()
1254+
for entries.Next(keyPtr, &value) {
1255+
values = append(values, value)
1256+
}
1257+
qt.Assert(t, qt.IsNil(entries.Err()))
1258+
1259+
sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })
1260+
qt.Assert(t, qt.DeepEquals(values, data))
1261+
})
1262+
}
1263+
}
1264+
1265+
func TestDrainWrongMap(t *testing.T) {
1266+
arr, err := NewMap(&MapSpec{
1267+
Type: Array,
1268+
KeySize: 4,
1269+
ValueSize: 4,
1270+
MaxEntries: 10,
1271+
})
1272+
qt.Assert(t, qt.IsNil(err))
1273+
defer arr.Close()
1274+
1275+
var key, value uint32
1276+
entries := arr.Drain()
1277+
1278+
qt.Assert(t, qt.IsFalse(entries.Next(&key, &value)))
1279+
qt.Assert(t, qt.IsNotNil(entries.Err()))
1280+
}
1281+
1282+
func TestMapDrainerAllocations(t *testing.T) {
1283+
for _, mapType := range []MapType{
1284+
Hash,
1285+
Queue,
1286+
} {
1287+
t.Run(mapType.String(), func(t *testing.T) {
1288+
var (
1289+
key, value uint32
1290+
anyKey interface{}
1291+
keyPtr interface{} = &key
1292+
keySize uint32 = 4
1293+
)
1294+
1295+
if mapType == Queue {
1296+
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
1297+
keySize = 0
1298+
keyPtr = nil
1299+
}
1300+
1301+
m, err := NewMap(&MapSpec{
1302+
Type: mapType,
1303+
KeySize: keySize,
1304+
ValueSize: 4,
1305+
MaxEntries: 10,
1306+
})
1307+
qt.Assert(t, qt.ErrorIs(err, nil))
1308+
defer m.Close()
1309+
1310+
for i := 0; i < int(m.MaxEntries()); i++ {
1311+
if keySize != 0 {
1312+
anyKey = uint32(i)
1313+
}
1314+
if err := m.Put(anyKey, uint32(i)); err != nil {
1315+
t.Fatal(err)
1316+
}
1317+
}
1318+
1319+
iter := m.Drain()
1320+
1321+
allocs := testing.AllocsPerRun(int(m.MaxEntries()-1), func() {
1322+
if !iter.Next(keyPtr, &value) {
1323+
t.Fatal("Next failed while draining: %w", iter.Err())
1324+
}
1325+
})
1326+
1327+
qt.Assert(t, qt.Equals(allocs, float64(0)))
1328+
})
1329+
}
1330+
}
1331+
11771332
func TestMapBatchLookupAllocations(t *testing.T) {
11781333
testutils.SkipIfNotSupported(t, haveBatchAPI())
11791334

0 commit comments

Comments
 (0)