@@ -4,8 +4,8 @@
 import os
 import math
 from pprint import pprint
-from typing import Optional, Any, Sequence
-from datetime import datetime
+from typing import Optional, Any, Sequence, cast
+from datetime import datetime, timedelta
 from uuid import UUID

 import json
@@ -79,6 +79,7 @@

 # pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements
 # pylint: disable=invalid-name, too-many-lines, too-many-return-statements
+# pylint: disable=too-many-instance-attributes
 # ============================================================================
 class CrawlOperator(BaseOperator):
     """CrawlOperator Handler"""
@@ -93,6 +94,8 @@ class CrawlOperator(BaseOperator):

     min_avail_storage_ratio: float

+    paused_expires_delta: timedelta
+
     def __init__(self, *args):
         super().__init__(*args)

@@ -110,6 +113,13 @@ def __init__(self, *args):
             os.environ.get("CRAWLER_MIN_AVAIL_STORAGE_RATIO") or 0
         )

+        # time in minutes before a paused crawl is stopped - default is 7 days
+        paused_crawl_limit_minutes = int(
+            os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080")
+        )
+
+        self.paused_expires_delta = timedelta(minutes=paused_crawl_limit_minutes)
+
     def init_routes(self, app):
         """init routes for this operator"""

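Aside: the new PAUSED_CRAWL_LIMIT_MINUTES knob is read once at operator startup and converted to a timedelta. A minimal standalone sketch of the same arithmetic, assuming only the names shown in the diff (the default of 10080 minutes is exactly 7 days):

# sketch: reproduce the expiry-delta computation from __init__ above
import os
from datetime import timedelta

paused_crawl_limit_minutes = int(
    os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080")
)
paused_expires_delta = timedelta(minutes=paused_crawl_limit_minutes)

assert timedelta(minutes=10080) == timedelta(days=7)  # default is 7 days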
@@ -160,7 +170,7 @@ async def sync_crawls(self, data: MCSyncData):
             scale=spec.get("scale", 1),
             started=data.parent["metadata"]["creationTimestamp"],
             stopping=spec.get("stopping", False),
-            paused=spec.get("paused", False),
+            paused_at=str_to_date(spec.get("pausedAt")),
             timeout=spec.get("timeout") or 0,
             max_crawl_size=int(spec.get("maxCrawlSize") or 0),
             scheduled=spec.get("manual") != "1",
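The spec now carries a pausedAt timestamp instead of a paused boolean, parsed via str_to_date. The helper itself is not shown in this diff; a plausible sketch, assuming the custom resource stores pausedAt as an ISO 8601 string and that an unset value should map to None:

from datetime import datetime
from typing import Optional

def str_to_date(string: Optional[str]) -> Optional[datetime]:
    # hypothetical stand-in for the project's real helper:
    # empty/missing values stay None so unpaused crawls remain falsy
    if not string:
        return None
    return datetime.fromisoformat(string)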
@@ -265,11 +275,25 @@ async def sync_crawls(self, data: MCSyncData):
             status.scale = 1

         # stopping paused crawls
-        if crawl.paused and crawl.stopping:
-            status.stopReason = "stopped_by_user"
-            status.stopping = True
-            print(f"Paused crawl stopped by user, id: {crawl.id}")
-            await self.mark_finished(crawl, status, "stopped_by_user")
+        if crawl.paused_at:
+            stop_reason: Optional[StopReason] = None
+            state: Optional[TYPE_NON_RUNNING_STATES] = None
+            # Check if pause expiry limit is reached and if so, stop crawl
+            if dt_now() >= (crawl.paused_at + self.paused_expires_delta):
+                print(f"Paused crawl expiry reached, stopping crawl, id: {crawl.id}")
+                stop_reason = "stopped_pause_expired"
+                state = "stopped_pause_expired"
+
+            # Check if paused crawl was stopped manually
+            elif crawl.stopping:
+                print(f"Paused crawl stopped by user, id: {crawl.id}")
+                stop_reason = "stopped_by_user"
+                state = "stopped_by_user"
+
+            if stop_reason and state:
+                status.stopping = True
+                status.stopReason = stop_reason
+                await self.mark_finished(crawl, status, state)

         children = self._load_redis(params, status, data.children)

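The stop decision for paused crawls now has two triggers: expiry of the pause window and a manual stop. Restated here as a hypothetical pure function for clarity (in the real code StopReason and TYPE_NON_RUNNING_STATES are literal string types, so plain str stands in):

from datetime import datetime, timedelta
from typing import Optional, Tuple

def paused_stop_decision(
    paused_at: datetime,
    stopping: bool,
    now: datetime,
    expires_delta: timedelta,
) -> Tuple[Optional[str], Optional[str]]:
    # expiry wins over a manual stop, matching the branch order above
    if now >= paused_at + expires_delta:
        return "stopped_pause_expired", "stopped_pause_expired"
    if stopping:
        return "stopped_by_user", "stopped_by_user"
    return None, None

# example: a crawl paused 8 days ago with the default 7-day window expires
assert paused_stop_decision(
    datetime(2025, 1, 1), False, datetime(2025, 1, 9), timedelta(days=7)
)[0] == "stopped_pause_expired"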
@@ -335,7 +359,9 @@ async def sync_crawls(self, data: MCSyncData):

         for i in range(0, status.scale):
             children.extend(
-                self._load_crawler(params, i, status, data.children, crawl.paused)
+                self._load_crawler(
+                    params, i, status, data.children, bool(crawl.paused_at)
+                )
             )

         return {
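_load_crawler still takes a plain boolean pause flag, so the Optional[datetime] is collapsed with bool(): None (not paused) is falsy, any timestamp is truthy while still keeping the pause time available for expiry math. A trivial illustration:

from datetime import datetime
from typing import Optional

paused_at: Optional[datetime] = None
assert bool(paused_at) is False   # unpaused crawl
paused_at = datetime(2025, 1, 1)
assert bool(paused_at) is True    # paused; timestamp retained for expiry check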
@@ -858,12 +884,12 @@ async def sync_crawl_state(
         ):
             # mark as waiting (if already running)
             await self.set_state(
-                "waiting_capacity" if not crawl.paused else "paused",
+                "waiting_capacity" if not crawl.paused_at else "paused",
                 status,
                 crawl,
                 allowed_from=(
                     RUNNING_AND_STARTING_ONLY
-                    if not crawl.paused
+                    if not crawl.paused_at
                     else RUNNING_AND_WAITING_STATES
                 ),
             )
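set_state only applies a transition when the current state appears in allowed_from; paused crawls are given the wider waiting set, presumably so that "paused" can be entered from a waiting state as well as a running one. A hedged sketch of that guard, with illustrative placeholder state tuples (the real tuples are defined elsewhere in the codebase):

from typing import Sequence

def transition_allowed(current: str, allowed_from: Sequence[str]) -> bool:
    # hypothetical reduction of set_state's guard: the new state is only
    # applied when the current state is one of the allowed sources
    return current in allowed_from

# placeholder values for illustration only
RUNNING_AND_STARTING_ONLY = ("starting", "running")
RUNNING_AND_WAITING_STATES = ("starting", "waiting_capacity", "running")

assert transition_allowed("waiting_capacity", RUNNING_AND_WAITING_STATES)
assert not transition_allowed("waiting_capacity", RUNNING_AND_STARTING_ONLY)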
@@ -884,7 +910,7 @@ async def sync_crawl_state(

             # crawler pods already shut down, remove redis pause key
             # for simple resume later
-            if crawl.paused:
+            if crawl.paused_at:
                 await redis.delete(f"{crawl.id}:paused")

         elif crawler_running and not redis:
@@ -896,7 +922,7 @@ async def sync_crawl_state(
             return status

         # only get here if at least one crawler pod is running
-        if crawl.paused:
+        if crawl.paused_at:
             await redis.set(f"{crawl.id}:paused", "1")

         # update lastActiveTime if crawler is running
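Both redis branches manage a single key, {crawl.id}:paused, which the crawler presumably checks to decide whether to pause. A standalone sketch of the same key handling using redis-py's asyncio client (connection details are assumptions; the key name and values come from the diff):

from redis import asyncio as aioredis

async def mirror_pause_state(redis_url: str, crawl_id: str, paused: bool) -> None:
    # set the pause key while paused, delete it for a simple resume,
    # mirroring the operator logic above
    redis = aioredis.from_url(redis_url, decode_responses=True)
    try:
        if paused:
            await redis.set(f"{crawl_id}:paused", "1")
        else:
            await redis.delete(f"{crawl_id}:paused")
    finally:
        await redis.aclose()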