forked from benbjohnson/litestream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
199 lines (166 loc) · 4.19 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package litestream
import (
"context"
"fmt"
"path/filepath"
"strings"
"sync"
"github.com/fsnotify/fsnotify"
"golang.org/x/sync/errgroup"
)
// Server represents the top-level container.
// It manages databases and routes global file system events.
type Server struct {
// mu is locked by DB, DBs, Watch, Unwatch and Close around their access to dbs.
mu sync.Mutex
dbs map[string]*DB // databases by path
// watcher is created by Open and closed by Close. Watch and Unwatch add and
// remove the parent directory of each database path on it.
watcher *fsnotify.Watcher
// ctx and cancel are created together in Open. ctx is handed to the monitor
// goroutine and cancel is invoked by Close to stop it.
ctx context.Context
cancel func()
// errgroup runs the monitor goroutine started by Open; Close waits on it.
errgroup errgroup.Group
}
// NewServer returns a new instance of Server with an empty database registry.
// The file system watcher is not created until Open is called.
func NewServer() *Server {
	srv := new(Server)
	srv.dbs = make(map[string]*DB)
	return srv
}
// Open initializes the server and begins watching for file system events.
//
// It creates the fsnotify watcher and a cancelable context, then starts the
// monitor loop on the server's errgroup. A monitor result of nil or
// context.Canceled is treated as a clean exit; any other error is wrapped and
// left for the errgroup to report.
func (s *Server) Open() error {
	var err error
	if s.watcher, err = fsnotify.NewWatcher(); err != nil {
		return err
	}

	s.ctx, s.cancel = context.WithCancel(context.Background())

	s.errgroup.Go(func() error {
		err := s.monitor(s.ctx)
		if err == nil || err == context.Canceled {
			return nil
		}
		return fmt.Errorf("server monitor error: %w", err)
	})

	return nil
}
// Close shuts down the server and all databases it manages.
//
// It cancels the monitor context, waits for the monitor goroutine to exit,
// closes the file system watcher, closes every registered database and then
// resets the registry to an empty map. Every step is attempted even if an
// earlier one fails; the first error encountered is the one returned.
//
// Close may be called on a server that was never opened.
func (s *Server) Close() (err error) {
	// Cancel context and wait for goroutines to finish.
	// cancel is only assigned by Open, so it is nil on an unopened server and
	// calling it unconditionally would panic.
	if s.cancel != nil {
		s.cancel()
	}
	if e := s.errgroup.Wait(); e != nil && err == nil {
		err = e
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if s.watcher != nil {
		if e := s.watcher.Close(); e != nil && err == nil {
			err = fmt.Errorf("close watcher: %w", e)
		}
	}

	for _, db := range s.dbs {
		if e := db.Close(); e != nil && err == nil {
			err = fmt.Errorf("close db: path=%s err=%w", db.Path(), e)
		}
	}
	s.dbs = make(map[string]*DB)

	return err
}
// DB looks up the database registered under path.
// It returns nil when the server does not manage a database at that path.
func (s *Server) DB(path string) *DB {
	s.mu.Lock()
	db := s.dbs[path]
	s.mu.Unlock()
	return db
}
// DBs returns a newly allocated slice holding every database managed by the
// server. The order follows map iteration and is therefore unspecified.
func (s *Server) DBs() []*DB {
	s.mu.Lock()
	defer s.mu.Unlock()

	list := make([]*DB, len(s.dbs))
	i := 0
	for _, db := range s.dbs {
		list[i] = db
		i++
	}
	return list
}
// Watch adds a database path to be managed by the server.
//
// The database is created by calling fn with path and then opened. The
// directory containing path is added to the file system watcher, the database
// is registered under path, and an initial notification is queued on its
// notify channel.
//
// An error is returned if the factory fails, the database cannot be opened,
// or the directory cannot be watched. In the last case the freshly opened
// database is closed again and is not registered.
func (s *Server) Watch(path string, fn func(path string) (*DB, error)) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Instantiate DB from factory function.
	db, err := fn(path)
	if err != nil {
		return fmt.Errorf("new database: %w", err)
	}

	// Start watching the database for changes.
	if err := db.Open(); err != nil {
		return fmt.Errorf("open database: %w", err)
	}

	// Watch the parent directory so that changes to both the database file
	// and its WAL are observed.
	if err := s.watcher.Add(filepath.Dir(path)); err != nil {
		// The caller never receives db, so it must be released here or it
		// would stay open with nothing left to close it. The watch error
		// is the one worth reporting, so the close error is dropped.
		_ = db.Close()
		return fmt.Errorf("watch db file: %w", err)
	}

	// Register only once setup has fully succeeded.
	s.dbs[path] = db

	// Kick off an initial sync. The send is non-blocking: if the channel
	// cannot accept a value right now, the notification is skipped.
	select {
	case db.NotifyCh() <- struct{}{}:
	default:
	}

	return nil
}
// Unwatch removes a database path from being managed by the server.
//
// It is a no-op when path is not registered. Otherwise the database is
// unregistered, the watch on its parent directory is removed unless another
// managed database lives in the same directory, and the database is closed.
// The database is closed even if removing the watch fails; the first error
// encountered is returned.
func (s *Server) Unwatch(path string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	db := s.dbs[path]
	if db == nil {
		return nil
	}
	delete(s.dbs, path)

	// Watches are placed on the parent directory, which several databases
	// may share. Dropping the watch while a sibling is still registered would
	// silently stop that sibling's notifications.
	dir := filepath.Dir(path)
	shared := false
	for other := range s.dbs {
		if filepath.Dir(other) == dir {
			shared = true
			break
		}
	}

	var err error

	// Stop watching for changes on the database WAL.
	if !shared {
		if e := s.watcher.Remove(dir); e != nil {
			err = fmt.Errorf("unwatch file: %w", e)
		}
	}

	// Shut down database. This runs regardless of the watcher result because
	// the database has already been unregistered and nothing else will close it.
	if e := db.Close(); e != nil && err == nil {
		err = fmt.Errorf("close db: %w", e)
	}

	return err
}
// isWatched reports whether event refers to a managed database. A trailing
// "-wal" is trimmed from the event name before the lookup, so an event on a
// database's WAL file matches the database itself.
//
// The caller must not hold s.mu.
func (s *Server) isWatched(event fsnotify.Event) bool {
	path := strings.TrimSuffix(event.Name, "-wal")

	// dbs is written by Watch, Unwatch and Close under mu while this method
	// runs on the monitor goroutine, so the read must take the lock too.
	s.mu.Lock()
	defer s.mu.Unlock()

	_, ok := s.dbs[path]
	return ok
}
// monitor runs in a separate goroutine and dispatches notifications to managed DBs.
//
// It loops until ctx is canceled (returning ctx.Err()), the watcher reports an
// error (returned wrapped), one of the watcher's channels is closed (returning
// nil), or dispatching an event fails (returning that error). Events for files
// that do not belong to a managed database are ignored.
func (s *Server) monitor(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case err, ok := <-s.watcher.Errors:
			// The watcher's Errors channel has to be drained alongside
			// Events; otherwise a pending error can hold up event delivery.
			if !ok {
				return nil // watcher closed
			}
			return fmt.Errorf("file watcher: %w", err)

		case event, ok := <-s.watcher.Events:
			if !ok {
				// A closed channel yields zero-value events forever;
				// stop instead of spinning.
				return nil
			}
			if !s.isWatched(event) {
				continue
			}
			if err := s.dispatchFileEvent(ctx, event); err != nil {
				return err
			}
		}
	}
}
// dispatchFileEvent notifies the database that owns the file named in event.
//
// A trailing "-wal" is trimmed from the event name to find the owning
// database. Events for unmanaged paths are dropped. The notification send is
// non-blocking: if the send cannot proceed, it is skipped. ctx.Err() is
// returned when the select observes ctx as done.
func (s *Server) dispatchFileEvent(ctx context.Context, event fsnotify.Event) error {
	target := s.DB(strings.TrimSuffix(event.Name, "-wal"))
	if target == nil {
		return nil
	}

	// TODO: If deleted, remove from server and close DB.

	select {
	case <-ctx.Done():
		return ctx.Err()
	case target.NotifyCh() <- struct{}{}:
		// Database notified.
	default:
		// Notification already pending; nothing more to do.
	}
	return nil
}