forked from suryanshsk/Python-Voice-Assistant-Suryanshsk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Task_Scheduler
28 lines (25 loc) · 1.18 KB
/
Task_Scheduler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Task Scheduler Framework (task_scheduler.py)
import schedule
import time
import datetime
import subprocess
# Function to run the specified task
def run_task(task_name, allowed_runtime, command):
    """Run ``command`` in a shell and enforce a runtime limit on it.

    Exactly one status line is printed per call: terminated (limit exceeded),
    completed successfully (exit code 0), finished with an error (non-zero
    exit code), or an error occurred (the command could not be run at all).

    Args:
        task_name: Human-readable label used in the printed messages.
        allowed_runtime: Maximum runtime in hours (fractions allowed).
            ``None`` disables the limit.
        command: Command line passed to the system shell. It is executed
            with ``shell=True``, so it must come from a trusted source.

    Returns:
        None. Results are reported on stdout only.
    """
    # The limit is expressed in hours; subprocess timeouts are in seconds.
    timeout = None if allowed_runtime is None else allowed_runtime * 3600
    try:
        # Monotonic clock: unaffected by system clock adjustments mid-run.
        started = time.monotonic()
        # The task's output is never used, so discard it at the source
        # instead of piping it. This also means the wait after a kill cannot
        # block on pipes still held open by grandchildren of the shell.
        process = subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            shell=True,
        )
        try:
            exit_code = process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Ask politely first, then force-kill if the task ignores it.
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
            print(f"Task '{task_name}' was terminated as it exceeded the maximum allowed runtime of {allowed_runtime} hours.")
            return

        duration = (time.monotonic() - started) / 3600
        if exit_code == 0:
            print(f"Task '{task_name}' completed successfully in {duration:.2f} hours.")
        else:
            print(f"Task '{task_name}' finished with an error. Exit code: {exit_code}")
    except Exception as e:
        # Deliberately broad: this runs as a scheduler job, and a failure to
        # launch one task must not take down the scheduling loop.
        print(f"An error occurred while executing '{task_name}': {str(e)}")