Skip to content

Commit 3ac6523

Browse files
committed
refactored Map.Drain API and tests
Signed-off-by: Simone Magnani <[email protected]>
1 parent 57a6e30 commit 3ac6523

File tree

2 files changed

+232
-217
lines changed

2 files changed

+232
-217
lines changed

map.go

Lines changed: 107 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,8 +1291,8 @@ func batchCount(keys, values any) (int, error) {
12911291
// safe. You may see the same key multiple times. Iteration may
12921292
// also abort with an error, see IsIterationAborted.
12931293
//
1294-
// Iterating a queue/stack map returns an error (NextKey).
1295-
// Map.Drain should be used instead.
1294+
// Iterating a queue/stack map returns an error (NextKey) as the.
1295+
// Map.Drain API should be used instead.
12961296
func (m *Map) Iterate() *MapIterator {
12971297
return newMapIterator(m)
12981298
}
@@ -1595,25 +1595,21 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
15951595
}
15961596

15971597
func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
1598-
// For array-like maps NextKey returns nil only after maxEntries
1599-
// iterations.
1598+
var key interface{}
1599+
1600+
// For array-like maps NextKey returns nil only after maxEntries iterations.
16001601
for mi.count <= mi.maxEntries {
16011602
if mi.cursor == nil {
16021603
// Pass nil interface to NextKey to make sure the Map's first key
16031604
// is returned. If we pass an uninitialized []byte instead, it'll see a
16041605
// non-nil interface and try to marshal it.
16051606
mi.cursor = make([]byte, mi.target.keySize)
1606-
mi.err = mi.target.NextKey(nil, mi.cursor)
1607+
key = nil
16071608
} else {
1608-
mi.err = mi.target.NextKey(mi.cursor, mi.cursor)
1609+
key = mi.cursor
16091610
}
16101611

1611-
if errors.Is(mi.err, ErrKeyNotExist) {
1612-
mi.done = true
1613-
mi.err = nil
1614-
return false
1615-
} else if mi.err != nil {
1616-
mi.err = fmt.Errorf("get next key: %w", mi.err)
1612+
if !mi.fetchNextKey(key) {
16171613
return false
16181614
}
16191615

@@ -1635,110 +1631,131 @@ func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
16351631
return false
16361632
}
16371633

1638-
buf := mi.cursor.([]byte)
1639-
if ptr, ok := keyOut.(unsafe.Pointer); ok {
1640-
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
1641-
} else {
1642-
mi.err = sysenc.Unmarshal(keyOut, buf)
1643-
}
1644-
1645-
return mi.err == nil
1634+
return mi.copyCursorToKeyOut(keyOut)
16461635
}
16471636

16481637
mi.err = fmt.Errorf("%w", ErrIterationAborted)
16491638
return false
16501639
}
16511640

16521641
func (mi *MapIterator) nextDrain(keyOut, valueOut interface{}) bool {
1653-
// Check for maps without a key (e.g., Queue/Stack). In this case, when there is no
1654-
// more data, ErrKeyNotExist arise, but we gently stop the retrieval with no error.
1655-
if mi.target.keySize == 0 {
1656-
if keyOut != nil {
1657-
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
1658-
return false
1659-
}
1660-
mi.err = mi.target.LookupAndDelete(keyOut, valueOut)
1661-
if errors.Is(mi.err, ErrKeyNotExist) {
1662-
mi.done = true
1663-
mi.err = nil
1664-
return false
1665-
} else if mi.err != nil {
1666-
return false
1667-
}
1668-
mi.count++
1669-
return true
1642+
if mi.isKeylessMap() {
1643+
return mi.handleKeylessMap(keyOut, valueOut)
16701644
}
16711645

1672-
// Here we allocate only once data for retrieving the next key in the map.
1646+
// Allocate only once data for retrieving the next key in the map.
16731647
if mi.cursor == nil {
16741648
mi.cursor = make([]byte, mi.target.keySize)
16751649
}
16761650

1677-
for {
1678-
// Always retrieve first key in the map. This should ensure that
1679-
// the whole map is traversed, despite concurrent insertion.
1680-
// The expected ordering might differ:
1681-
// - initial keys in map: `a -> b -> c`
1682-
// - call MapIterator.Next and retrieve key `a`
1683-
// - insert key `d` in map
1684-
// - retrieve all the remaining keys `d -> b -> c`
1685-
mi.err = mi.target.NextKey(nil, mi.cursor)
1686-
if errors.Is(mi.err, ErrKeyNotExist) {
1687-
mi.done = true
1651+
// Always retrieve first key in the map. This should ensure that the whole map
1652+
// is traversed, despite concurrent operations (ordering of items might differ).
1653+
for mi.err == nil && mi.fetchNextKey(nil) {
1654+
if mi.tryLookupAndDelete(valueOut) {
1655+
mi.count++
1656+
return mi.copyCursorToKeyOut(keyOut)
1657+
}
1658+
}
1659+
return false
1660+
}
1661+
1662+
func (mi *MapIterator) tryLookupAndDelete(valueOut interface{}) bool {
1663+
// Default try using the updated `Map.LookupAndDelete` API.
1664+
if !mi.fallback {
1665+
mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
1666+
if mi.err == nil {
1667+
return true
1668+
}
1669+
1670+
switch {
1671+
case errors.Is(mi.err, ErrNotSupported) || errors.Is(mi.err, unix.EINVAL):
1672+
mi.fallback = true
1673+
case errors.Is(mi.err, ErrKeyNotExist):
1674+
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
16881675
mi.err = nil
16891676
return false
1690-
} else if mi.err != nil {
1691-
mi.err = fmt.Errorf("get next key: %w", mi.err)
1677+
default:
1678+
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
16921679
return false
16931680
}
1681+
}
16941682

1695-
// Check if LookupAndDelete is supported and not invalid args, otherwise fallback
1696-
if !mi.fallback {
1697-
mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
1698-
if mi.err != nil && errors.Is(mi.err, ErrNotSupported) || errors.Is(mi.err, unix.EINVAL) {
1699-
// Fallback to sequential while also reusing `mi.cursor`.
1700-
mi.fallback = true
1701-
} else if errors.Is(mi.err, ErrKeyNotExist) {
1702-
// Same as in MapIterator.nextIterate.
1703-
continue
1704-
} else if mi.err != nil {
1705-
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
1706-
return false
1707-
}
1683+
// Fallback to sequential `Map.Lookup` -> `Map.Delete` when `Map.LookupAndDelete` is not supported.
1684+
mi.err = mi.target.Lookup(mi.cursor, valueOut)
1685+
if mi.err != nil {
1686+
if errors.Is(mi.err, ErrKeyNotExist) {
1687+
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
1688+
mi.err = nil
1689+
} else {
1690+
mi.err = fmt.Errorf("look up next key: %w", mi.err)
17081691
}
1692+
return false
1693+
}
17091694

1710-
// Falling back to sequential Lookup -> Delete in the case
1711-
// LookupAndDelete is not supported (e.g., kernel < 5.14).
1712-
if mi.fallback {
1713-
mi.err = mi.target.Lookup(mi.cursor, valueOut)
1714-
if errors.Is(mi.err, ErrKeyNotExist) {
1715-
// Same as in MapIterator.nextIterate.
1716-
continue
1717-
} else if mi.err != nil {
1718-
mi.err = fmt.Errorf("look up next key: %w", mi.err)
1719-
return false
1720-
}
1721-
mi.err = mi.target.Delete(mi.cursor)
1722-
if errors.Is(mi.err, ErrKeyNotExist) {
1723-
// Same as in MapIterator.nextIterate.
1724-
continue
1725-
} else if mi.err != nil {
1726-
mi.err = fmt.Errorf("delete key: %w", mi.err)
1727-
return false
1728-
}
1695+
mi.err = mi.target.Delete(mi.cursor)
1696+
if mi.err != nil {
1697+
if errors.Is(mi.err, ErrKeyNotExist) {
1698+
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
1699+
mi.err = nil
1700+
} else {
1701+
mi.err = fmt.Errorf("delete key: %w", mi.err)
17291702
}
1703+
return false
1704+
}
1705+
1706+
return true
1707+
}
17301708

1709+
func (mi *MapIterator) isKeylessMap() bool {
1710+
return mi.target.keySize == 0
1711+
}
1712+
1713+
func (mi *MapIterator) handleKeylessMap(keyOut, valueOut interface{}) bool {
1714+
if keyOut != nil {
1715+
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
1716+
return false
1717+
}
1718+
1719+
mi.err = mi.target.LookupAndDelete(nil, valueOut)
1720+
if mi.err == nil {
17311721
mi.count++
1722+
return true
1723+
}
17321724

1733-
buf := mi.cursor.([]byte)
1734-
if ptr, ok := keyOut.(unsafe.Pointer); ok {
1735-
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
1736-
} else {
1737-
mi.err = sysenc.Unmarshal(keyOut, buf)
1738-
}
1725+
if errors.Is(mi.err, ErrKeyNotExist) {
1726+
mi.done = true
1727+
mi.err = nil
1728+
} else {
1729+
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
1730+
}
1731+
1732+
return false
1733+
}
1734+
1735+
func (mi *MapIterator) fetchNextKey(key interface{}) bool {
1736+
mi.err = mi.target.NextKey(key, mi.cursor)
1737+
if mi.err == nil {
1738+
return true
1739+
}
1740+
1741+
if errors.Is(mi.err, ErrKeyNotExist) {
1742+
mi.done = true
1743+
mi.err = nil
1744+
} else {
1745+
mi.err = fmt.Errorf("get next key: %w", mi.err)
1746+
}
17391747

1740-
return mi.err == nil
1748+
return false
1749+
}
1750+
1751+
func (mi *MapIterator) copyCursorToKeyOut(keyOut interface{}) bool {
1752+
buf := mi.cursor.([]byte)
1753+
if ptr, ok := keyOut.(unsafe.Pointer); ok {
1754+
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
1755+
} else {
1756+
mi.err = sysenc.Unmarshal(keyOut, buf)
17411757
}
1758+
return mi.err == nil
17421759
}
17431760

17441761
// Err returns any encountered error.

0 commit comments

Comments
 (0)