@@ -19,7 +19,10 @@ import (
1919 "fmt"
2020 "net"
2121 "net/http"
22+ "os"
23+ "os/signal"
2224 "sync"
25+ "syscall"
2326 "time"
2427
2528 "github.com/prometheus/client_golang/prometheus/promhttp"
@@ -163,15 +166,15 @@ func (c *ClusterChecker) stopPollonProxy() {
163166 }
164167}
165168
166- func (c * ClusterChecker ) sendPollonConfData ( confData pollon. ConfData ) {
169+ func (c * ClusterChecker ) updateDestAddress ( destAddr * net. TCPAddr ) {
167170 c .pollonMutex .Lock ()
168171 defer c .pollonMutex .Unlock ()
169172 if c .pp != nil {
170- c .pp .C <- confData
173+ c .pp .C <- pollon. ConfData { DestAddr : destAddr }
171174 }
172175}
173176
174- func (c * ClusterChecker ) SetProxyInfo ( e store.Store , generation int64 , proxyTimeout time.Duration ) error {
177+ func (c * ClusterChecker ) setProxyInfo ( ctx context. Context , e store.Store , generation int64 , proxyTimeout time.Duration ) error {
175178 proxyInfo := & cluster.ProxyInfo {
176179 InfoUID : common .UID (),
177180 UID : c .uid ,
@@ -180,15 +183,15 @@ func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTime
180183 }
181184 log .Debugf ("proxyInfo dump: %s" , spew .Sdump (proxyInfo ))
182185
183- if err := c .e .SetProxyInfo (context . TODO () , proxyInfo , 2 * proxyTimeout ); err != nil {
186+ if err := c .e .SetProxyInfo (ctx , proxyInfo , 2 * proxyTimeout ); err != nil {
184187 return err
185188 }
186189 return nil
187190}
188191
189- // Check reads the cluster data and applies the right pollon configuration.
190- func (c * ClusterChecker ) Check ( ) error {
191- cd , _ , err := c .e .GetClusterData (context . TODO () )
192+ // check reads the cluster data and applies the right pollon configuration.
193+ func (c * ClusterChecker ) check ( ctx context. Context ) error {
194+ cd , _ , err := c .e .GetClusterData (ctx )
192195 if err != nil {
193196 return fmt .Errorf ("cannot get cluster data: %v" , err )
194197 }
@@ -201,15 +204,15 @@ func (c *ClusterChecker) Check() error {
201204 log .Debugf ("cd dump: %s" , spew .Sdump (cd ))
202205 if cd == nil {
203206 log .Infow ("no clusterdata available, closing connections to master" )
204- c .sendPollonConfData (pollon. ConfData { DestAddr : nil } )
207+ c .updateDestAddress ( nil )
205208 return nil
206209 }
207210 if cd .FormatVersion != cluster .CurrentCDFormatVersion {
208- c .sendPollonConfData (pollon. ConfData { DestAddr : nil } )
211+ c .updateDestAddress ( nil )
209212 return fmt .Errorf ("unsupported clusterdata format version: %d" , cd .FormatVersion )
210213 }
211214 if err = cd .Cluster .Spec .Validate (); err != nil {
212- c .sendPollonConfData (pollon. ConfData { DestAddr : nil } )
215+ c .updateDestAddress ( nil )
213216 return fmt .Errorf ("clusterdata validation failed: %v" , err )
214217 }
215218
@@ -228,9 +231,9 @@ func (c *ClusterChecker) Check() error {
228231 proxy := cd .Proxy
229232 if proxy == nil {
230233 log .Infow ("no proxy object available, closing connections to master" )
231- c .sendPollonConfData (pollon. ConfData { DestAddr : nil } )
234+ c .updateDestAddress ( nil )
232235 // ignore errors on setting proxy info
233- if err = c .SetProxyInfo ( c .e , cluster .NoGeneration , proxyTimeout ); err != nil {
236+ if err = c .setProxyInfo ( ctx , c .e , cluster .NoGeneration , proxyTimeout ); err != nil {
234237 log .Errorw ("failed to update proxyInfo" , zap .Error (err ))
235238 } else {
236239 // update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
@@ -245,9 +248,9 @@ func (c *ClusterChecker) Check() error {
245248 db , ok := cd .DBs [proxy .Spec .MasterDBUID ]
246249 if ! ok {
247250 log .Infow ("no db object available, closing connections to master" , "db" , proxy .Spec .MasterDBUID )
248- c .sendPollonConfData (pollon. ConfData { DestAddr : nil } )
251+ c .updateDestAddress ( nil )
249252 // ignore errors on setting proxy info
250- if err = c .SetProxyInfo ( c .e , proxy .Generation , proxyTimeout ); err != nil {
253+ if err = c .setProxyInfo ( ctx , c .e , proxy .Generation , proxyTimeout ); err != nil {
251254 log .Errorw ("failed to update proxyInfo" , zap .Error (err ))
252255 } else {
253256 // update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
@@ -259,14 +262,15 @@ func (c *ClusterChecker) Check() error {
259262 return nil
260263 }
261264
265+ // TODO(sgotti) use a resolver with a context if it exists
262266 addr , err := net .ResolveTCPAddr ("tcp" , net .JoinHostPort (db .Status .ListenAddress , db .Status .Port ))
263267 if err != nil {
264268 log .Errorw ("cannot resolve db address" , zap .Error (err ))
265- c .sendPollonConfData (pollon. ConfData { DestAddr : nil } )
269+ c .updateDestAddress ( nil )
266270 return nil
267271 }
268- log . Infow ( "master address" , "address" , addr )
269- if err = c .SetProxyInfo ( c .e , proxy .Generation , proxyTimeout ); err != nil {
272+
273+ if err = c .setProxyInfo ( ctx , c .e , proxy .Generation , proxyTimeout ); err != nil {
270274 // if we failed to update our proxy info when a master is defined we
271275 // cannot ignore this error since the sentinel won't know that we exist
272276 // and are sending connections to a master so, when electing a new
@@ -282,84 +286,111 @@ func (c *ClusterChecker) Check() error {
282286
283287 // start proxing only if we are inside enabledProxies, this ensures that the
284288 // sentinel has read our proxyinfo and knows we are alive
285- if util .StringInSlice (proxy .Spec .EnabledProxies , c .uid ) {
286- log .Infow ("proxying to master address" , "address" , addr )
287- c .sendPollonConfData (pollon.ConfData {DestAddr : addr })
288- } else {
289+ if ! util .StringInSlice (proxy .Spec .EnabledProxies , c .uid ) {
289290 log .Infow ("not proxying to master address since we aren't in the enabled proxies list" , "address" , addr )
290- c .sendPollonConfData (pollon.ConfData {DestAddr : nil })
291+ c .updateDestAddress (nil )
292+ return nil
291293 }
292294
293- return nil
294- }
295-
296- func (c * ClusterChecker ) TimeoutChecker (checkOkCh chan struct {}) {
297- c .configMutex .Lock ()
298- timeoutTimer := time .NewTimer (c .proxyTimeout )
299- c .configMutex .Unlock ()
300-
301- for {
302- select {
303- case <- timeoutTimer .C :
304- log .Infow ("check timeout timer fired" )
305- // if the check timeouts close all connections and stop listening
306- // (for example to avoid load balancers forward connections to us
307- // since we aren't ready or in a bad state)
308- c .sendPollonConfData (pollon.ConfData {DestAddr : nil })
309- if c .stopListening {
310- c .stopPollonProxy ()
311- }
295+ // before updating the pollon address, check that the context hasn't timed
296+ // out; usually if the context has timed out one of the above calls will
297+ // return an error, but libkv stores don't handle contexts so we should
298+ // check here.
299+ select {
300+ case <- ctx .Done ():
301+ log .Infow ("not updating proxy address since context is done: %v" , ctx .Err ())
302+ return nil
303+ default :
304+ }
312305
313- case <- checkOkCh :
314- log . Debugw ( "check ok message received" )
306+ log . Infow ( "proxying to master address" , "address" , addr )
307+ c . updateDestAddress ( addr )
315308
316- // ignore if stop succeeded or not due to timer already expired
317- timeoutTimer . Stop ()
309+ return nil
310+ }
318311
319- c .configMutex .Lock ()
320- timeoutTimer = time .NewTimer (c .proxyTimeout )
321- c .configMutex .Unlock ()
312+ // timeoutChecker will forcefully close connections when the context times
313+ // out.
314+ func (c * ClusterChecker ) timeoutChecker (ctx context.Context ) {
315+ <- ctx .Done ()
316+ if ctx .Err () == context .DeadlineExceeded {
317+ log .Infow ("check timeout fired" )
318+ // if the check timeouts close all connections and stop listening
319+ // (for example to avoid load balancers forward connections to us
320+ // since we aren't ready or in a bad state)
321+ c .updateDestAddress (nil )
322+ if c .stopListening {
323+ c .stopPollonProxy ()
322324 }
323325 }
324326}
325327
326- func (c * ClusterChecker ) Start () error {
327- checkOkCh := make (chan struct {})
328+ // checkLoop executes at predefined intervals the Check function. It'll force
329+ // close connections when a check function continuosly fails for more than a
330+ // timeout.
331+ func (c * ClusterChecker ) checkLoop (pctx context.Context ) error {
328332 checkCh := make (chan error )
329333 timerCh := time .NewTimer (0 ).C
330334
331- // TODO(sgotti) TimeoutCecker is needed to forcefully close connection also
332- // if the Check method is blocked somewhere.
333- // The idomatic/cleaner solution will be to use a context instead of this
334- // TimeoutChecker but we have to change the libkv stores to support contexts.
335- go c .TimeoutChecker (checkOkCh )
335+ c .configMutex .Lock ()
336+ ctx , cancel := context .WithTimeout (pctx , c .proxyTimeout )
337+ c .configMutex .Unlock ()
336338
337339 for {
338340 select {
341+ case <- pctx .Done ():
342+ cancel ()
343+ return nil
339344 case <- timerCh :
345+ // start a new context if it's already done, this happens when the
346+ // context is timed out or cancelled.
347+ select {
348+ case <- ctx .Done ():
349+ c .configMutex .Lock ()
350+ ctx , cancel = context .WithTimeout (pctx , c .proxyTimeout )
351+ c .configMutex .Unlock ()
352+ default :
353+ }
354+
340355 go func () {
341- checkCh <- c .Check ( )
356+ checkCh <- c .check ( ctx )
342357 }()
343358 case err := <- checkCh :
344359 if err != nil {
345- // don't report check ok since it returned an error
360+ // if the check function returned an error then don't stop the
361+ // context so if it times out the TimeoutChecker will close
362+ // connections or it could be cancelled if the next check
363+ // succeeds before the timeout
346364 log .Infow ("check function error" , zap .Error (err ))
347365 } else {
348- // report that check was ok
349- checkOkCh <- struct {}{}
366+ // check was ok, so cancel the context and start a new one with a new TimeoutChecker
367+ cancel ()
368+
369+ c .configMutex .Lock ()
370+ ctx , cancel = context .WithTimeout (pctx , c .proxyTimeout )
371+ c .configMutex .Unlock ()
372+ go c .timeoutChecker (ctx )
350373 }
374+
351375 c .configMutex .Lock ()
352376 timerCh = time .NewTimer (c .proxyCheckInterval ).C
353377 c .configMutex .Unlock ()
354378
355379 case err := <- c .endPollonProxyCh :
356380 if err != nil {
381+ cancel ()
357382 return fmt .Errorf ("proxy error: %v" , err )
358383 }
359384 }
360385 }
361386}
362387
388+ func sigHandler (sigs chan os.Signal , cancel context.CancelFunc ) {
389+ s := <- sigs
390+ log .Debugw ("got signal" , "signal" , s )
391+ cancel ()
392+ }
393+
363394func Execute () {
364395 if err := flagutil .SetFlagsFromEnv (CmdProxy .PersistentFlags (), "STPROXY" ); err != nil {
365396 log .Fatal (err )
@@ -428,11 +459,16 @@ func proxy(c *cobra.Command, args []string) {
428459 }()
429460 }
430461
462+ ctx , cancel := context .WithCancel (context .Background ())
463+ sigs := make (chan os.Signal , 1 )
464+ signal .Notify (sigs , syscall .SIGINT , syscall .SIGTERM )
465+ go sigHandler (sigs , cancel )
466+
431467 clusterChecker , err := NewClusterChecker (uid , cfg )
432468 if err != nil {
433469 log .Fatalf ("cannot create cluster checker: %v" , err )
434470 }
435- if err = clusterChecker .Start ( ); err != nil {
471+ if err = clusterChecker .checkLoop ( ctx ); err != nil {
436472 log .Fatalf ("cluster checker ended with error: %v" , err )
437473 }
438474}
0 commit comments