Skip to content

Commit

Permalink
use croniter for parse scheduler and singleton pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijeetSaroha committed Nov 18, 2024
1 parent b4eed4f commit 5ffc7c3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 50 deletions.
28 changes: 27 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ rich = ">=10.11.0"
shellingham = ">=1.5.4"
jsonschema = ">=4"
paramiko = "^3.5.0"
croniter = "^5.0.1"

[tool.poetry.group.dev.dependencies]
containers-sugar = ">=1.11.1"
Expand Down
101 changes: 52 additions & 49 deletions src/makim/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import asyncio

from datetime import datetime
from pathlib import Path
from typing import Any

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from croniter import croniter

from makim.core import Makim
from makim.logs import MakimError, MakimLogs
Expand All @@ -16,6 +19,13 @@
class MakimScheduler:
"""Manages scheduled tasks for Makim using APScheduler."""

_instance = None # Singleton pattern for scheduler instance

def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(MakimScheduler, cls).__new__(cls)
return cls._instance

def __init__(self, makim_instance: 'Makim'):
self.makim = makim_instance
self.db_path = Path.home() / '.makim' / 'jobs.db'
Expand All @@ -34,6 +44,11 @@ def __init__(self, makim_instance: 'Makim'):
job_defaults={'coalesce': False, 'max_instances': 3},
)

# Listen for job events
self.scheduler.add_listener(
self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR
)

def _ensure_db_directory(self) -> None:
"""Ensure the database directory exists."""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -46,14 +61,47 @@ def _execute_task(self, task_name: str, args: dict) -> None:
asyncio.set_event_loop(loop)

# Run the task
self.makim.run({'task': task_name, **args})
default_args = self.makim.global_data.get('default_args', {})
merged_args = {**default_args, **(args or {})}
self.makim.run({'task': task_name, **merged_args})
except Exception as e:
MakimLogs.print_error(
f'Error executing scheduled task {task_name}: {e!s}'
)
finally:
loop.close()

def _log_job_event(self, event) -> None:
"""Log job execution success or failure."""
job = self.scheduler.get_job(event.job_id)
if event.exception:
MakimLogs.print_error(f'Job {job.id} failed: {event.exception}')
else:
MakimLogs.print_info(f'Job {job.id} executed successfully.')

def _validate_and_parse_schedule(self, schedule: str) -> dict:
"""Validate and parse cron expressions."""
try:
# Use croniter to validate and compute next run time
base_time = datetime.now()
iter = croniter(schedule, base_time)

# Get parsed schedule for APScheduler
next_time = iter.get_next(datetime)
cron_params = {
'minute': iter.next_exact('minute'),
'hour': iter.next_exact('hour'),
'day': iter.next_exact('day'),
'month': iter.next_exact('month'),
'day_of_week': iter.next_exact('weekday'),
}
return cron_params
except ValueError:
MakimLogs.raise_error(
f'Invalid cron expression: {schedule}',
MakimError.SCHEDULER_INVALID_SCHEDULE,
)

def add_job(
self,
job_id: str,
Expand All @@ -75,13 +123,14 @@ def add_job(
args : dict[Any, Any], optional
Arguments to pass to the task
"""
cron_params = self._validate_and_parse_schedule(schedule)
try:
self.scheduler.add_job(
func=self._execute_task,
trigger='cron',
args=[task_name, args or {}],
id=job_id,
**self._parse_schedule(schedule),
**cron_params,
replace_existing=True,
)
MakimLogs.print_info(f"Successfully scheduled job '{job_id}'")
Expand All @@ -102,27 +151,13 @@ def remove_job(self, job_id: str) -> None:
MakimError.SCHEDULER_JOB_ERROR,
)

def get_job_status(self, job_id: str) -> dict:
"""Get the status of a scheduled job."""
job = self.scheduler.get_job(job_id)
if not job:
MakimLogs.raise_error(
f"Job '{job_id}' not found", MakimError.SCHEDULER_JOB_NOT_FOUND
)

return {
'id': job.id,
'next_run': job.next_run_time,
'schedule': str(job.trigger),
'active': job.next_run_time is not None,
}

def list_jobs(self) -> list[dict[str, Any]]:
"""List all scheduled jobs."""
return [
{
'id': job.id,
'next_run': job.next_run_time,
'last_run': job.last_run_time, # Include last run
'schedule': str(job.trigger),
}
for job in self.scheduler.get_jobs()
Expand All @@ -137,35 +172,3 @@ def stop(self) -> None:
"""Stop the scheduler."""
if self.scheduler.running:
self.scheduler.shutdown()

def _parse_schedule(self, schedule: str) -> dict:
"""Parse schedule string into APScheduler arguments."""
schedule = schedule.lower()

if schedule in ('hourly', 'daily', 'weekly', 'monthly', 'yearly'):
if schedule == 'hourly':
return {'hour': '*'}
elif schedule == 'daily':
return {'day': '*'}
elif schedule == 'weekly':
return {'day_of_week': '0'} # Sunday
elif schedule == 'monthly':
return {'day': '1'}
elif schedule == 'yearly':
return {'month': '1', 'day': '1'}

# Handle cron expressions
try:
minute, hour, day, month, day_of_week = schedule.split()[:5]
return {
'minute': minute,
'hour': hour,
'day': day,
'month': month,
'day_of_week': day_of_week,
}
except ValueError:
MakimLogs.raise_error(
f'Invalid schedule format: {schedule}',
MakimError.SCHEDULER_INVALID_SCHEDULE,
)

0 comments on commit 5ffc7c3

Please sign in to comment.