Skip to content

Commit 46ff747

Browse files
authored
Merge pull request #1223 from MetPX/issue1214_another_scheduled_refactor
Issue1214 another scheduled refactor
2 parents ca6b04a + 858b2b3 commit 46ff747

File tree

5 files changed

+169
-164
lines changed

5 files changed

+169
-164
lines changed

sarracenia/flowcb/scheduled/__init__.py

Lines changed: 98 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -46,49 +46,18 @@ class hoho(Scheduled):
4646
4747
"""
4848

49-
def update_appointments(self,when):
50-
"""
51-
# make a flat list from values where comma separated on a single or multiple lines.
52-
53-
set self.appointments to a list of when something needs to be run during the current day.
54-
"""
55-
self.appointments=[]
56-
if self.o.scheduled_minute or self.o.scheduled_hour:
57-
for h in self.hours:
58-
for m in self.minutes:
59-
if ( h > when.hour ) or ((h == when.hour) and ( m >= when.minute )):
60-
appointment = datetime.time(h, m, tzinfo=datetime.timezone.utc )
61-
next_time = datetime.datetime.combine(when,appointment)
62-
self.appointments.append(next_time)
63-
else:
64-
pass # that time is passed for today.
65-
if self.o.scheduled_time:
66-
for time in self.sched_times:
67-
hour,minute=time.split(':')
68-
hour = int(hour)
69-
minute = int(minute)
70-
if ( hour > when.hour ) or ((hour == when.hour) and ( minute >= when.minute )):
71-
appointment = datetime.time(hour, minute, tzinfo=datetime.timezone.utc )
72-
next_time = datetime.datetime.combine(when,appointment)
73-
self.appointments.append(next_time)
74-
else:
75-
pass # that time is passed for today.
49+
def __init__(self,options,class_logger=logger):
50+
super().__init__(options,class_logger)
51+
# if logLevel is set for a subclass, apply it here too
52+
if hasattr(self.o,'logLevel') and logger:
53+
logger.setLevel(getattr(logging, self.o.logLevel.upper()))
7654

77-
self.appointments.sort()
78-
79-
80-
logger.info( f"for {when}: {json.dumps(list(map( lambda x: str(x), self.appointments))) } ")
81-
82-
83-
def __init__(self,options,logger=logger):
84-
super().__init__(options,logger)
8555
self.o.add_option( 'scheduled_interval', 'duration', 0 )
8656
self.o.add_option( 'scheduled_hour', 'list', [] )
8757
self.o.add_option( 'scheduled_minute', 'list', [] )
8858
self.o.add_option( 'scheduled_time', 'list', [] )
8959

9060
self.housekeeping_needed=False
91-
self.interrupted=None
9261

9362
self.sched_times = sum([ x.split(',') for x in self.o.scheduled_time],[])
9463
#self.sched_times.sort()
@@ -104,7 +73,6 @@ def __init__(self,options,logger=logger):
10473
self.minutes = list(map( lambda x: int(x), sched_min))
10574
#self.minutes.sort()
10675

107-
10876
self.default_wait=datetime.timedelta(seconds=300)
10977

11078
logger.debug( f'minutes: {self.minutes}')
@@ -114,104 +82,69 @@ def __init__(self,options,logger=logger):
11482
self.first_interval=True
11583

11684
if self.o.scheduled_interval <= 0 and not self.appointments:
117-
logger.info( f"no scheduled_interval or appointments (combination of scheduled_hour and scheduled_minute) set defaulting to every {self.default_wait.seconds} seconds" )
85+
logger.warning(f"no scheduled_interval or appointments (combination of scheduled_hour and scheduled_minute) set defaulting to every {self.default_wait.seconds} seconds")
11886

119-
def gather(self,messageCountMax):
120-
121-
# for next expected post
122-
self.wait_until_next()
123-
124-
if self.stop_requested or self.housekeeping_needed:
125-
return (False, [])
87+
# Determine the next gather time
88+
# For scheduled_interval, gather immediately after starting
89+
if self.o.scheduled_interval and self.o.scheduled_interval > 0:
90+
self.next_gather_time = now
91+
else:
92+
self.next_gather_time = None
93+
self.calc_next_gather_time()
12694

127-
logger.info('time to run')
95+
self.last_gather_time = 0
12896

129-
# always post the same file at different time
130-
gathered_messages = []
131-
132-
for relPath in self.o.path:
133-
st = FmdStat()
134-
m = sarracenia.Message.fromFileInfo(relPath, self.o, st)
135-
gathered_messages.append(m)
136-
137-
return (True, gathered_messages)
138-
139-
def on_housekeeping(self):
140-
141-
self.housekeeping_needed = False
142-
143-
144-
def wait_seconds(self,sleepfor):
145-
"""
146-
sleep for the given number of seconds, like time.sleep() but broken into
147-
shorter naps to be able to honour stop_requested, or when housekeeping is needed.
148-
97+
def update_appointments(self,when):
14998
"""
99+
# make a flat list from values where comma separated on a single or multiple lines.
150100
151-
housekeeping=datetime.timedelta(seconds=self.o.housekeeping)
152-
nap=datetime.timedelta(seconds=10)
153-
154-
if self.interrupted:
155-
sleepfor = self.interrupted
156-
now = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)
157-
158-
# update sleep remaining based on how long other processing took.
159-
interruption_duration= now-self.interrupted_when
160-
sleepfor -= interruption_duration
161-
162-
if sleepfor < nap:
163-
nap=sleepfor
164-
165-
sleptfor=datetime.timedelta(seconds=0)
166-
167-
while sleepfor > datetime.timedelta(seconds=0):
168-
time.sleep(nap.total_seconds())
169-
if self.stop_requested:
170-
return
171-
172-
# how long is left to sleep.
173-
sleepfor -= nap
174-
self.interrupted=sleepfor
175-
self.interrupted_when = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)
176-
177-
sleptfor += nap
178-
if sleptfor > housekeeping:
179-
self.housekeeping_needed=True
180-
return
181-
182-
# got to the end of the interval...
183-
self.interrupted=None
184-
185-
def wait_until( self, appointment ):
186-
187-
now = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)
188-
189-
sleepfor=appointment-now
190-
191-
logger.info( f"appointment at: {appointment}, need to wait: {sleepfor})" )
192-
self.wait_seconds( sleepfor )
193-
194-
195-
def wait_until_next( self ):
196-
197-
one_min = datetime.timedelta(minutes=1)
198-
199-
if self.o.scheduled_interval > 0:
200-
if self.first_interval:
201-
self.first_interval=False
202-
return
101+
set self.appointments to a list of when something needs to be run during the current day.
102+
"""
103+
self.appointments=[]
104+
if self.o.scheduled_minute or self.o.scheduled_hour:
105+
for h in self.hours:
106+
for m in self.minutes:
107+
if ( h > when.hour ) or ((h == when.hour) and ( m >= when.minute )):
108+
appointment = datetime.time(h, m, tzinfo=datetime.timezone.utc )
109+
next_time = datetime.datetime.combine(when,appointment)
110+
self.appointments.append(next_time)
111+
else:
112+
pass # that time is passed for today.
113+
if self.o.scheduled_time:
114+
for time in self.sched_times:
115+
hour,minute=time.split(':')
116+
hour = int(hour)
117+
minute = int(minute)
118+
if ( hour > when.hour ) or ((hour == when.hour) and ( minute >= when.minute )):
119+
appointment = datetime.time(hour, minute, tzinfo=datetime.timezone.utc )
120+
next_time = datetime.datetime.combine(when,appointment)
121+
self.appointments.append(next_time)
122+
else:
123+
pass # that time is passed for today.
124+
125+
self.appointments.sort()
126+
logger.info( f"for {when}: {self.appointments_to_string()} ")
203127

204-
self.wait_seconds(datetime.timedelta(seconds=self.o.scheduled_interval))
205-
return
128+
def appointments_to_string(self):
129+
return json.dumps(list(map( lambda x: str(x), self.appointments)))
206130

207-
if ( len(self.o.scheduled_hour) > 0 ) or ( len(self.o.scheduled_minute) > 0 ) or self.o.scheduled_time:
208-
now = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)
131+
def calc_next_gather_time(self, last_gather=0):
132+
if self.next_gather_time in self.appointments:
133+
self.appointments.remove(self.next_gather_time)
134+
if last_gather == 0:
135+
last_gather = datetime.datetime.now(datetime.timezone.utc)
136+
137+
# Scheduled interval overrides other options
138+
if self.o.scheduled_interval and self.o.scheduled_interval > 0:
139+
self.next_gather_time = last_gather + datetime.timedelta(seconds=self.o.scheduled_interval)
140+
logger.debug(f"next gather should be in {self.o.scheduled_interval}s, scheduled for {self.next_gather_time}")
141+
142+
# No scheduled interval --> try to use configured schedule
143+
elif len(self.o.scheduled_hour) > 0 or len(self.o.scheduled_minute) > 0 or len(self.o.scheduled_time) > 0:
209144
next_appointment=None
210145
missed_appointments=[]
211146
for t in self.appointments:
212-
# Need a little bit before or after apointment, to allow some wiggle room for sleep overhead.
213-
# See issue #1214 for more details
214-
if t < now < t + one_min:
147+
if last_gather < t:
215148
next_appointment=t
216149
break
217150
else:
@@ -230,21 +163,47 @@ def wait_until_next( self ):
230163
self.update_appointments(midnight)
231164
next_appointment=self.appointments[0]
232165

233-
self.wait_until(next_appointment)
234-
if self.interrupted:
235-
logger.info( f"sleep interrupted, returning for housekeeping." )
236-
else:
237-
self.appointments.remove(next_appointment)
238-
logger.info( f"ok {len(self.appointments)} appointments left today" )
239-
return
240-
241-
# default wait...
242-
243-
if self.first_interval:
244-
self.first_interval=False
245-
return
166+
self.next_gather_time = next_appointment
167+
logger.debug(f"next gather scheduled for {self.next_gather_time} from appointments {self.appointments_to_string()}")
168+
169+
# No scheduled interval and no scheduled hour/minutes/time
170+
else:
171+
self.next_gather_time = last_gather + self.default_wait
172+
logger.debug(f"next gather should be in {self.default_wait.seconds}s, scheduled for {self.next_gather_time} (default_wait")
173+
174+
def ready_to_gather(self):
175+
current_time = datetime.datetime.now(datetime.timezone.utc )
176+
if current_time >= self.next_gather_time and not self.stop_requested:
177+
late = (current_time - self.next_gather_time).total_seconds()
178+
logger.info(f"--> yes, now >= {self.next_gather_time} ({late}s late)")
179+
# NOTE: could also pass self.next_gather_time to calc_next_gather_time to get more precise intervals
180+
# See https://github.com/MetPX/sarracenia/issues/1214#issuecomment-2344711046 for discussion.
181+
self.calc_next_gather_time(current_time)
182+
self.last_gather_time = current_time
183+
return True
184+
else:
185+
logger.debug(f"--> no, next gather scheduled for {self.next_gather_time}")
186+
187+
def gather(self, messageCountMax):
188+
if self.ready_to_gather():
189+
# always post the same file at different time
190+
gathered_messages = []
191+
for relPath in self.o.path:
192+
st = FmdStat()
193+
m = sarracenia.Message.fromFileInfo(relPath, self.o, st)
194+
gathered_messages.append(m)
195+
return (True, gathered_messages)
196+
else:
197+
logger.debug(f"nothing to do")
198+
return (False, [])
246199

247-
self.wait_seconds(self.default_wait)
200+
def on_housekeeping(self):
201+
logger.info(f"next gather scheduled for {self.next_gather_time}")
202+
n_appointments = len(self.appointments)
203+
if n_appointments > 0:
204+
logger.info(f"{n_appointments} appointments remaining for today")
205+
logger.debug(f"remaining appointments: {self.appointments_to_string()}")
206+
self.housekeeping_needed = False
248207

249208
if __name__ == '__main__':
250209

sarracenia/flowcb/scheduled/http_with_metadata.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,7 @@ def __init__(self, options, logger=logger):
3434

3535
def gather(self,messageCountMax):
3636

37-
# for next expected post
38-
self.wait_until_next()
39-
40-
if self.stop_requested or self.housekeeping_needed:
37+
if not self.ready_to_gather():
4138
return (False, [])
4239

4340
logger.info('time to run')

sarracenia/flowcb/scheduled/poll.py

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,26 @@
11
import logging
2-
import requests
3-
import base64
4-
5-
import datetime
6-
import os
72
import sys
8-
import time
9-
10-
from datetime import date
113

124
import sarracenia
135
from sarracenia.flowcb.scheduled import Scheduled
146

157
logger = logging.getLogger(__name__)
168

17-
18-
19-
209
class Poll(Scheduled):
2110
"""
2211
2312
"""
13+
def __init__(self,options,logger=logger):
14+
super().__init__(options,logger)
2415

2516
def gather(self,messageCountMax): # placeholder
2617
"""
27-
2818
This gather aborts further gathers if the next interval has not yet arrived.
29-
3019
"""
31-
logger.info( f"waiting for next poll")
32-
self.wait_until_next()
33-
34-
return not (self.stop_requested or self.housekeeping_needed), []
20+
ready = self.ready_to_gather()
21+
if ready:
22+
logger.info("poll routine will run")
23+
return (ready, [])
3524

3625

3726
if __name__ == '__main__':

sarracenia/flowcb/scheduled/wiski.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,10 @@ def gather(self,messageCountMax): # placeholder
135135

136136
messages=[]
137137

138-
self.wait_until_next()
138+
if not self.ready_to_gather():
139+
return (False, [])
139140

141+
start_time = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)
140142
while (1):
141143
if self.stop_requested or self.housekeeping_needed:
142144
return (False, messages)
@@ -156,6 +158,9 @@ def gather(self,messageCountMax): # placeholder
156158
break
157159
else:
158160
logger.info( f"request failed. Status code: {response.status_code}: {response.text}" )
161+
162+
now = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)
163+
self.housekeeping_needed = ((now-start_time).total_seconds() >= self.o.housekeeping)
159164

160165
k = KIWIS(self.main_url, headers=headers )
161166

0 commit comments

Comments
 (0)