-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstore.go
More file actions
109 lines (91 loc) · 2.39 KB
/
store.go
File metadata and controls
109 lines (91 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package gofeat
import (
"context"
"errors"
"fmt"
"time"
)
// Store pushes events into a Storage backend and evaluates a fixed list of
// features over the events read back from it.
type Store struct {
// storage is the backend that Push, GetAt, Evict, Stats and Close delegate to.
storage Storage
// features holds the feature definitions evaluated on every GetAt call.
features []Feature
}
// New validates cfg and builds a Store from it.
//
// It returns an error when cfg.Features is empty, or when any feature has an
// empty Name or a nil Aggregate. A feature whose Window is nil is given the
// window returned by Lifetime(). When cfg.Storage is nil, the storage returned
// by NewMemoryStorage(cfg.TTL) is used instead.
//
// The feature slice is copied before defaults are applied, so the caller's
// cfg.Features is never modified (not even when validation fails part-way
// through) and later changes to it do not affect the returned Store.
func New(cfg Config) (*Store, error) {
	if len(cfg.Features) == 0 {
		return nil, errors.New("gofeat: at least one feature required")
	}

	// cfg is passed by value, but its Features slice still shares a backing
	// array with the caller's slice; work on a private copy.
	features := make([]Feature, len(cfg.Features))
	copy(features, cfg.Features)

	for i := range features {
		f := &features[i]
		if f.Name == "" {
			return nil, errors.New("gofeat: feature name required")
		}
		if f.Aggregate == nil {
			return nil, errors.New("gofeat: feature aggregate required")
		}
		if f.Window == nil {
			f.Window = Lifetime()
		}
	}

	storage := cfg.Storage
	if storage == nil {
		storage = NewMemoryStorage(cfg.TTL)
	}

	return &Store{
		storage:  storage,
		features: features,
	}, nil
}
// Push validates every event and, when all of them pass, forwards the whole
// batch to the storage for entityID in a single call.
//
// Validation stops at the first rejected event. The returned error wraps the
// validation error and reports the event's index within events; the storage
// is not called in that case.
func (s *Store) Push(ctx context.Context, entityID string, events ...Event) error {
	for idx := range events {
		err := s.validateEvent(events[idx])
		if err == nil {
			continue
		}
		return fmt.Errorf("invalid event %d: %w", idx, err)
	}
	return s.storage.Push(ctx, entityID, events...)
}
// Get returns the feature values for entityID evaluated at the current time.
// It is GetAt with the evaluation instant set to time.Now() converted to UTC.
func (s *Store) Get(ctx context.Context, entityID string) (Result, error) {
	now := time.Now().UTC()
	return s.GetAt(ctx, entityID, now)
}
// GetAt evaluates every configured feature for entityID as of the instant at.
//
// The entity's events are read once from storage. For each feature, its
// Window selects the relevant events relative to at, and the aggregator
// returned by the feature's Aggregate function is fed those events in order.
// The aggregator results, keyed by feature name, are wrapped with newResult.
//
// A storage error is returned unchanged together with a zero Result.
func (s *Store) GetAt(ctx context.Context, entityID string, at time.Time) (Result, error) {
	events, err := s.storage.Get(ctx, entityID, at)
	if err != nil {
		return Result{}, err
	}

	features := s.features
	values := make(map[string]any, len(features))
	for i := range features {
		feat := features[i]
		// Select before building the aggregator, matching the call order
		// callers' Window and Aggregate implementations have always seen.
		inWindow := feat.Window.Select(events, at)
		acc := feat.Aggregate()
		for j := range inWindow {
			acc.Add(inWindow[j])
		}
		values[feat.Name] = acc.Result()
	}

	return newResult(values), nil
}
// BatchGet returns the feature values for each of entityIDs evaluated at the
// current time. It is BatchGetAt with the evaluation instant set to
// time.Now() converted to UTC, so every entity is evaluated at the same
// instant.
func (s *Store) BatchGet(ctx context.Context, entityIDs ...string) (map[string]Result, error) {
	now := time.Now().UTC()
	return s.BatchGetAt(ctx, now, entityIDs...)
}
// BatchGetAt evaluates the features of every entity in entityIDs as of the
// instant at and returns the results keyed by entity ID.
//
// Entities are processed one at a time, in the order given, through GetAt.
// The first error aborts the batch: it is returned unchanged and the map is
// nil, so no partial results are exposed. An ID listed more than once is
// evaluated each time but occupies a single map entry.
func (s *Store) BatchGetAt(ctx context.Context, at time.Time, entityIDs ...string) (map[string]Result, error) {
	byEntity := make(map[string]Result, len(entityIDs))
	for i := 0; i < len(entityIDs); i++ {
		id := entityIDs[i]
		res, err := s.GetAt(ctx, id, at)
		if err != nil {
			return nil, err
		}
		byEntity[id] = res
	}
	return byEntity, nil
}
// Evict asks the underlying storage to evict and returns its error unchanged.
// What gets evicted is decided entirely by the storage implementation.
func (s *Store) Evict(ctx context.Context) error {
	backend := s.storage
	return backend.Evict(ctx)
}
// Stats returns the statistics reported by the underlying storage, passing
// both the value and the error through unchanged.
func (s *Store) Stats(ctx context.Context) (StorageStats, error) {
	backend := s.storage
	return backend.Stats(ctx)
}
// Close closes the underlying storage and returns its error unchanged.
func (s *Store) Close() error {
	backend := s.storage
	return backend.Close()
}
// validateEvent reports whether e may be stored. The only rule is that the
// event's Timestamp must carry the time.UTC location; the check is made on
// the Location value itself, not on the zone offset.
func (s *Store) validateEvent(e Event) error {
	if loc := e.Timestamp.Location(); loc == time.UTC {
		return nil
	}
	return errors.New("timestamp must be in UTC")
}