@@ -2,14 +2,25 @@ package icingaredis
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
5
6
"github.com/icinga/icinga-go-library/database"
6
7
"github.com/icinga/icinga-go-library/redis"
8
+ "github.com/icinga/icinga-go-library/types"
7
9
"github.com/icinga/icingadb/pkg/icingadb/v1"
8
10
"github.com/stretchr/testify/require"
9
11
"testing"
10
12
"time"
11
13
)
12
14
15
+ var latencies = []struct {
16
+ name string
17
+ latency time.Duration
18
+ }{
19
+ {"instantly" , 0 },
20
+ {"1us" , time .Microsecond },
21
+ {"20ms" , 20 * time .Millisecond },
22
+ }
23
+
13
24
type testEntity struct {
14
25
v1.EntityWithoutChecksum `json:",inline"`
15
26
Data int `json:"data"`
@@ -60,15 +71,6 @@ func TestCreateEntities(t *testing.T) {
60
71
},
61
72
}
62
73
63
- latencies := []struct {
64
- name string
65
- latency time.Duration
66
- }{
67
- {"instantly" , 0 },
68
- {"1us" , time .Microsecond },
69
- {"20ms" , 20 * time .Millisecond },
70
- }
71
-
72
74
for _ , st := range subtests {
73
75
t .Run (st .name , func (t * testing.T ) {
74
76
for _ , l := range latencies {
@@ -147,3 +149,226 @@ func TestCreateEntities(t *testing.T) {
147
149
})
148
150
}
149
151
}
152
+
153
+ type testEntityWithChecksum struct {
154
+ v1.EntityWithChecksum `json:",inline"`
155
+ Data types.Binary `json:"data"`
156
+ }
157
+
158
+ func newTestEntityWithChecksum (id , checksum , data []byte ) * testEntityWithChecksum {
159
+ return & testEntityWithChecksum {
160
+ EntityWithChecksum : v1.EntityWithChecksum {
161
+ EntityWithoutChecksum : v1.EntityWithoutChecksum {IdMeta : v1.IdMeta {Id : id }},
162
+ ChecksumMeta : v1.ChecksumMeta {PropertiesChecksum : checksum },
163
+ },
164
+ Data : data ,
165
+ }
166
+ }
167
+
168
+ func newEntityWithChecksum (id , checksum []byte ) * v1.EntityWithChecksum {
169
+ return & v1.EntityWithChecksum {
170
+ EntityWithoutChecksum : v1.EntityWithoutChecksum {IdMeta : v1.IdMeta {Id : id }},
171
+ ChecksumMeta : v1.ChecksumMeta {PropertiesChecksum : checksum },
172
+ }
173
+ }
174
+
175
+ func TestSetChecksums (t * testing.T ) {
176
+ subtests := []struct {
177
+ name string
178
+ input []database.Entity
179
+ checksums map [string ]database.Entity
180
+ output []database.Entity
181
+ error bool
182
+ }{
183
+ {name : "nil" },
184
+ {
185
+ name : "empty" ,
186
+ checksums : map [string ]database.Entity {},
187
+ },
188
+ {
189
+ name : "one" ,
190
+ input : []database.Entity {newTestEntityWithChecksum ([]byte {1 }, nil , []byte {3 })},
191
+ checksums : map [string ]database.Entity {"01" : newEntityWithChecksum ([]byte {1 }, []byte {2 })},
192
+ output : []database.Entity {newTestEntityWithChecksum ([]byte {1 }, []byte {2 }, []byte {3 })},
193
+ },
194
+ {
195
+ name : "two" ,
196
+ input : []database.Entity {
197
+ newTestEntityWithChecksum ([]byte {4 }, nil , []byte {6 }),
198
+ newTestEntityWithChecksum ([]byte {7 }, nil , []byte {9 }),
199
+ },
200
+ checksums : map [string ]database.Entity {
201
+ "04" : newEntityWithChecksum ([]byte {4 }, []byte {5 }),
202
+ "07" : newEntityWithChecksum ([]byte {7 }, []byte {8 }),
203
+ },
204
+ output : []database.Entity {
205
+ newTestEntityWithChecksum ([]byte {4 }, []byte {5 }, []byte {6 }),
206
+ newTestEntityWithChecksum ([]byte {7 }, []byte {8 }, []byte {9 }),
207
+ },
208
+ },
209
+ {
210
+ name : "three" ,
211
+ input : []database.Entity {
212
+ newTestEntityWithChecksum ([]byte {10 }, nil , []byte {12 }),
213
+ newTestEntityWithChecksum ([]byte {13 }, nil , []byte {15 }),
214
+ newTestEntityWithChecksum ([]byte {16 }, nil , []byte {18 }),
215
+ },
216
+ checksums : map [string ]database.Entity {
217
+ "0a" : newEntityWithChecksum ([]byte {10 }, []byte {11 }),
218
+ "0d" : newEntityWithChecksum ([]byte {13 }, []byte {14 }),
219
+ "10" : newEntityWithChecksum ([]byte {16 }, []byte {17 }),
220
+ },
221
+ output : []database.Entity {
222
+ newTestEntityWithChecksum ([]byte {10 }, []byte {11 }, []byte {12 }),
223
+ newTestEntityWithChecksum ([]byte {13 }, []byte {14 }, []byte {15 }),
224
+ newTestEntityWithChecksum ([]byte {16 }, []byte {17 }, []byte {18 }),
225
+ },
226
+ },
227
+ {
228
+ name : "superfluous-checksum" ,
229
+ checksums : map [string ]database.Entity {"13" : newEntityWithChecksum ([]byte {19 }, []byte {20 })},
230
+ },
231
+ {
232
+ name : "missing-checksum" ,
233
+ input : []database.Entity {newTestEntityWithChecksum ([]byte {22 }, nil , []byte {24 })},
234
+ error : true ,
235
+ },
236
+ }
237
+
238
+ for _ , st := range subtests {
239
+ t .Run (st .name , func (t * testing.T ) {
240
+ for _ , concurrency := range []int {1 , 2 , 30 } {
241
+ t .Run (fmt .Sprint (concurrency ), func (t * testing.T ) {
242
+ for _ , l := range latencies {
243
+ t .Run (l .name , func (t * testing.T ) {
244
+ ctx , cancel := context .WithCancel (context .Background ())
245
+ defer cancel ()
246
+
247
+ input := make (chan database.Entity , 1 )
248
+ go func () {
249
+ defer close (input )
250
+
251
+ for _ , v := range st .input {
252
+ if l .latency > 0 {
253
+ select {
254
+ case <- time .After (l .latency ):
255
+ case <- ctx .Done ():
256
+ return
257
+ }
258
+ }
259
+
260
+ select {
261
+ case input <- v :
262
+ case <- ctx .Done ():
263
+ return
264
+ }
265
+ }
266
+ }()
267
+
268
+ output , errs := SetChecksums (ctx , input , st .checksums , concurrency )
269
+
270
+ require .NotNil (t , output , "output channel should not be nil" )
271
+ require .NotNil (t , errs , "error channel should not be nil" )
272
+
273
+ for _ , expected := range st .output {
274
+ select {
275
+ case actual , ok := <- output :
276
+ require .True (t , ok , "output channel should not be closed, yet" )
277
+ if concurrency == 1 || l .latency >= time .Millisecond {
278
+ require .Equal (t , expected , actual )
279
+ }
280
+ case <- time .After (time .Second ):
281
+ require .Fail (t , "output channel should not block" )
282
+ }
283
+ }
284
+
285
+ if st .error {
286
+ select {
287
+ case err , ok := <- errs :
288
+ require .True (t , ok , "error channel should not be closed, yet" )
289
+ require .Error (t , err )
290
+ case <- time .After (time .Second ):
291
+ require .Fail (t , "error channel should not block" )
292
+ }
293
+ }
294
+
295
+ select {
296
+ case actual , ok := <- output :
297
+ require .False (t , ok , "output channel should be closed, got %#v" , actual )
298
+ case <- time .After (time .Second ):
299
+ require .Fail (t , "output channel should not block" )
300
+ }
301
+
302
+ select {
303
+ case err , ok := <- errs :
304
+ require .False (t , ok , "error channel should be closed, got %#v" , err )
305
+ case <- time .After (time .Second ):
306
+ require .Fail (t , "error channel should not block" )
307
+ }
308
+ })
309
+ }
310
+ })
311
+ }
312
+
313
+ for _ , concurrency := range []int {0 , - 1 , - 2 , - 30 } {
314
+ t .Run (fmt .Sprint (concurrency ), func (t * testing.T ) {
315
+ ctx , cancel := context .WithCancel (context .Background ())
316
+ defer cancel ()
317
+
318
+ input := make (chan database.Entity , 1 )
319
+ input <- nil
320
+
321
+ output , errs := SetChecksums (ctx , input , st .checksums , concurrency )
322
+
323
+ require .NotNil (t , output , "output channel should not be nil" )
324
+ require .NotNil (t , errs , "error channel should not be nil" )
325
+
326
+ select {
327
+ case v , ok := <- output :
328
+ require .False (t , ok , "output channel should be closed, got %#v" , v )
329
+ case <- time .After (time .Second ):
330
+ require .Fail (t , "output channel should not block" )
331
+ }
332
+
333
+ select {
334
+ case err , ok := <- errs :
335
+ require .False (t , ok , "error channel should be closed, got %#v" , err )
336
+ case <- time .After (time .Second ):
337
+ require .Fail (t , "error channel should not block" )
338
+ }
339
+
340
+ select {
341
+ case input <- nil :
342
+ require .Fail (t , "input channel should not be read from" )
343
+ default :
344
+ }
345
+ })
346
+ }
347
+ })
348
+ }
349
+
350
+ t .Run ("cancel-ctx" , func (t * testing.T ) {
351
+ ctx , cancel := context .WithCancel (context .Background ())
352
+ cancel ()
353
+
354
+ output , errs := SetChecksums (ctx , make (chan database.Entity ), map [string ]database.Entity {}, 1 )
355
+
356
+ require .NotNil (t , output , "output channel should not be nil" )
357
+ require .NotNil (t , errs , "error channel should not be nil" )
358
+
359
+ select {
360
+ case v , ok := <- output :
361
+ require .False (t , ok , "output channel should be closed, got %#v" , v )
362
+ case <- time .After (time .Second ):
363
+ require .Fail (t , "output channel should not block" )
364
+ }
365
+
366
+ select {
367
+ case err , ok := <- errs :
368
+ require .True (t , ok , "error channel should not be closed, yet" )
369
+ require .Error (t , err )
370
+ case <- time .After (time .Second ):
371
+ require .Fail (t , "error channel should not block" )
372
+ }
373
+ })
374
+ }
0 commit comments