diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index a8f7e847b..b6d5b9e0d 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -14,6 +14,7 @@ import ( "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/pkg/profile" "go.uber.org/zap" "golang.org/x/sync/errgroup" "os" @@ -34,6 +35,8 @@ func main() { } func run() int { + defer profile.Start(profile.NoShutdownHook, profile.ProfilePath(".")).Stop() + cmd := command.New() logs, err := logging.NewLogging( cmd.Config.Logging.Level, @@ -165,6 +168,18 @@ func run() int { }) } + g.Go(func() error { + defer utils.Timed(time.Now(), func(elapsed time.Duration) { + logger.Infow("Finished config sync", zap.Duration("duration", elapsed)) + }) + + wg.Wait() + + sig <- syscall.SIGINT + + return nil + }) + wg.Add(1) g.Go(func() error { defer wg.Done() diff --git a/go.mod b/go.mod index 6634625a5..bcc2d0c7e 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,9 @@ require ( github.com/google/uuid v1.3.0 github.com/jessevdk/go-flags v1.5.0 github.com/jmoiron/sqlx v1.3.4 + github.com/json-iterator/go v1.1.12 github.com/pkg/errors v0.9.1 + github.com/pkg/profile v1.6.0 github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.19.1 golang.org/x/exp v0.0.0-20210514180818-737f94c0881e diff --git a/go.sum b/go.sum index a73b5e51a..7d9e0ddd9 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -45,6 +46,8 @@ github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LF github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w= github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -55,6 +58,10 @@ github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -69,6 +76,8 @@ github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+t github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.6.0 h1:hUDfIISABYI59DyeB3OTay/HxSRwTQ8rB/H83k6r5dM= +github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/internal/internal.go b/internal/internal.go index 7352d4092..64017600a 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -1,10 +1,12 @@ package internal import ( - "encoding/json" + jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" ) +var json = jsoniter.ConfigCompatibleWithStandardLibrary + // CantDecodeHex wraps the given error with the given string that cannot be hex-decoded. func CantDecodeHex(err error, s string) error { return errors.Wrapf(err, "can't decode hex %q", s) @@ -37,12 +39,12 @@ func CantUnmarshalYAML(err error, v interface{}) error { // MarshalJSON calls json.Marshal and wraps any resulting errors. func MarshalJSON(v interface{}) ([]byte, error) { - b, err := json.Marshal(v) + b, err := json.Marshal(&v) return b, errors.Wrapf(err, "can't marshal JSON from %T", v) } // UnmarshalJSON calls json.Unmarshal and wraps any resulting errors. func UnmarshalJSON(data []byte, v interface{}) error { - return errors.Wrapf(json.Unmarshal(data, v), "can't unmarshal JSON into %T", v) + return errors.Wrapf(json.Unmarshal(data, &v), "can't unmarshal JSON into %T", v) } diff --git a/pkg/contracts/contracts.go b/pkg/contracts/contracts.go index 515941442..a520559af 100644 --- a/pkg/contracts/contracts.go +++ b/pkg/contracts/contracts.go @@ -1,5 +1,7 @@ package contracts +import "crypto/sha1" + // Entity is implemented by every type Icinga DB should synchronize. type Entity interface { Fingerprinter @@ -13,12 +15,7 @@ type Fingerprinter interface { } // ID is a unique identifier of an entity. -type ID interface { - // String returns the string representation form of the ID. - // The String method is used to use the ID in functions - // where it needs to be compared or hashed. - String() string -} +type ID Checksum // IDer is implemented by every entity that uniquely identifies itself. type IDer interface { @@ -32,13 +29,7 @@ type Equaler interface { } // Checksum is a unique identifier of an entity. -type Checksum interface { - Equaler - // String returns the string representation form of the Checksum. - // The String method is used to use the Checksum in functions - // where it needs to be compared or hashed. - String() string -} +type Checksum [sha1.Size]byte // Checksumer is implemented by every entity with a checksum. type Checksumer interface { diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index b5bdcf2c4..31cc740f6 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -63,7 +63,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contract } numActual++ - id := actualValue.ID().String() + id := actualValue.ID() if desiredValue, ok := desired[id]; ok { delete(desired, id) if update != nil && !checksumsMatch(actualValue, desiredValue) { @@ -81,7 +81,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contract } numDesired++ - id := desiredValue.ID().String() + id := desiredValue.ID() if actualValue, ok := actual[id]; ok { delete(actual, id) if update != nil && !checksumsMatch(actualValue, desiredValue) { @@ -118,5 +118,5 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contract func checksumsMatch(a, b contracts.Entity) bool { c1 := a.(contracts.Checksumer).Checksum() c2 := b.(contracts.Checksumer).Checksum() - return c1.Equal(c2) + return c1 == c2 } diff --git a/pkg/icingadb/entitiesbyid.go b/pkg/icingadb/entitiesbyid.go index b40050e10..bb4439293 100644 --- a/pkg/icingadb/entitiesbyid.go +++ b/pkg/icingadb/entitiesbyid.go @@ -2,17 +2,18 @@ package icingadb import ( "context" + "encoding/hex" "github.com/icinga/icingadb/pkg/contracts" ) -// EntitiesById is a map of key-contracts.Entity pairs. -type EntitiesById map[string]contracts.Entity +// EntitiesById is a map contracts.ID-contracts.Entity pairs. +type EntitiesById map[contracts.ID]contracts.Entity // Keys returns the keys. func (ebi EntitiesById) Keys() []string { keys := make([]string, 0, len(ebi)) for k := range ebi { - keys = append(keys, k) + keys = append(keys, hex.EncodeToString(k[:])) } return keys @@ -21,8 +22,8 @@ func (ebi EntitiesById) Keys() []string { // IDs returns the contracts.ID of the entities. func (ebi EntitiesById) IDs() []interface{} { ids := make([]interface{}, 0, len(ebi)) - for _, v := range ebi { - ids = append(ids, v.(contracts.IDer).ID()) + for k := range ebi { + ids = append(ids, k) } return ids diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 44cd87b55..c3a6e9f55 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -276,7 +276,7 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) { case <-time.After(timeout): query := "DELETE FROM icingadb_instance " + "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?" - heartbeat := types.UnixMilli(time.Now().Add(-timeout)) + heartbeat := types.UnixMilli{NullInt64: sql.NullInt64{Valid: true, Int64: utils.UnixMilli(time.Now().Add(-timeout))}} result, err := h.db.ExecContext(h.ctx, query, h.instanceId, s.EnvironmentID(), s.EndpointId, heartbeat) if err != nil { diff --git a/pkg/icingadb/v1/history/comment.go b/pkg/icingadb/v1/history/comment.go index d3a5743a2..88c0458e1 100644 --- a/pkg/icingadb/v1/history/comment.go +++ b/pkg/icingadb/v1/history/comment.go @@ -16,13 +16,16 @@ func (che CommentHistoryEntity) Fingerprint() contracts.Fingerprinter { } // ID implements part of the contracts.Entity interface. -func (che CommentHistoryEntity) ID() contracts.ID { - return che.CommentId +func (che CommentHistoryEntity) ID() (id contracts.ID) { + copy(id[:], che.CommentId) + return } // SetID implements part of the contracts.Entity interface. func (che *CommentHistoryEntity) SetID(id contracts.ID) { - che.CommentId = id.(types.Binary) + cp := make(types.Binary, len(id)) + copy(cp, id[:]) + che.CommentId = cp } type CommentHistoryUpserter struct { diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go index 448c9ecf7..eb6e2810e 100644 --- a/pkg/icingadb/v1/history/downtime.go +++ b/pkg/icingadb/v1/history/downtime.go @@ -16,13 +16,16 @@ func (dhe DowntimeHistoryEntity) Fingerprint() contracts.Fingerprinter { } // ID implements part of the contracts.Entity interface. -func (dhe DowntimeHistoryEntity) ID() contracts.ID { - return dhe.DowntimeId +func (dhe DowntimeHistoryEntity) ID() (id contracts.ID) { + copy(id[:], dhe.DowntimeId) + return } // SetID implements part of the contracts.Entity interface. func (dhe *DowntimeHistoryEntity) SetID(id contracts.ID) { - dhe.DowntimeId = id.(types.Binary) + cp := make(types.Binary, len(id)) + copy(cp, id[:]) + dhe.DowntimeId = cp } type DowntimeHistoryUpserter struct { diff --git a/pkg/icingadb/v1/history/meta.go b/pkg/icingadb/v1/history/meta.go index b2c817f92..c3d311db5 100644 --- a/pkg/icingadb/v1/history/meta.go +++ b/pkg/icingadb/v1/history/meta.go @@ -22,13 +22,14 @@ func (hte HistoryTableEntity) Fingerprint() contracts.Fingerprinter { } // ID implements part of the contracts.Entity interface. -func (hte HistoryTableEntity) ID() contracts.ID { - return hte.Id +func (hte HistoryTableEntity) ID() (id contracts.ID) { + copy(id[:], hte.Id.UUID[:]) + return } // SetID implements part of the contracts.Entity interface. func (hte *HistoryTableEntity) SetID(id contracts.ID) { - hte.Id = id.(types.UUID) + copy(hte.Id.UUID[:], id[:]) } // Upsert implements the contracts.Upserter interface. @@ -48,13 +49,14 @@ func (he HistoryEntity) Fingerprint() contracts.Fingerprinter { } // ID implements part of the contracts.Entity interface. -func (he HistoryEntity) ID() contracts.ID { - return he.Id +func (he HistoryEntity) ID() (id contracts.ID) { + copy(id[:], he.Id.UUID[:]) + return } // SetID implements part of the contracts.Entity interface. func (he *HistoryEntity) SetID(id contracts.ID) { - he.Id = id.(types.UUID) + copy(he.Id.UUID[:], id[:]) } // Upsert implements the contracts.Upserter interface. diff --git a/pkg/icingadb/v1/meta.go b/pkg/icingadb/v1/meta.go index 9266751e4..8a61b39f2 100644 --- a/pkg/icingadb/v1/meta.go +++ b/pkg/icingadb/v1/meta.go @@ -11,13 +11,16 @@ type ChecksumMeta struct { } // Checksum implements part of the contracts.Checksumer interface. -func (m ChecksumMeta) Checksum() contracts.Checksum { - return m.PropertiesChecksum +func (m ChecksumMeta) Checksum() (checksum contracts.Checksum) { + copy(checksum[:], m.PropertiesChecksum) + return } // SetChecksum implements part of the contracts.Checksumer interface. func (m *ChecksumMeta) SetChecksum(checksum contracts.Checksum) { - m.PropertiesChecksum = checksum.(types.Binary) + cp := make(types.Binary, len(checksum)) + copy(cp, checksum[:]) + m.PropertiesChecksum = cp } // EnvironmentMeta is embedded by every type which belongs to an environment. @@ -31,13 +34,16 @@ type IdMeta struct { } // ID implements part of the contracts.IDer interface. -func (m IdMeta) ID() contracts.ID { - return m.Id +func (m IdMeta) ID() (id contracts.ID) { + copy(id[:], m.Id) + return } // SetID implements part of the contracts.IDer interface. func (m *IdMeta) SetID(id contracts.ID) { - m.Id = id.(types.Binary) + cp := make(types.Binary, len(id)) + copy(cp, id[:]) + m.Id = cp } // NameMeta is embedded by every type with a name. diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index 9176dba7d..2577e77c7 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -6,7 +6,7 @@ import ( "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" - "github.com/icinga/icingadb/pkg/types" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" "golang.org/x/sync/errgroup" @@ -46,9 +46,9 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc for i := 0; i < concurrent; i++ { g.Go(func() error { for pair := range pairs { - var id types.Binary + var id v1.IdMeta - if err := id.UnmarshalText([]byte(pair.Field)); err != nil { + if err := id.Id.UnmarshalText([]byte(pair.Field)); err != nil { return errors.Wrapf(err, "can't create ID from value %#v", pair.Field) } @@ -56,7 +56,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc if err := internal.UnmarshalJSON([]byte(pair.Value), e); err != nil { return err } - e.SetID(id) + e.SetID(id.ID()) select { case entities <- e: @@ -78,7 +78,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc // SetChecksums concurrently streams from the given entities and // sets their checksums using the specified map and // streams the results on a returned channel. -func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { +func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[contracts.ID]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { entitiesWithChecksum := make(chan contracts.Entity) g, ctx := errgroup.WithContext(ctx) @@ -90,7 +90,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu for i := 0; i < concurrent; i++ { g.Go(func() error { for entity := range entities { - if checksumer, ok := checksums[entity.ID().String()]; ok { + if checksumer, ok := checksums[entity.ID()]; ok { entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum()) } else { return errors.Errorf("no checksum for %#v", entity) diff --git a/pkg/types/binary.go b/pkg/types/binary.go index 00a5417f2..ab8aa30f6 100644 --- a/pkg/types/binary.go +++ b/pkg/types/binary.go @@ -127,7 +127,6 @@ func (binary Binary) Value() (driver.Value, error) { // Assert interface compliance. var ( - _ contracts.ID = (*Binary)(nil) _ encoding.TextMarshaler = (*Binary)(nil) _ encoding.TextUnmarshaler = (*Binary)(nil) _ json.Marshaler = (*Binary)(nil) diff --git a/pkg/types/unix_milli.go b/pkg/types/unix_milli.go index 3f14a013f..891f8a07a 100644 --- a/pkg/types/unix_milli.go +++ b/pkg/types/unix_milli.go @@ -13,31 +13,37 @@ import ( ) // UnixMilli is a nullable millisecond UNIX timestamp in databases and JSON. -type UnixMilli time.Time +type UnixMilli struct { + sql.NullInt64 +} // Time returns the time.Time conversion of UnixMilli. func (t UnixMilli) Time() time.Time { - return time.Time(t) + return utils.FromUnixMilli(t.Int64) } // MarshalJSON implements the json.Marshaler interface. // Marshals to milliseconds. Supports JSON null. func (t UnixMilli) MarshalJSON() ([]byte, error) { - if time.Time(t).IsZero() { + if !t.Valid { return nil, nil } - return []byte(strconv.FormatInt(utils.UnixMilli(time.Time(t)), 10)), nil + return []byte(strconv.FormatInt(t.Int64, 10)), nil } // UnmarshalText implements the encoding.TextUnmarshaler interface. func (t *UnixMilli) UnmarshalText(text []byte) error { - parsed, err := strconv.ParseFloat(string(text), 64) + ms, err := strconv.ParseFloat(string(text), 64) if err != nil { return internal.CantParseFloat64(err, string(text)) } - *t = UnixMilli(utils.FromUnixMilli(int64(parsed))) + *t = UnixMilli{sql.NullInt64{ + Int64: int64(ms), + Valid: true, + }} + return nil } @@ -52,8 +58,11 @@ func (t *UnixMilli) UnmarshalJSON(data []byte) error { if err != nil { return internal.CantParseFloat64(err, string(data)) } - tt := utils.FromUnixMilli(int64(ms)) - *t = UnixMilli(tt) + + *t = UnixMilli{sql.NullInt64{ + Int64: int64(ms), + Valid: true, + }} return nil } @@ -65,12 +74,15 @@ func (t *UnixMilli) Scan(src interface{}) error { return nil } - v, ok := src.(int64) + ms, ok := src.(int64) if !ok { return errors.Errorf("bad int64 type assertion from %#v", src) } - tt := utils.FromUnixMilli(v) - *t = UnixMilli(tt) + + *t = UnixMilli{sql.NullInt64{ + Int64: ms, + Valid: true, + }} return nil } @@ -78,11 +90,11 @@ func (t *UnixMilli) Scan(src interface{}) error { // Value implements the driver.Valuer interface. // Returns milliseconds. Supports SQL NULL. func (t UnixMilli) Value() (driver.Value, error) { - if t.Time().IsZero() { + if !t.Valid { return nil, nil } - return utils.UnixMilli(t.Time()), nil + return t.Int64, nil } // Assert interface compliance.