Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kfake: add DropControl, SleepOutOfOrder, CoordinatorFor, RehashCoordinators #649

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 247 additions & 61 deletions pkg/kfake/cluster.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,8 @@ type (
controller *broker
bs []*broker

coordinatorGen atomic.Uint64

adminCh chan func()
reqCh chan *clientReq
wakeCh chan *slept
@@ -64,6 +66,7 @@ type (
key int16
fn controlFn
keep bool
drop bool
lastReq map[*clientConn]*clientReq // used to not re-run requests that slept, see doc comments below
}

@@ -254,6 +257,7 @@ func (b *broker) listen() {
}

func (c *Cluster) run() {
outer:
for {
var (
creq *clientReq
@@ -270,11 +274,13 @@ func (c *Cluster) run() {
return

case admin := <-c.adminCh:
// Run a custom request in the context of the cluster.
admin()
continue

case creq = <-c.reqCh:
if c.cfg.sleepOutOfOrder {
break
}
// If we have any sleeping request on this node,
// we enqueue the new live request to the end and
// wait for the sleeping request to finish.
@@ -301,19 +307,28 @@ func (c *Cluster) run() {
// Control flow is weird here, but is described more
// fully in the finish/resleep/etc methods.
c.continueSleptControl(s)
select {
case res := <-s.res:
c.finishSleptControl(s)
cctx := s.cctx
s = nil
kresp, err, handled = res.kresp, res.err, res.handled
if handled {
c.popControl(cctx)
goto afterControl
inner:
for {
select {
case <-c.die:
return
case admin := <-c.adminCh:
admin()
continue inner
case res := <-s.res:
c.finishSleptControl(s)
cctx := s.cctx
s = nil
kresp, err, handled = res.kresp, res.err, res.handled
c.maybePopControl(handled, cctx)
if handled {
goto afterControl
}
break inner
case sleepChs := <-c.controlSleep:
c.resleepSleptControl(s, sleepChs)
continue outer
}
case sleepChs := <-c.controlSleep:
c.resleepSleptControl(s, sleepChs)
continue
}

case w = <-c.watchFetchCh:
@@ -435,10 +450,15 @@ func (c *Cluster) run() {
// Controlling a request drops the control function from the cluster, meaning
// that a control function can only control *one* request. To keep the control
// function handling more requests, you can call KeepControl within your
// control function.
// control function. Alternatively, if you want to just run some logic in your
// control function but then have the cluster handle the request as normal,
// you can call DropControl to drop a control function that was not handled.
//
// It is safe to add new control functions within a control function.
//
// It is safe to add new control functions within a control function. Control
// functions are not called concurrently.
// Control functions are run serially unless you use SleepControl, multiple
// control functions are "in progress", and you run Cluster.Close. Closing a
// Cluster awakens all sleeping control functions.
func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) {
	// -1 is the wildcard key: fn is considered for every request type
	// (see ControlKey for per-key registration).
	c.ControlKey(-1, fn)
}
@@ -455,9 +475,15 @@ func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) {
// Controlling a request drops the control function from the cluster, meaning
// that a control function can only control *one* request. To keep the control
// function handling more requests, you can call KeepControl within your
// control function.
// control function. Alternatively, if you want to just run some logic in your
// control function but then have the cluster handle the request as normal,
// you can call DropControl to drop a control function that was not handled.
//
// It is safe to add new control functions within a control function.
//
// Control functions are run serially unless you use SleepControl, multiple
// control functions are "in progress", and you run Cluster.Close. Closing a
// Cluster awakens all sleeping control functions.
func (c *Cluster) ControlKey(key int16, fn func(kmsg.Request) (kmsg.Response, error, bool)) {
c.controlMu.Lock()
defer c.controlMu.Unlock()
@@ -484,6 +510,19 @@ func (c *Cluster) KeepControl() {
}
}

// DropControl drops the current control function, taking precedence over
// KeepControl. This lets you run custom control logic exactly once: drop the
// control function and return "not handled", allowing other control functions
// to run or letting the kfake cluster process the request as normal.
func (c *Cluster) DropControl() {
	c.controlMu.Lock()
	defer c.controlMu.Unlock()
	cc := c.currentControl
	if cc == nil {
		return
	}
	cc.drop = true
}

// SleepControl sleeps the current control function until wakeup returns. This
// yields to run any other connection.
//
@@ -525,7 +564,10 @@ func (c *Cluster) SleepControl(wakeup func()) {
}()

c.controlSleep <- sleepChs
<-sleepChs.clientCont
select {
case <-sleepChs.clientCont:
case <-c.die:
}
}

// CurrentNode is solely valid from within a control function; it returns
@@ -560,20 +602,25 @@ func (c *Cluster) tryControlKey(key int16, creq *clientReq) (kmsg.Response, erro
}
cctx.lastReq[creq.cc] = creq
res := c.runControl(cctx, creq)
select {
case res := <-res:
if res.handled {
c.popControl(cctx)
return res.kresp, res.err, true
for {
select {
case <-c.die:
return nil, nil, false
case admin := <-c.adminCh:
admin()
continue
case res := <-res:
c.maybePopControl(res.handled, cctx)
return res.kresp, res.err, res.handled
case sleepChs := <-c.controlSleep:
c.beginSleptControl(&slept{
cctx: cctx,
sleepChs: sleepChs,
res: res,
creq: creq,
})
return nil, nil, true
}
case sleepChs := <-c.controlSleep:
c.beginSleptControl(&slept{
cctx: cctx,
sleepChs: sleepChs,
res: res,
creq: creq,
})
return nil, nil, true
}
}
return nil, nil, false
@@ -606,7 +653,11 @@ func (c *Cluster) beginSleptControl(s *slept) {
// unlock us safely.
bs := c.sleeping[s.creq.cc]
if bs == nil {
bs = &bsleep{c: c}
bs = &bsleep{
c: c,
set: make(map[*slept]struct{}),
setWake: make(chan *slept, 1),
}
c.sleeping[s.creq.cc] = bs
}
bs.enqueue(s)
@@ -648,23 +699,29 @@ func (c *Cluster) resleepSleptControl(s *slept, sleepChs sleepChs) {
c.controlMu.Unlock()
s.sleepChs = sleepChs
s.continueDequeue <- struct{}{}
// For OOO requests, we need to manually trigger a goroutine to
// watch for the sleep to end.
s.bs.maybeWaitOOOWake(s)
}

func (c *Cluster) popControl(cctx *controlCtx) {
if !cctx.keep {
// maybePopControl removes cctx from the cluster's registered control
// functions when either (a) the control handled the request and did not ask
// to be kept via KeepControl, or (b) the control explicitly asked to be
// dropped via DropControl.
func (c *Cluster) maybePopControl(handled bool, cctx *controlCtx) {
	remove := cctx.drop || (handled && !cctx.keep)
	if remove {
		delete(c.control[cctx.key], cctx)
	}
}

// bsleep manages sleeping requests on a connection to a broker, or
// non-sleeping requests that are waiting for sleeping requests to finish.
//
// NOTE(review): the scraped diff interleaved the old and new field lists,
// declaring c/mu/queue twice; this is the deduplicated (new) layout.
type bsleep struct {
	c       *Cluster
	mu      sync.Mutex
	queue   []*slept            // in-order sleeping requests (head is actively managed)
	set     map[*slept]struct{} // out-of-order sleeping requests (SleepOutOfOrder)
	setWake chan *slept         // receives an out-of-order request once it finishes sleeping
}

type slept struct {
bs *bsleep
cctx *controlCtx
sleepChs sleepChs
res <-chan controlResp
@@ -681,69 +738,172 @@ type sleepChs struct {

// enqueue has a few potential behaviors.
//
// * If s is waiting, this is a new request enqueueing to the back of an
// (1) If s is waiting, this is a new request enqueueing to the back of an
// existing queue, where we are waiting for the head request to finish
// sleeping. Easy case.
//
// * If s is not waiting, this is a sleeping request. If the queue is empty,
// (2) If s is not waiting, this is a sleeping request. If the queue is empty,
// this is the first sleeping request on a node. We enqueue and start our wait
// goroutine. Easy.
//
// * If s is not waiting, but our queue is non-empty, this must be from a
// convoluted scenario: There was a request in front of us that slept, and we
// were waiting, and now we ourselves slept OR we previously slept, but we
// returned "not handled". We are now re-enqueueing ourself. Rather than add to
// the back, we update our head request with the new enqueued values. In this
// last case, bsleep is actually waiting for a signal down 'continueDequeue',
// and it will be signaled in the 'run' goroutine once tryControl returns
// (which it will, right after we are done here). We need to update values on
// the head.
// (3) If s is not waiting, but our queue is non-empty, this must be from a
// convoluted scenario:
//
// (a) the user has SleepOutOfOrder configured,
// (b) or, there was a request in front of us that slept, we were waiting,
// and now we ourselves are sleeping
// (c) or, we are sleeping for the second time in a single control
func (bs *bsleep) enqueue(s *slept) bool {
if bs == nil {
return false
return false // Do not enqueue, nothing sleeping
}
s.continueDequeue = make(chan struct{}, 1)
s.bs = bs
bs.mu.Lock()
defer bs.mu.Unlock()
if s.waiting {
if len(bs.queue) > 0 {
bs.queue = append(bs.queue, s)
if bs.c.cfg.sleepOutOfOrder {
panic("enqueueing a waiting request even though we are sleeping out of order")
}
if !bs.empty() {
bs.keep(s) // Case (1)
return true
}
return false
return false // We do not enqueue, do not wait: nothing sleeping ahead of us
}
if len(bs.queue) == 0 {
bs.queue = append(bs.queue, s)
go bs.wait()
if bs.empty() {
bs.keep(s)
go bs.wait() // Case (2)
return true
}
var q0 *slept
if !bs.c.cfg.sleepOutOfOrder {
q0 = bs.queue[0] // Case (3b) or (3c) -- just update values below
} else {
// Case (3a), out of order sleep: we need to check the entire
// queue to see if this request was already sleeping and, if
// so, update the values. If it was not already sleeping, we
// "keep" the new sleeping item.
bs.keep(s)
return true
}
q0 := bs.queue[0]
if q0.creq != s.creq {
panic("internal error: sleeping request not head request")
}
// We do not update continueDequeue because it is actively being read,
// we just reuse the old value.
q0.cctx = s.cctx
q0.sleepChs = s.sleepChs
q0.res = s.res
q0.waiting = s.waiting
return true
}

// keep stores a sleeping request to be managed. For out-of-order control the
// logic is a bit more involved: we also need to watch for the control sleep
// finishing here and forward the "I'm done sleeping" notification to waitSet.
func (bs *bsleep) keep(s *slept) {
	if bs.c.cfg.sleepOutOfOrder {
		bs.set[s] = struct{}{}
		bs.maybeWaitOOOWake(s)
		return
	}
	bs.queue = append(bs.queue, s)
}

// maybeWaitOOOWake spawns a watcher goroutine for an out-of-order sleeping
// request: once the request's sleep ends (clientWait fires), the watcher
// forwards s down setWake so waitSet can schedule it with the run loop.
// No-op unless SleepOutOfOrder is configured. Both waits also select on
// cluster death so the goroutine does not outlive Cluster.Close.
func (bs *bsleep) maybeWaitOOOWake(s *slept) {
	if !bs.c.cfg.sleepOutOfOrder {
		return
	}
	go func() {
		select {
		case <-bs.c.die:
		case <-s.sleepChs.clientWait:
			select {
			case <-bs.c.die:
			case bs.setWake <- s:
			}
		}
	}()
}

// empty reports whether bs is tracking no sleeping requests at all, in either
// the in-order queue or the out-of-order set.
func (bs *bsleep) empty() bool {
	return len(bs.queue)+len(bs.set) == 0
}

// wait dispatches to the appropriate sleep-management loop: the out-of-order
// set manager when SleepOutOfOrder is configured, the in-order queue manager
// otherwise.
func (bs *bsleep) wait() {
	if bs.c.cfg.sleepOutOfOrder {
		bs.waitSet()
		return
	}
	bs.waitQueue()
}

// For out of order control, all control functions run concurrently, serially.
// Whenever they wake up, they send themselves down setWake. waitSet manages
// handling the wake up and interacting with the serial manage goroutine to
// run everything properly.
func (bs *bsleep) waitSet() {
for {
bs.mu.Lock()
if len(bs.queue) == 0 {
if len(bs.set) == 0 {
bs.mu.Unlock()
return
}
q0 := bs.queue[0]
bs.mu.Unlock()

if q0.continueDequeue == nil {
q0.continueDequeue = make(chan struct{}, 1)
// Wait for a control function to awaken.
var q *slept
select {
case <-bs.c.die:
return
case q = <-bs.setWake:
q.sleepChs.clientWait = nil
}

// Now, schedule ourselves with the run loop.
select {
case <-bs.c.die:
return
case bs.c.wakeCh <- q:
}

// Wait for this control function to finish its loop in the run
// function. Once it does, if clientWait is non-nil, the
// control function went back to sleep. If it is nil, the
// control function is done and we remove this from tracking.
select {
case <-bs.c.die:
return
case <-q.continueDequeue:
}
if q.sleepChs.clientWait == nil {
bs.mu.Lock()
delete(bs.set, q)
bs.mu.Unlock()
}
}
}

// For in-order control functions, the concept is slightly simpler but the
// logic flow is the same. We wait for the head control function to wake up,
// try to run it, and then wait for it to finish. The logic of this function is
// the same as waitSet, minus the middle part where we wait for something to
// wake up.
func (bs *bsleep) waitQueue() {
for {
bs.mu.Lock()
if len(bs.queue) == 0 {
bs.mu.Unlock()
return
}
q0 := bs.queue[0]
bs.mu.Unlock()

if q0.sleepChs.clientWait != nil {
select {
case <-bs.c.die:
return
case <-q0.sleepChs.clientWait:
q0.sleepChs.clientWait = nil
}
@@ -755,7 +915,11 @@ func (bs *bsleep) wait() {
case bs.c.wakeCh <- q0:
}

<-q0.continueDequeue
select {
case <-bs.c.die:
return
case <-q0.continueDequeue:
}
if q0.sleepChs.clientWait == nil {
bs.mu.Lock()
bs.queue = bs.queue[1:]
@@ -802,6 +966,28 @@ func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32
return err
}

// CoordinatorFor returns the node ID of the group or transaction coordinator
// for the given key, or -1 if the cluster has no brokers.
func (c *Cluster) CoordinatorFor(key string) int32 {
	n := int32(-1)
	c.admin(func() {
		// Runs in the cluster's event loop; safe to read c.bs here.
		if len(c.bs) > 0 {
			n = c.coordinator(key).node
		}
	})
	return n
}

// RehashCoordinators simulates group and transactional ID coordinators moving
// around. All group and transactional IDs are rekeyed. This forces clients to
// reload coordinators.
func (c *Cluster) RehashCoordinators() {
	c.coordinatorGen.Add(1)
}

// AddNode adds a node to the cluster. If nodeID is -1, the next node ID is
// used. If port is 0 or negative, a random port is chosen. This returns the
// added node ID and the port used, or an error if the node already exists or
11 changes: 11 additions & 0 deletions pkg/kfake/config.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,8 @@ type cfg struct {
enableSASL bool
sasls map[struct{ m, u string }]string // cleared after client initialization
tls *tls.Config

sleepOutOfOrder bool
}

// NumBrokers sets the number of brokers to start in the fake cluster.
@@ -113,3 +115,12 @@ func TLS(c *tls.Config) Opt {
func SeedTopics(partitions int32, ts ...string) Opt {
return opt{func(cfg *cfg) { cfg.seedTopics = append(cfg.seedTopics, seedTopics{partitions, ts}) }}
}

// SleepOutOfOrder allows requests to be handled out of order when control
// functions are sleeping. The requests are handled internally out of
// order, but responses still wait for the sleeping requests to finish. This
// can be used to set up complicated chains of control where functions only
// advance when you know another request is actively being handled.
func SleepOutOfOrder() Opt {
	return opt{func(cfg *cfg) { cfg.sleepOutOfOrder = true }}
}
3 changes: 2 additions & 1 deletion pkg/kfake/groups.go
Original file line number Diff line number Diff line change
@@ -101,7 +101,8 @@ func (gs groupState) String() string {
}

// coordinator returns the broker acting as coordinator for the given group or
// transactional ID. The current coordinator generation is mixed into the hash
// so that RehashCoordinators (which bumps the generation) remaps IDs to
// (likely) different brokers, forcing clients to reload coordinators.
func (c *Cluster) coordinator(id string) *broker {
	gen := c.coordinatorGen.Load()
	// fmt.Sprintf, not fmt.Sprint: Sprint does not interpret the %d verb
	// (go vet's printf check flags the original call).
	n := hashString(fmt.Sprintf("%d", gen)+"\x00\x00"+id) % uint64(len(c.bs))
	return c.bs[n]
}