Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimize expensive API calls #245

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
158 changes: 82 additions & 76 deletions connector/docker.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package connector

import (
"fmt"
"github.com/op/go-logging"
"strings"
"sync"
"time"

"github.com/bcicen/ctop/connector/collector"
"github.com/bcicen/ctop/connector/manager"
Expand All @@ -31,7 +31,8 @@ type StatusUpdate struct {
type Docker struct {
client *api.Client
containers map[string]*container.Container
needsRefresh chan string // container IDs requiring refresh
needsRefresh []string // container IDs requiring refresh
lastRefresh time.Time
statuses chan StatusUpdate
closed chan struct{}
lock sync.RWMutex
Expand All @@ -44,12 +45,11 @@ func NewDocker() (Connector, error) {
return nil, err
}
cm := &Docker{
client: client,
containers: make(map[string]*container.Container),
needsRefresh: make(chan string, 60),
statuses: make(chan StatusUpdate, 60),
closed: make(chan struct{}),
lock: sync.RWMutex{},
client: client,
containers: make(map[string]*container.Container),
statuses: make(chan StatusUpdate, 60),
closed: make(chan struct{}),
lock: sync.RWMutex{},
}

// query info as pre-flight healthcheck
Expand Down Expand Up @@ -109,7 +109,11 @@ func (cm *Docker) watchEvents() {
if log.IsEnabledFor(logging.DEBUG) {
log.Debugf("handling docker event: action=create id=%s", e.ID)
}
cm.needsRefresh <- e.ID
c := cm.MustGet(e.ID)
c.SetMeta("name", manager.ShortName(e.Actor.Attributes["name"]))
c.SetMeta("image", e.Actor.Attributes["image"])
c.SetState("created")
cm.requestRefresh(c)
case "destroy":
if log.IsEnabledFor(logging.DEBUG) {
log.Debugf("handling docker event: action=destroy id=%s", e.ID)
Expand All @@ -130,92 +134,99 @@ func (cm *Docker) watchEvents() {
close(cm.closed)
}

func portsFormat(ports map[api.Port][]api.PortBinding) string {
var exposed []string
var published []string
// Mark all container IDs for refresh
func (cm *Docker) refreshAll() {
cm.updateContainers(true)
}

for k, v := range ports {
if len(v) == 0 {
exposed = append(exposed, string(k))
continue
}
for _, binding := range v {
s := fmt.Sprintf("%s:%s -> %s", binding.HostIP, binding.HostPort, k)
published = append(published, s)
func (cm *Docker) updateContainers(all bool) {
opts := api.ListContainersOptions{All: true}
if !all {
opts.Filters = map[string][]string{
"id": cm.needsRefresh,
}
}
allContainers, err := cm.client.ListContainers(opts)
if err != nil {
log.Errorf("%s (%T)", err.Error(), err)
return
}
if all {
cm.cleanupDestroyedContainers(allContainers)
}

return strings.Join(append(exposed, published...), "\n")
}

func ipsFormat(networks map[string]api.ContainerNetwork) string {
var ips []string

for k, v := range networks {
s := fmt.Sprintf("%s:%s", k, v.IPAddress)
ips = append(ips, s)
for _, i := range allContainers {
c := cm.MustGet(i.ID)
c.SetMeta("name", manager.ShortName(i.Names[0]))
c.SetMeta("image", i.Image)
c.SetMeta("IPs", manager.IpsFormat(i.Networks.Networks))
c.SetMeta("ports", manager.PortsFormatArr(i.Ports))
c.SetMeta("created", time.Unix(i.Created, 0).Format("Mon Jan 2 15:04:05 2006"))
parseStatusHealth(c, i.Status)
c.SetState(i.State)
}
cm.lastRefresh = time.Now()
cm.needsRefresh = nil
}

return strings.Join(ips, "\n")
func (cm *Docker) requestRefresh(c *container.Container) {
cm.needsRefresh = append(cm.needsRefresh, c.Id)
refreshRequestedOn := time.Now()
go func() {
time.Sleep(5 * time.Second)
if refreshRequestedOn.Before(cm.lastRefresh) {
return
}
// batch refresh
cm.updateContainers(false)
}()
}

func (cm *Docker) refresh(c *container.Container) {
insp, found, failed := cm.inspect(c.Id)
if failed {
return
func (cm *Docker) cleanupDestroyedContainers(allContainers []api.APIContainers) {
var nonExistingContainers []string
for _, oldContainer := range cm.containers {
if !cm.hasContainer(oldContainer.Id, allContainers) {
nonExistingContainers = append(nonExistingContainers, oldContainer.Id)
}
}
// remove container if no longer exists
if !found {
cm.delByID(c.Id)
return
// remove containers that no longer exists
for _, cid := range nonExistingContainers {
cm.delByID(cid)
}
c.SetMeta("name", shortName(insp.Name))
c.SetMeta("image", insp.Config.Image)
c.SetMeta("IPs", ipsFormat(insp.NetworkSettings.Networks))
c.SetMeta("ports", portsFormat(insp.NetworkSettings.Ports))
c.SetMeta("created", insp.Created.Format("Mon Jan 2 15:04:05 2006"))
c.SetMeta("health", insp.State.Health.Status)
c.SetMeta("[ENV-VAR]", strings.Join(insp.Config.Env, ";"))
c.SetState(insp.State.Status)
}

func (cm *Docker) inspect(id string) (insp *api.Container, found bool, failed bool) {
c, err := cm.client.InspectContainer(id)
if err != nil {
if _, notFound := err.(*api.NoSuchContainer); notFound {
return c, false, false
func (cm *Docker) hasContainer(oldContainerId string, newContainers []api.APIContainers) bool {
for _, newContainer := range newContainers {
if newContainer.ID == oldContainerId {
return true
}
// other error e.g. connection failed
log.Errorf("%s (%T)", err.Error(), err)
return c, false, true
}
return c, true, false
return false
}

// Mark all container IDs for refresh
func (cm *Docker) refreshAll() {
opts := api.ListContainersOptions{All: true}
allContainers, err := cm.client.ListContainers(opts)
if err != nil {
log.Errorf("%s (%T)", err.Error(), err)
func parseStatusHealth(c *container.Container, status string) {
// Status may look like:
// Up About a minute (healthy)
// Up 7 minutes (unhealthy)
var health string
if strings.Contains(status, "(healthy)") {
health = "healthy"
} else if strings.Contains(status, "(unhealthy)") {
health = "unhealthy"
} else {
return
}

for _, i := range allContainers {
c := cm.MustGet(i.ID)
c.SetMeta("name", shortName(i.Names[0]))
c.SetState(i.State)
cm.needsRefresh <- c.Id
}
c.SetMeta("health", health)
}

func (cm *Docker) Loop() {
ticker := time.NewTicker(5 * time.Minute)
for {
select {
case id := <-cm.needsRefresh:
c := cm.MustGet(id)
cm.refresh(c)
case <-ticker.C:
cm.refreshAll()
case <-cm.closed:
ticker.Stop()
return
}
}
Expand Down Expand Up @@ -285,8 +296,3 @@ func (cm *Docker) All() (containers container.Containers) {
cm.lock.Unlock()
return containers
}

// use primary container name
func shortName(name string) string {
return strings.TrimPrefix(name, "/")
}
38 changes: 38 additions & 0 deletions connector/manager/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package manager

import (
"fmt"
"github.com/bcicen/ctop/logging"
"github.com/bcicen/ctop/models"
api "github.com/fsouza/go-dockerclient"
"github.com/pkg/errors"
"io"
"os"
"strings"
)

var (
log = logging.Init()
)

type Docker struct {
Expand Down Expand Up @@ -102,6 +109,37 @@ func (dc *Docker) Exec(cmd []string) error {
})
}

func (dc *Docker) inspect(id string) (insp *api.Container, found bool, err error) {
c, err := dc.client.InspectContainer(id)
if err != nil {
if _, notFound := err.(*api.NoSuchContainer); notFound {
return c, false, nil
}
// other error e.g. connection failed
log.Errorf("%s (%T)", err.Error(), err)
return c, false, err
}
return c, true, nil
}

func (dc *Docker) Inspect() (models.Meta, error) {
insp, found, err := dc.inspect(dc.id)
if !found {
return nil, err
}
newMeta := models.Meta{}
newMeta["name"] = ShortName(insp.Name)
newMeta["image"] = insp.Config.Image
newMeta["IPs"] = IpsFormat(insp.NetworkSettings.Networks)
newMeta["ports"] = PortsFormat(insp.NetworkSettings.Ports)
newMeta["created"] = insp.Created.Format("Mon Jan 2 15:04:05 2006")
newMeta["health"] = insp.State.Health.Status
newMeta["[ENV-VAR]"] = strings.Join(insp.Config.Env, ";")
newMeta["state"] = insp.State.Status

return newMeta, nil
}

func (dc *Docker) Start() error {
c, err := dc.client.InspectContainer(dc.id)
if err != nil {
Expand Down
61 changes: 61 additions & 0 deletions connector/manager/docker_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package manager

import (
"fmt"
api "github.com/fsouza/go-dockerclient"
"strings"
)

func PortsFormat(ports map[api.Port][]api.PortBinding) string {
var exposed []string
var published []string

for k, v := range ports {
if len(v) == 0 {
// 3306/tcp
exposed = append(exposed, string(k))
continue
}
for _, binding := range v {
// 0.0.0.0:3307 -> 3306/tcp
s := fmt.Sprintf("%s:%s -> %s", binding.HostIP, binding.HostPort, k)
published = append(published, s)
}
}

return strings.Join(append(exposed, published...), "\n")
}

func PortsFormatArr(ports []api.APIPort) string {
var exposed []string
var published []string
for _, binding := range ports {
if binding.PublicPort != 0 {
// 0.0.0.0:3307 -> 3306/tcp
s := fmt.Sprintf("%s:%d -> %d/%s", binding.IP, binding.PublicPort, binding.PrivatePort, binding.Type)
published = append(published, s)
} else {
// 3306/tcp
s := fmt.Sprintf("%d/%s", binding.PrivatePort, binding.Type)
exposed = append(exposed, s)
}
}

return strings.Join(append(exposed, published...), "\n")
}

func IpsFormat(networks map[string]api.ContainerNetwork) string {
var ips []string

for k, v := range networks {
s := fmt.Sprintf("%s:%s", k, v.IPAddress)
ips = append(ips, s)
}

return strings.Join(ips, "\n")
}

// use primary container name
func ShortName(name string) string {
return strings.TrimPrefix(name, "/")
}
6 changes: 5 additions & 1 deletion connector/manager/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package manager

import "errors"
import (
"errors"
"github.com/bcicen/ctop/models"
)

var ActionNotImplErr = errors.New("action not implemented")

Expand All @@ -12,4 +15,5 @@ type Manager interface {
Unpause() error
Restart() error
Exec(cmd []string) error
Inspect() (models.Meta, error)
}
6 changes: 6 additions & 0 deletions connector/manager/mock.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package manager

import models "github.com/bcicen/ctop/models"

type Mock struct{}

func NewMock() *Mock {
Expand Down Expand Up @@ -33,3 +35,7 @@ func (m *Mock) Restart() error {
func (m *Mock) Exec(cmd []string) error {
return ActionNotImplErr
}

func (m *Mock) Inspect() (models.Meta, error) {
return nil, nil
}
6 changes: 6 additions & 0 deletions connector/manager/runc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package manager

import "github.com/bcicen/ctop/models"

type Runc struct{}

func NewRunc() *Runc {
Expand Down Expand Up @@ -33,3 +35,7 @@ func (rc *Runc) Restart() error {
func (rc *Runc) Exec(cmd []string) error {
return ActionNotImplErr
}

func (rc *Runc) Inspect() (models.Meta, error) {
return nil, nil
}
Loading