Skip to content

Commit

Permalink
Support readinessIndicator file in thick multus-daemon
Browse files Browse the repository at this point in the history
This change supports readinessIndicatorfile in multus-daemon and
refines goroutine termination in case of signal with context.
  • Loading branch information
s1061123 committed Aug 1, 2023
1 parent bf79dc3 commit ad6a132
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 90 deletions.
95 changes: 63 additions & 32 deletions cmd/multus-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
"io"
"net/http"
"os"
"os/signal"
"os/user"
"path/filepath"
"sync"
"syscall"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -35,6 +38,7 @@ import (
srv "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/api"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/types"

"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand All @@ -54,32 +58,47 @@ func main() {
os.Exit(4)
}

configWatcherStopChannel := make(chan struct{})
configWatcherDoneChannel := make(chan struct{})
serverStopChannel := make(chan struct{})
serverDoneChannel := make(chan struct{})
multusConfigFile := ""
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

daemonConf, err := cniServerConfig(*configFilePath)
if err != nil {
os.Exit(1)
}

if err := startMultusDaemon(daemonConf, serverStopChannel, serverDoneChannel); err != nil {
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
os.Exit(3)
}

multusConf, err := config.ParseMultusConfig(*configFilePath)
if err != nil {
logging.Panicf("startMultusDaemon failed to load the multus configuration: %v", err)
os.Exit(1)
}

logging.Verbosef("multus-daemon started")

if multusConf.ReadinessIndicatorFile != "" {
// Check readinessindicator file before daemon launch
logging.Verbosef("Readiness Indicator file check")
if err := types.GetReadinessIndicatorFile(multusConf.ReadinessIndicatorFile); err != nil {
_ = logging.Errorf("have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", multusConf.ReadinessIndicatorFile, err)
os.Exit(1)
}
logging.Verbosef("Readiness Indicator file check done!")
}

if err := startMultusDaemon(ctx, daemonConf, serverDoneChannel); err != nil {
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
os.Exit(3)
}

// Wait until daemon ready
logging.Verbosef("API readiness check")
if waitUntilAPIReady(daemonConf.SocketDir) != nil {
logging.Panicf("failed to ready multus-daemon socket: %v", err)
os.Exit(1)
}
logging.Verbosef("API readiness check done!")

// Generate multus CNI config from current CNI config
if multusConf.MultusConfigFile == "auto" {
Expand Down Expand Up @@ -111,39 +130,51 @@ func main() {
}
logging.Verbosef("Generated MultusCNI config: %s", generatedMultusConfig)

if err := configManager.PersistMultusConfig(generatedMultusConfig); err != nil {
multusConfigFile, err = configManager.PersistMultusConfig(generatedMultusConfig)
if err != nil {
_ = logging.Errorf("failed to persist the multus configuration: %v", err)
}

go func(stopChannel chan<- struct{}, doneChannel chan<- struct{}) {
if err := configManager.MonitorPluginConfiguration(configWatcherStopChannel, doneChannel); err != nil {
go func(ctx context.Context, doneChannel chan<- struct{}) {
if err := configManager.MonitorPluginConfiguration(ctx, doneChannel); err != nil {
_ = logging.Errorf("error watching file: %v", err)
}
}(configWatcherStopChannel, configWatcherDoneChannel)

<-configWatcherDoneChannel
}(ctx, configWatcherDoneChannel)
} else {
if err := copyUserProvidedConfig(multusConf.MultusConfigFile, multusConf.CniConfigDir); err != nil {
logging.Errorf("failed to copy the user provided configuration %s: %v", multusConf.MultusConfigFile, err)
}
}

serverDone := false
configWatcherDone := false
for {
select {
case <-configWatcherDoneChannel:
logging.Verbosef("ConfigWatcher done")
configWatcherDone = true
case <-serverDoneChannel:
logging.Verbosef("multus-server done.")
serverDone = true
signalCh := make(chan os.Signal, 16)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
for sig := range signalCh {
logging.Verbosef("caught %v, stopping...", sig)
cancel()
}
}()

if serverDone && configWatcherDone {
return
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-configWatcherDoneChannel
logging.Verbosef("ConfigWatcher done")
if multusConf.MultusConfigFile == "auto" && multusConfigFile != "" {
logging.Verbosef("Delete old config @ %v", multusConfigFile)
os.Remove(multusConfigFile)
}
}
wg.Done()
}()

wg.Add(1)
go func() {
<-serverDoneChannel
logging.Verbosef("multus-server done.")
wg.Done()
}()

wg.Wait()
// never reached
}

Expand All @@ -157,7 +188,7 @@ func waitUntilAPIReady(socketPath string) error {
})
}

func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{}, done chan struct{}) error {
func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, done chan struct{}) error {
if user, err := user.Current(); err != nil || user.Uid != "0" {
return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid)
}
Expand All @@ -172,11 +203,11 @@ func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{}
}

if daemonConfig.MetricsPort != nil {
go utilwait.Until(func() {
go utilwait.UntilWithContext(ctx, func(ctx context.Context) {
http.Handle("/metrics", promhttp.Handler())
logging.Debugf("metrics port: %d", *daemonConfig.MetricsPort)
logging.Debugf("metrics: %s", http.ListenAndServe(fmt.Sprintf(":%d", *daemonConfig.MetricsPort), nil))
}, 0, stopCh)
}, 0)
}

l, err := srv.GetListener(api.SocketPath(daemonConfig.SocketDir))
Expand All @@ -186,13 +217,13 @@ func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{}

server.SetKeepAlivesEnabled(false)
go func() {
utilwait.Until(func() {
utilwait.UntilWithContext(ctx, func(ctx context.Context) {
logging.Debugf("open for business")
if err := server.Serve(l); err != nil {
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
}
}, 0, stopCh)
server.Shutdown(context.TODO())
}, 0)
server.Shutdown(context.Background())
close(done)
}()

Expand Down
17 changes: 2 additions & 15 deletions pkg/multus/multus.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ var (
releaseStatus = ""
)

var (
pollDuration = 1000 * time.Millisecond
pollTimeout = 45 * time.Second
)

// PrintVersionString ...
func PrintVersionString() string {
return fmt.Sprintf("version:%s(%s%s), commit:%s, date:%s", version, gitTreeState, releaseStatus, commit, date)
Expand Down Expand Up @@ -575,11 +570,7 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
}

if n.ReadinessIndicatorFile != "" {
err := wait.PollImmediate(pollDuration, pollTimeout, func() (bool, error) {
_, err := os.Stat(n.ReadinessIndicatorFile)
return err == nil, nil
})
if err != nil {
if err := types.GetReadinessIndicatorFile(n.ReadinessIndicatorFile); err != nil {
return nil, cmdErr(k8sArgs, "have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", n.ReadinessIndicatorFile, err)
}
}
Expand Down Expand Up @@ -813,11 +804,7 @@ func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er
}

if in.ReadinessIndicatorFile != "" {
err := wait.PollImmediate(pollDuration, pollTimeout, func() (bool, error) {
_, err := os.Stat(in.ReadinessIndicatorFile)
return err == nil, nil
})
if err != nil {
if err := types.GetReadinessIndicatorFile(in.ReadinessIndicatorFile); err != nil {
return cmdErr(k8sArgs, "PollImmediate error waiting for ReadinessIndicatorFile (on del): %v", err)
}
}
Expand Down
Loading

0 comments on commit ad6a132

Please sign in to comment.