Skip to content

Commit

Permalink
Fix Stop() race conditions, and control stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidnewhall committed Sep 17, 2019
1 parent 8a6907c commit b2c9a71
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 56 deletions.
122 changes: 66 additions & 56 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,21 @@ func (e *Events) BindChan(event EventType, channel chan Event) {

// Stop stops Watch() loops and disconnects from the event stream.
// No further callback messages will fire after this is called.
// This also closes all channels that were passed to BindChan.
func (e *Events) Stop() {
// Closes all channels that were passed to BindChan if closeChans=true.
// Stop writing to the channels with Custom() before calling Stop().
func (e *Events) Stop(closeChans bool) {
defer func() { e.Running = false }()
if e.Running {
e.custom(eventStreamStop, -1, -1, "") // signal
if e.stream != nil {
_ = e.stream.Close()
e.stream = nil
}
close(e.eventChan)
}
if !closeChans {
return
}
for _, chans := range e.eventChans {
for i := range chans {
close(chans[i])
Expand All @@ -72,8 +81,10 @@ func (e *Events) Stop() {
func (e *Events) UnbindAll() {
e.binds.Lock()
e.chans.Lock()
defer e.binds.Unlock()
defer e.chans.Unlock()
defer func() {
e.binds.Unlock()
e.chans.Unlock()
}()
e.eventBinds = make(map[EventType][]func(Event))
e.eventChans = make(map[EventType][]chan Event)
}
Expand All @@ -96,25 +107,25 @@ func (e *Events) UnbindFunc(event EventType) {
// Watch kicks off the routines to watch the eventStream and fire callback bindings.
// If your application relies on event stream messages, call this at least once
// to connect the stream. If you have no call back functions or channels then do not
// call this.
// call this. Call Stop() to close the connection when you're done with it.
func (e *Events) Watch(retryInterval time.Duration, refreshOnConfigChange bool) {
e.Running = true
e.eventChan = make(chan Event, 1000) // allow 1000 events to buffer
go e.eventChannelSelector(refreshOnConfigChange)
e.eventStreamScanner(retryInterval)
go e.eventStreamSelector(refreshOnConfigChange, retryInterval)
go e.eventStreamScanner()
}

// Custom fires an event into the running event Watcher. Any functions or
// channels bound to the CUSTOM Event type will also be called.
func (e *Events) Custom(cameraNum int, msg string) {
if !e.Running {
return
}
e.custom(EventStreamCustom, -11000, cameraNum, msg)
}

// custom allows a quick way to make events.
func (e *Events) custom(t EventType, id int, cam int, msg string) {
if !e.Running {
return
}
e.eventChan <- Event{
Time: time.Now().Round(time.Second),
When: time.Now().Round(time.Second),
Expand All @@ -128,65 +139,64 @@ func (e *Events) custom(t EventType, id int, cam int, msg string) {
/* INTERFACE HELPER METHODS FOLLOW */

// eventStreamScanner connects to the securityspy event stream and fires events into a channel.
func (e *Events) eventStreamScanner(retryInterval time.Duration) {
body, scanner := e.eventStreamConnect(retryInterval)
if scanner != nil {
scanner.Split(scanLinesCR)
func (e *Events) eventStreamScanner() {
defer e.custom(EventStreamDisconnect, -10000, -1, "Connection Closed")
if err := e.eventStreamConnect(); err != nil {
return
}
for {
if !e.Running {
if body != nil {
_ = body.Close()
}
return // we all done here. stop got called
}
e.custom(EventStreamConnect, -9999, -1, EventNames[EventStreamConnect])
scanner := bufio.NewScanner(e.stream)
scanner.Split(scanLinesCR)
for scanner.Scan() {
// Constantly scan for new events, then report them to the event channel.
if scanner != nil && scanner.Scan() {
if text := scanner.Text(); strings.Count(text, " ") > 2 {
e.eventChan <- e.UnmarshalEvent(text)
}
}
if err := scanner.Err(); err != nil {
e.custom(EventStreamDisconnect, -10000, -1, err.Error())
_ = body.Close()
time.Sleep(retryInterval)
body, scanner = e.eventStreamConnect(retryInterval)
scanner.Split(scanLinesCR)
if text := scanner.Text(); strings.Count(text, " ") > 2 {
e.eventChan <- e.UnmarshalEvent(text)
}
}
}

// eventStreamConnect establishes a connection to the event stream and passes off the http Reader.
func (e *Events) eventStreamConnect(retryInterval time.Duration) (io.ReadCloser, *bufio.Scanner) {
httpClient := e.server.api.getClient(0)
resp, err := e.server.api.secReq("++eventStream", url.Values{"version": []string{"3"}}, httpClient)
for err != nil {
// This for loops attempts to reconnect if the stream is down.
e.custom(EventStreamDisconnect, -9999, -1, EventNames[EventStreamDisconnect]+" "+err.Error())
time.Sleep(retryInterval)
if !e.Running {
return nil, nil // Stopped externally while sleeping, bail out.
}
resp, err = e.server.api.secReq("++eventStream", url.Values{"version": []string{"3"}}, httpClient)
func (e *Events) eventStreamConnect() error {
if e.stream != nil {
_ = e.stream.Close()
e.stream = nil
}
e.custom(EventStreamConnect, -9999, -1, EventNames[EventStreamConnect])
return resp.Body, bufio.NewScanner(resp.Body)
httpClient := e.server.api.getClient(0) // timeout=0
httpParams := url.Values{"version": []string{"3"}}
resp, err := e.server.api.secReq("++eventStream", httpParams, httpClient)
if err != nil {
return err
}
e.stream = resp.Body
return nil
}

// eventChannelSelector watches the event channel.
// eventStreamSelector watches the event channel.
// Fires bound event call back functions.
func (e *Events) eventChannelSelector(refreshOnConfigChange bool) {
// Also reconnects to the event stream if the connection fails.
// There is a "loop" that occurs among the eventStream* methods.
// Stop() properly handles the shutdown of the loop, so if can be safely restarted w/ Watch()
func (e *Events) eventStreamSelector(refreshOnConfigChange bool, retryInterval time.Duration) {
Loop:
for event := range e.eventChan {
if refreshOnConfigChange && event.Type == EventConfigChange {
go e.serverRefresh()
switch event.Type {
case eventStreamStop:
break Loop // Stop() called.
case EventConfigChange:
if refreshOnConfigChange {
go e.serverRefresh()
}
case EventStreamDisconnect:
// reconnect to event stream
go func() {
time.Sleep(retryInterval)
e.eventStreamScanner()
}()
}
// these can punt and fire in any order.
go func() {
e.binds.RLock()
event.callBacks(e.eventBinds)
e.binds.RUnlock()
}()
// these fire in order.
// All events run binds.
e.binds.RLock()
event.callBacks(e.eventBinds)
e.binds.RUnlock()
e.chans.RLock()
event.eventChans(e.eventChans)
e.chans.RUnlock()
Expand Down
3 changes: 3 additions & 0 deletions events_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package securityspy

import (
"io"
"sync"
"time"

Expand Down Expand Up @@ -48,6 +49,7 @@ const (
// watcher routine is active.
type Events struct {
server *Server
stream io.ReadCloser
eventChan chan Event
eventBinds map[EventType][]func(event Event)
eventChans map[EventType][]chan Event
Expand Down Expand Up @@ -100,6 +102,7 @@ const (
EventWatcherRefreshed EventType = "REFRESH"
EventWatcherRefreshFail EventType = "REFRESHFAIL"
EventStreamCustom EventType = "CUSTOM"
eventStreamStop EventType = "STOP"
)

// EventNames contains the human readable names for each event.
Expand Down

0 comments on commit b2c9a71

Please sign in to comment.