Skip to content

Commit 9514a5f

Browse files
committed
refactor: all run and schedule args can optionally be configured via conf files
1 parent 8602ffb commit 9514a5f

File tree

3 files changed

+63
-84
lines changed

3 files changed

+63
-84
lines changed

.DS_Store

6 KB
Binary file not shown.

src/flowerpower/helpers/trigger.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ def _get_cron_trigger(
9090
if crontab is not None:
9191
return (CronTrigger.from_crontab(crontab), kwargs)
9292
else:
93-
return (
94-
CronTrigger(
93+
return CronTrigger(
9594
year=kwargs.pop("year", None),
9695
month=kwargs.pop("month", None),
9796
week=kwargs.pop("week", None),
@@ -103,9 +102,7 @@ def _get_cron_trigger(
103102
start_time=start_time,
104103
end_time=end_time,
105104
timezone=timezone,
106-
),
107-
kwargs,
108-
)
105+
)
109106

110107
def _get_interval_trigger(
111108
self,
@@ -115,8 +112,7 @@ def _get_interval_trigger(
115112
):
116113
from apscheduler.triggers.interval import IntervalTrigger
117114

118-
return (
119-
IntervalTrigger(
115+
return IntervalTrigger(
120116
weeks=kwargs.pop("weeks", 0),
121117
days=kwargs.pop("days", 0),
122118
hours=kwargs.pop("hours", 0),
@@ -125,9 +121,7 @@ def _get_interval_trigger(
125121
microseconds=kwargs.pop("microseconds", 0),
126122
start_time=start_time,
127123
end_time=end_time,
128-
),
129-
kwargs,
130-
)
124+
)
131125

132126
def _get_calendar_trigger(
133127
self,
@@ -138,8 +132,7 @@ def _get_calendar_trigger(
138132
):
139133
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
140134

141-
return (
142-
CalendarIntervalTrigger(
135+
return CalendarIntervalTrigger(
143136
weeks=kwargs.pop("weeks", 0),
144137
days=kwargs.pop("days", 0),
145138
hours=kwargs.pop("hours", 0),
@@ -148,14 +141,12 @@ def _get_calendar_trigger(
148141
start_time=start_time,
149142
end_time=end_time,
150143
timezone=timezone,
151-
),
152-
kwargs,
153-
)
144+
)
154145

155146
def _get_date_trigger(self, start_time: dt.datetime, **kwargs):
156147
from apscheduler.triggers.date import DateTrigger
157148

158-
return (DateTrigger(run_time=start_time), kwargs)
149+
return DateTrigger(run_time=start_time)
159150

160151

161152
def get_trigger(trigger_type: str, **kwargs):

src/flowerpower/pipeline.py

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import importlib
44
import os
55
import sys
6+
from tkinter import ALL
67
from typing import Any, Callable
78
from uuid import UUID
89

@@ -21,7 +22,7 @@
2122

2223

2324
from .helpers.executor import get_executor
24-
from .helpers.trigger import get_trigger
25+
from .helpers.trigger import get_trigger, ALL_TRIGGER_KWARGS
2526

2627

2728
class PipelineManager:
@@ -136,36 +137,26 @@ def _get_driver(
136137
self._load_module(name)
137138

138139
if with_tracker:
139-
project_id = kwargs.pop("project_id", None) or self.cfg.tracker.pipeline[
140-
name
141-
].get("project_id", None)
142-
username = kwargs.pop("username", None) or self.cfg.tracker.get(
143-
"username", None
144-
)
145-
dag_name = kwargs.pop("dag_name", None) or self.cfg.tracker.pipeline[
146-
name
147-
].get("dag_name", None)
148-
tags = kwargs.pop("tags", None) or self.cfg.tracker.pipeline[name].get(
149-
"tags", None
150-
)
151-
api_url = kwargs.pop("api_url", None) or self.cfg.tracker.get(
152-
"api_url", None
153-
)
154-
ui_url = kwargs.pop("ui_url", None) or self.cfg.tracker.get("ui_url", None)
140+
tracker_cfg = self.cfg.tracker.pipeline.get(name, {})
141+
tracker_kwargs = {
142+
key: kwargs.pop(key, None) or tracker_cfg.get(key, None)
143+
for key in [
144+
"project_id",
145+
"username",
146+
"dag_name",
147+
"tags",
148+
"api_url",
149+
"ui_url",
150+
]
151+
}
152+
project_id = tracker_kwargs.get("project_id", None)
155153

156154
if project_id is None:
157155
raise ValueError(
158156
"Please provide a project_id if you want to use the tracker"
159157
)
160158

161-
tracker = adapters.HamiltonTracker(
162-
project_id=project_id,
163-
username=username,
164-
dag_name=dag_name,
165-
tags=tags,
166-
hamilton_api_url=api_url,
167-
hamilton_ui_url=ui_url,
168-
)
159+
tracker = adapters.HamiltonTracker(project_id=project_id, **tracker_kwargs)
169160

170161
dr = (
171162
driver.Builder()
@@ -190,9 +181,9 @@ def _run(
190181
self,
191182
name: str,
192183
environment: str = "dev",
193-
executor: str | None = None,
194184
inputs: dict | None = None,
195185
final_vars: list | None = None,
186+
executor: str | None = None,
196187
with_tracker: bool | None = None,
197188
reload: bool = False,
198189
**kwargs,
@@ -215,18 +206,23 @@ def _run(
215206
"""
216207
logger.info(f"Starting pipeline {name} in environment {environment}")
217208

218-
pipeline_cfg = self.cfg.pipeline
219-
run_params = pipeline_cfg.run.get(name)[environment]
209+
run_params = self.cfg.pipeline.run.get(name)[environment]
220210

221211
final_vars = final_vars or run_params.get("final_vars", [])
222-
inputs = {**(run_params.get("inputs", {}) or {}), **(inputs or {})}
223-
with_tracker = with_tracker or run_params.get("with_tracker", False)
212+
inputs = {
213+
**(run_params.get("inputs", {}) or {}),
214+
**(inputs or {}),
215+
} # <-- inputs override and adds to run_params
216+
217+
kwargs.update(
218+
{
219+
arg: eval(arg) or run_params.get(arg, None)
220+
for arg in ["executor", "with_tracker", "reload"]
221+
}
222+
)
224223

225224
dr, shutdown = self._get_driver(
226225
name=name,
227-
executor=executor,
228-
with_tracker=with_tracker,
229-
reload=reload,
230226
**kwargs,
231227
)
232228

@@ -362,8 +358,6 @@ def add_job(
362358
with SchedulerManager(
363359
name=name, base_dir=self._base_dir, role="scheduler"
364360
) as sm:
365-
# if not any([task.id == "run-pipeline" for task in sm.get_tasks()]):
366-
# sm.configure_task(func_or_task_id="run-pipeline", func=self._run)
367361
return sm.add_job(
368362
self._run,
369363
args=(
@@ -385,11 +379,11 @@ def add_job(
385379
def schedule(
386380
self,
387381
name: str,
382+
inputs: dict | None = None,
383+
final_vars: list | None = None,
388384
environment: str = "dev",
389385
executor: str | None = None,
390386
trigger_type: str | None = None,
391-
inputs: dict | None = None,
392-
final_vars: list | None = None,
393387
with_tracker: bool | None = None,
394388
paused: bool = False,
395389
coalesce: str = "latest",
@@ -431,49 +425,43 @@ def schedule(
431425
if SchedulerManager is None:
432426
raise ValueError("APScheduler4 not installed. Please install it first.")
433427

434-
trigger_kwargs = {}
435428
if "pipeline" in self.cfg.scheduler:
436-
scheduler_cfg = self.cfg.scheduler.pipeline.get(name, None).copy()
429+
scheduler_cfg = self.cfg.scheduler.pipeline.get(name, None) # .copy()
437430
else:
438-
scheduler_cfg = None
439-
440-
if scheduler_cfg is not None:
441-
trigger_type = trigger_type or scheduler_cfg.pop("trigger_type", None)
442-
for key in [
443-
"crontab",
444-
"year",
445-
"month",
446-
"week",
447-
"day",
448-
"days_of_week",
449-
"hour",
450-
"minute",
451-
"second",
452-
"timezone",
453-
]:
454-
trigger_kwargs[key] = scheduler_cfg.pop(key, None)
431+
scheduler_cfg = {}
432+
433+
trigger_type = trigger_type or scheduler_cfg.get("trigger_type", None)
434+
435+
trigger_kwargs = {
436+
key: kwargs.pop(key, None) or scheduler_cfg.get(key, None)
437+
for key in ALL_TRIGGER_KWARGS.get(trigger_type, [])
438+
if key in kwargs or key in scheduler_cfg
439+
}
440+
441+
schedule_kwargs = {
442+
arg: eval(arg) or scheduler_cfg.get(arg, None)
443+
for arg in [
444+
"executor",
445+
"paused",
446+
"coalesce",
447+
"misfire_grace_time",
448+
"max_jitter",
449+
"max_running_jobs",
450+
"conflict_policy",
451+
]
452+
}
455453

456454
with SchedulerManager(
457455
name=name, base_dir=self._base_dir, role="scheduler"
458456
) as sm:
459-
# if not any([task.id == "run-pipeline" for task in sm.get_tasks()]):
460-
# sm.configure_task(func_or_task_id="run-pipeline", func=self._run)
461-
trigger, kwargs = get_trigger(trigger_type, **kwargs)
457+
trigger = get_trigger(trigger_type, **trigger_kwargs)
462458

463459
id_ = sm.add_schedule(
464460
self._run,
465461
trigger=trigger,
466462
args=(name, environment, executor, inputs, final_vars, with_tracker),
467463
kwargs=kwargs,
468-
job_executor=executor
469-
if executor in ["async", "threadpool", "processpool"]
470-
else "async",
471-
paused=paused,
472-
coalesce=coalesce,
473-
misfire_grace_time=misfire_grace_time,
474-
max_jitter=max_jitter,
475-
max_running_jobs=max_running_jobs,
476-
conflict_policy=conflict_policy,
464+
**schedule_kwargs,
477465
)
478466
logger.success(
479467
f"Added scheduler for {name} in environment {environment} with id {id_}"

0 commit comments

Comments
 (0)