Skip to content

Commit 80a76e5

Browse files
committed
WIP
1 parent efec262 commit 80a76e5

File tree

4 files changed

+138
-0
lines changed

4 files changed

+138
-0
lines changed

pkg/icingadb/sla.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package icingadb
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"github.com/icinga/icingadb/pkg/com"
7+
"github.com/icinga/icingadb/pkg/contracts"
8+
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
9+
"github.com/icinga/icingadb/pkg/types"
10+
"github.com/icinga/icingadb/pkg/utils"
11+
"github.com/pkg/errors"
12+
"golang.org/x/sync/errgroup"
13+
"time"
14+
)
15+
16+
type SlaHistoryTrail struct {
17+
v1.EntityWithoutChecksum `json:",inline"`
18+
EnvironmentId types.Binary `json:"environment_id"`
19+
ObjectType string `json:"object_type"`
20+
HostId types.Binary `json:"host_id"`
21+
ServiceId types.Binary `json:"service_id"`
22+
EventTime types.UnixMilli `json:"event_time"`
23+
EventType string `json:"event_type"`
24+
}
25+
26+
func CheckableToSlaTrailEntities(ctx context.Context, checkables <-chan contracts.Entity, eventType string) (<-chan contracts.Entity, <-chan error) {
27+
entities := make(chan contracts.Entity)
28+
g, ctx := errgroup.WithContext(ctx)
29+
30+
g.Go(func() error {
31+
defer close(entities)
32+
33+
for {
34+
select {
35+
case checkable, ok := <-checkables:
36+
if !ok {
37+
return nil
38+
}
39+
40+
id, err := generateBinaryId()
41+
if err != nil {
42+
return errors.Wrap(err, "can't generate sla history trail ID")
43+
}
44+
45+
entity := &SlaHistoryTrail{
46+
EntityWithoutChecksum: v1.EntityWithoutChecksum{
47+
IdMeta: v1.IdMeta{Id: id},
48+
},
49+
ObjectType: utils.Name(checkable),
50+
EventTime: types.UnixMilli(time.Now()),
51+
EventType: eventType,
52+
}
53+
54+
switch ptr := checkable.(type) {
55+
case *v1.Host:
56+
entity.HostId = ptr.Id
57+
entity.EnvironmentId = ptr.EnvironmentId
58+
case *v1.Service:
59+
entity.HostId = ptr.HostId
60+
entity.ServiceId = ptr.Id
61+
entity.EnvironmentId = ptr.EnvironmentId
62+
}
63+
64+
entities <- entity
65+
case <-ctx.Done():
66+
return ctx.Err()
67+
}
68+
}
69+
})
70+
71+
return entities, com.WaitAsync(g)
72+
}
73+
74+
// GenerateBinaryId generates a 20 byte length random id
75+
func generateBinaryId() (types.Binary, error) {
76+
id := make([]byte, 20)
77+
_, err := rand.Read(id)
78+
79+
return id, err
80+
}

pkg/icingadb/sync.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
106106
g, ctx := errgroup.WithContext(ctx)
107107
stat := getCounterForEntity(delta.Subject.Entity())
108108

109+
var subjectType = delta.Subject.Entity()
110+
109111
// Create
110112
if len(delta.Create) > 0 {
111113
s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
@@ -131,6 +133,19 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
131133
g.Go(func() error {
132134
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
133135
})
136+
137+
switch subjectType.(type) {
138+
case *v1.Host, *v1.Service:
139+
s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
140+
141+
var slaTrails <-chan contracts.Entity
142+
slaTrails, errs := CheckableToSlaTrailEntities(ctx, entities, "create")
143+
com.ErrgroupReceive(g, errs)
144+
145+
g.Go(func() error {
146+
return s.db.CreateStreamed(ctx, slaTrails, OnSuccessIncrement[contracts.Entity](stat))
147+
})
148+
}
134149
}
135150

136151
// Update
@@ -163,6 +178,18 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
163178
g.Go(func() error {
164179
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
165180
})
181+
182+
switch subjectType.(type) {
183+
case *v1.Host, *v1.Service:
184+
s.logger.Infof("Inserting %d items of type %s sla history trails of type delete", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
185+
var slaTrails <-chan contracts.Entity
186+
slaTrails, errors := CheckableToSlaTrailEntities(ctx, delta.Delete.Entities(ctx), "delete")
187+
com.ErrgroupReceive(g, errors)
188+
189+
g.Go(func() error {
190+
return s.db.CreateStreamed(ctx, slaTrails, OnSuccessIncrement[contracts.Entity](stat))
191+
})
192+
}
166193
}
167194

168195
return g.Wait()

schema/mysql/schema.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,19 @@ CREATE TABLE sla_history_downtime (
13211321
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
13221322
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
13231323

1324+
CREATE TABLE sla_history_trail (
1325+
id binary(20) NOT NULL,
1326+
environment_id binary(20) NOT NULL COMMENT 'environment.id',
1327+
object_type enum('host', 'service') NOT NULL,
1328+
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
1329+
service_id binary(20) NOT NULL COMMENT 'service.id (may reference already deleted services)',
1330+
1331+
PRIMARY KEY (id),
1332+
1333+
event_type enum('delete', 'create') NOT NULL,
1334+
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred'
1335+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
1336+
13241337
CREATE TABLE icingadb_schema (
13251338
id int unsigned NOT NULL AUTO_INCREMENT,
13261339
version smallint unsigned NOT NULL,

tests/object_sync_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) {
310310
t.Skip()
311311
})
312312

313+
t.Run("Sla History Trail", func(t *testing.T) {
314+
t.Parallel()
315+
316+
assert.Eventuallyf(t, func() bool {
317+
var count int
318+
err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL"))
319+
require.NoError(t, err, "querying hosts sla history trail should not fail")
320+
return count == len(data.Hosts)
321+
}, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database")
322+
323+
assert.Eventuallyf(t, func() bool {
324+
var count int
325+
err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL"))
326+
require.NoError(t, err, "querying services sla history trail should not fail")
327+
return count == len(data.Services)
328+
}, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database")
329+
})
330+
313331
t.Run("RuntimeUpdates", func(t *testing.T) {
314332
t.Parallel()
315333

0 commit comments

Comments
 (0)