Skip to content

Commit 38f2307

Browse files
committed
proxy: use context for proxy check and timeouts
Use a context with deadline of proxyTimeout instead of a timer to handle checks and timeouts. As an additional effect the check function will exit or will not update the proxy address if the context is done. Also handle signals to stop proxy.
1 parent fc6e568 commit 38f2307

File tree

2 files changed

+98
-60
lines changed

2 files changed

+98
-60
lines changed

cmd/proxy/cmd/proxy.go

Lines changed: 96 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ import (
1919
"fmt"
2020
"net"
2121
"net/http"
22+
"os"
23+
"os/signal"
2224
"sync"
25+
"syscall"
2326
"time"
2427

2528
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -163,15 +166,15 @@ func (c *ClusterChecker) stopPollonProxy() {
163166
}
164167
}
165168

166-
func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
169+
func (c *ClusterChecker) updateDestAddress(destAddr *net.TCPAddr) {
167170
c.pollonMutex.Lock()
168171
defer c.pollonMutex.Unlock()
169172
if c.pp != nil {
170-
c.pp.C <- confData
173+
c.pp.C <- pollon.ConfData{DestAddr: destAddr}
171174
}
172175
}
173176

174-
func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTimeout time.Duration) error {
177+
func (c *ClusterChecker) setProxyInfo(ctx context.Context, e store.Store, generation int64, proxyTimeout time.Duration) error {
175178
proxyInfo := &cluster.ProxyInfo{
176179
InfoUID: common.UID(),
177180
UID: c.uid,
@@ -180,15 +183,15 @@ func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTime
180183
}
181184
log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))
182185

183-
if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, 2*proxyTimeout); err != nil {
186+
if err := c.e.SetProxyInfo(ctx, proxyInfo, 2*proxyTimeout); err != nil {
184187
return err
185188
}
186189
return nil
187190
}
188191

189-
// Check reads the cluster data and applies the right pollon configuration.
190-
func (c *ClusterChecker) Check() error {
191-
cd, _, err := c.e.GetClusterData(context.TODO())
192+
// check reads the cluster data and applies the right pollon configuration.
193+
func (c *ClusterChecker) check(ctx context.Context) error {
194+
cd, _, err := c.e.GetClusterData(ctx)
192195
if err != nil {
193196
return fmt.Errorf("cannot get cluster data: %v", err)
194197
}
@@ -201,15 +204,15 @@ func (c *ClusterChecker) Check() error {
201204
log.Debugf("cd dump: %s", spew.Sdump(cd))
202205
if cd == nil {
203206
log.Infow("no clusterdata available, closing connections to master")
204-
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
207+
c.updateDestAddress(nil)
205208
return nil
206209
}
207210
if cd.FormatVersion != cluster.CurrentCDFormatVersion {
208-
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
211+
c.updateDestAddress(nil)
209212
return fmt.Errorf("unsupported clusterdata format version: %d", cd.FormatVersion)
210213
}
211214
if err = cd.Cluster.Spec.Validate(); err != nil {
212-
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
215+
c.updateDestAddress(nil)
213216
return fmt.Errorf("clusterdata validation failed: %v", err)
214217
}
215218

@@ -228,9 +231,9 @@ func (c *ClusterChecker) Check() error {
228231
proxy := cd.Proxy
229232
if proxy == nil {
230233
log.Infow("no proxy object available, closing connections to master")
231-
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
234+
c.updateDestAddress(nil)
232235
// ignore errors on setting proxy info
233-
if err = c.SetProxyInfo(c.e, cluster.NoGeneration, proxyTimeout); err != nil {
236+
if err = c.setProxyInfo(ctx, c.e, cluster.NoGeneration, proxyTimeout); err != nil {
234237
log.Errorw("failed to update proxyInfo", zap.Error(err))
235238
} else {
236239
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
@@ -245,9 +248,9 @@ func (c *ClusterChecker) Check() error {
245248
db, ok := cd.DBs[proxy.Spec.MasterDBUID]
246249
if !ok {
247250
log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID)
248-
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
251+
c.updateDestAddress(nil)
249252
// ignore errors on setting proxy info
250-
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
253+
if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil {
251254
log.Errorw("failed to update proxyInfo", zap.Error(err))
252255
} else {
253256
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
@@ -259,14 +262,15 @@ func (c *ClusterChecker) Check() error {
259262
return nil
260263
}
261264

265+
// TODO(sgotti) use a resolver with a context if it exists
262266
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port))
263267
if err != nil {
264268
log.Errorw("cannot resolve db address", zap.Error(err))
265-
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
269+
c.updateDestAddress(nil)
266270
return nil
267271
}
268-
log.Infow("master address", "address", addr)
269-
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
272+
273+
if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil {
270274
// if we failed to update our proxy info when a master is defined we
271275
// cannot ignore this error since the sentinel won't know that we exist
272276
// and are sending connections to a master so, when electing a new
@@ -282,84 +286,111 @@ func (c *ClusterChecker) Check() error {
282286

283287
// start proxing only if we are inside enabledProxies, this ensures that the
284288
// sentinel has read our proxyinfo and knows we are alive
285-
if util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
286-
log.Infow("proxying to master address", "address", addr)
287-
c.sendPollonConfData(pollon.ConfData{DestAddr: addr})
288-
} else {
289+
if !util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
289290
log.Infow("not proxying to master address since we aren't in the enabled proxies list", "address", addr)
290-
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
291+
c.updateDestAddress(nil)
292+
return nil
291293
}
292294

293-
return nil
294-
}
295-
296-
func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) {
297-
c.configMutex.Lock()
298-
timeoutTimer := time.NewTimer(c.proxyTimeout)
299-
c.configMutex.Unlock()
300-
301-
for {
302-
select {
303-
case <-timeoutTimer.C:
304-
log.Infow("check timeout timer fired")
305-
// if the check timeouts close all connections and stop listening
306-
// (for example to avoid load balancers forward connections to us
307-
// since we aren't ready or in a bad state)
308-
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
309-
if c.stopListening {
310-
c.stopPollonProxy()
311-
}
295+
// before updating the pollon address, check that the context isn't timed
296+
// out, usually if the context is timeout out one of the above calls will
297+
// return an error but libkv stores doesn't handle contexts so we should
298+
// check here.
299+
select {
300+
case <-ctx.Done():
301+
log.Infow("not updating proxy address since context is done: %v", ctx.Err())
302+
return nil
303+
default:
304+
}
312305

313-
case <-checkOkCh:
314-
log.Debugw("check ok message received")
306+
log.Infow("proxying to master address", "address", addr)
307+
c.updateDestAddress(addr)
315308

316-
// ignore if stop succeeded or not due to timer already expired
317-
timeoutTimer.Stop()
309+
return nil
310+
}
318311

319-
c.configMutex.Lock()
320-
timeoutTimer = time.NewTimer(c.proxyTimeout)
321-
c.configMutex.Unlock()
312+
// timeoutChecker will forcefully close connections when the context times
313+
// out.
314+
func (c *ClusterChecker) timeoutChecker(ctx context.Context) {
315+
<-ctx.Done()
316+
if ctx.Err() == context.DeadlineExceeded {
317+
log.Infow("check timeout fired")
318+
// if the check timeouts close all connections and stop listening
319+
// (for example to avoid load balancers forward connections to us
320+
// since we aren't ready or in a bad state)
321+
c.updateDestAddress(nil)
322+
if c.stopListening {
323+
c.stopPollonProxy()
322324
}
323325
}
324326
}
325327

326-
func (c *ClusterChecker) Start() error {
327-
checkOkCh := make(chan struct{})
328+
// checkLoop executes at predefined intervals the Check function. It'll force
329+
// close connections when a check function continuosly fails for more than a
330+
// timeout.
331+
func (c *ClusterChecker) checkLoop(pctx context.Context) error {
328332
checkCh := make(chan error)
329333
timerCh := time.NewTimer(0).C
330334

331-
// TODO(sgotti) TimeoutCecker is needed to forcefully close connection also
332-
// if the Check method is blocked somewhere.
333-
// The idomatic/cleaner solution will be to use a context instead of this
334-
// TimeoutChecker but we have to change the libkv stores to support contexts.
335-
go c.TimeoutChecker(checkOkCh)
335+
c.configMutex.Lock()
336+
ctx, cancel := context.WithTimeout(pctx, c.proxyTimeout)
337+
c.configMutex.Unlock()
336338

337339
for {
338340
select {
341+
case <-pctx.Done():
342+
cancel()
343+
return nil
339344
case <-timerCh:
345+
// start a new context if it's already done, this happens when the
346+
// context is timed out or cancelled.
347+
select {
348+
case <-ctx.Done():
349+
c.configMutex.Lock()
350+
ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout)
351+
c.configMutex.Unlock()
352+
default:
353+
}
354+
340355
go func() {
341-
checkCh <- c.Check()
356+
checkCh <- c.check(ctx)
342357
}()
343358
case err := <-checkCh:
344359
if err != nil {
345-
// don't report check ok since it returned an error
360+
// if the check function returned an error then don't stop the
361+
// context so if it times out the TimeoutChecker will close
362+
// connections or it could be cancelled if the next check
363+
// succeeds before the timeout
346364
log.Infow("check function error", zap.Error(err))
347365
} else {
348-
// report that check was ok
349-
checkOkCh <- struct{}{}
366+
// check was ok, so cancel the context and start a new one with a new TimeoutChecker
367+
cancel()
368+
369+
c.configMutex.Lock()
370+
ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout)
371+
c.configMutex.Unlock()
372+
go c.timeoutChecker(ctx)
350373
}
374+
351375
c.configMutex.Lock()
352376
timerCh = time.NewTimer(c.proxyCheckInterval).C
353377
c.configMutex.Unlock()
354378

355379
case err := <-c.endPollonProxyCh:
356380
if err != nil {
381+
cancel()
357382
return fmt.Errorf("proxy error: %v", err)
358383
}
359384
}
360385
}
361386
}
362387

388+
func sigHandler(sigs chan os.Signal, cancel context.CancelFunc) {
389+
s := <-sigs
390+
log.Debugw("got signal", "signal", s)
391+
cancel()
392+
}
393+
363394
func Execute() {
364395
if err := flagutil.SetFlagsFromEnv(CmdProxy.PersistentFlags(), "STPROXY"); err != nil {
365396
log.Fatal(err)
@@ -428,11 +459,16 @@ func proxy(c *cobra.Command, args []string) {
428459
}()
429460
}
430461

462+
ctx, cancel := context.WithCancel(context.Background())
463+
sigs := make(chan os.Signal, 1)
464+
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
465+
go sigHandler(sigs, cancel)
466+
431467
clusterChecker, err := NewClusterChecker(uid, cfg)
432468
if err != nil {
433469
log.Fatalf("cannot create cluster checker: %v", err)
434470
}
435-
if err = clusterChecker.Start(); err != nil {
471+
if err = clusterChecker.checkLoop(ctx); err != nil {
436472
log.Fatalf("cluster checker ended with error: %v", err)
437473
}
438474
}

tests/integration/proxy_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ func TestProxyListening(t *testing.T) {
8989
Spec: &cluster.ClusterSpec{
9090
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
9191
FailInterval: &cluster.Duration{Duration: 10 * time.Second},
92+
// user faster check interval for tests
93+
ProxyCheckInterval: &cluster.Duration{Duration: 1 * time.Second},
9294
},
9395
Status: cluster.ClusterStatus{
9496
CurrentGeneration: 1,

0 commit comments

Comments
 (0)