Skip to content

Commit

Permalink
Removed utils.utcnow() (#2056)
Browse files Browse the repository at this point in the history
* Bump version to 1.16.1

* Partially replaced utcnow() with now()

* Removed utils.utcnow

* Fix lint
  • Loading branch information
selwin committed Mar 11, 2024
1 parent 3a3787d commit bab0061
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 83 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Breaking Changes:
* `Queue.all()` requires `connection` argument.
* `@job` decorator now requires `connection` argument.

### RQ 1.16.1 (2024-03-09)
* Added `worker_pool.get_worker_process()` to make `WorkerPool` easier to extend. Thanks @selwin!

### RQ 1.16 (2024-02-24)
* Added a way for jobs to wait for latest result `job.latest_result(timeout=60)`. Thanks @ajnisbet!
* Fixed an issue where `stopped_callback` is not respected when job is enqueued via `enqueue_many()`. Thanks @eswolinsky3241!
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ maintainers = [
{name = "Selwin Ong"},
]
authors = [
{ name = "Selwin Ong", email = "[email protected]" },
{ name = "Vincent Driessen", email = "[email protected]" },
]
requires-python = ">=3.7"
Expand Down
10 changes: 5 additions & 5 deletions rq/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .job import Job
from .registry import BaseRegistry, StartedJobRegistry
from .utils import as_text, current_timestamp, utcnow
from .utils import as_text, current_timestamp, now


class Execution:
Expand All @@ -19,9 +19,9 @@ def __init__(self, id: str, job_id: str, connection: Redis):
self.id = id
self.job_id = job_id
self.connection = connection
now = utcnow()
self.created_at = now
self.last_heartbeat = now
right_now = now()
self.created_at = right_now
self.last_heartbeat = right_now

def __eq__(self, other: object) -> bool:
if not isinstance(other, Execution):
Expand Down Expand Up @@ -94,7 +94,7 @@ def serialize(self) -> Dict:
def heartbeat(self, started_job_registry: StartedJobRegistry, ttl: int, pipeline: 'Pipeline'):
"""Update execution heartbeat."""
# TODO: worker heartbeat should be tied to execution heartbeat
self.last_heartbeat = utcnow()
self.last_heartbeat = now()
pipeline.hset(self.key, 'last_heartbeat', self.last_heartbeat.timestamp())
pipeline.expire(self.key, ttl)
started_job_registry.add(self.job, ttl, pipeline=pipeline, xx=True)
Expand Down
8 changes: 4 additions & 4 deletions rq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
get_call_string,
get_version,
import_attribute,
now,
parse_timeout,
str_to_date,
utcformat,
utcnow,
)

logger = logging.getLogger("rq.job")
Expand Down Expand Up @@ -641,7 +641,7 @@ def __init__(self, id: Optional[str] = None, connection: 'Redis' = None, seriali
raise TypeError("Job.__init__() missing 1 required argument: 'connection'")
self.connection = connection
self._id = id
self.created_at = utcnow()
self.created_at = now()
self._data = UNEVALUATED
self._func_name = UNEVALUATED
self._instance = UNEVALUATED
Expand Down Expand Up @@ -1020,7 +1020,7 @@ def to_dict(self, include_meta: bool = True, include_result: bool = True) -> dic
dict: The Job serialized as a dictionary
"""
obj = {
'created_at': utcformat(self.created_at or utcnow()),
'created_at': utcformat(self.created_at or now()),
'data': zlib.compress(self.data),
'success_callback_name': self._success_callback_name if self._success_callback_name else '',
'failure_callback_name': self._failure_callback_name if self._failure_callback_name else '',
Expand Down Expand Up @@ -1320,7 +1320,7 @@ def prepare_for_execution(self, worker_name: str, pipeline: 'Pipeline'):
pipeline (Pipeline): The Redis' piipeline to use
"""
self.worker_name = worker_name
self.last_heartbeat = utcnow()
self.last_heartbeat = now()
self.started_at = self.last_heartbeat
self._status = JobStatus.STARTED
mapping = {
Expand Down
4 changes: 2 additions & 2 deletions rq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .logutils import blue, green
from .serializers import resolve_serializer
from .types import FunctionReferenceType, JobDependencyType
from .utils import as_text, backend_class, compact, get_version, import_attribute, parse_timeout, utcnow
from .utils import as_text, backend_class, compact, get_version, import_attribute, now, parse_timeout

logger = logging.getLogger("rq.queue")

Expand Down Expand Up @@ -1128,7 +1128,7 @@ def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_fro
job.set_status(JobStatus.QUEUED, pipeline=pipe)

job.origin = self.name
job.enqueued_at = utcnow()
job.enqueued_at = now()

if job.timeout is None:
job.timeout = self._default_timeout
Expand Down
12 changes: 3 additions & 9 deletions rq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ def import_attribute(name: str) -> Callable[..., Any]:
return getattr(attribute_owner, attribute_name)


def utcnow():

return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


def now():
"""Return now in UTC"""
return datetime.datetime.now(datetime.timezone.utc)
Expand All @@ -147,10 +142,9 @@ def utcparse(string: str) -> dt.datetime:
except ValueError:
# This catches any jobs remain with old datetime format
try:
return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%S.%fZ')
except:
return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ')

return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%S.%fZ')
except ValueError:
return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ')


def first(iterable: Iterable, default=None, key=None):
Expand Down
2 changes: 1 addition & 1 deletion rq/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '1.16.0'
VERSION = '1.16.1'
40 changes: 20 additions & 20 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
ensure_list,
get_connection_from_queues,
get_version,
now,
utcformat,
utcnow,
utcparse,
)
from .version import VERSION
Expand Down Expand Up @@ -415,7 +415,7 @@ def should_run_maintenance_tasks(self):
"""Maintenance tasks should run on first startup or every 10 minutes."""
if self.last_cleaned_at is None:
return True
if (utcnow() - self.last_cleaned_at) > timedelta(seconds=self.maintenance_interval):
if (now() - self.last_cleaned_at) > timedelta(seconds=self.maintenance_interval):
return True
return False

Expand Down Expand Up @@ -452,7 +452,7 @@ def clean_registries(self):
worker_registration.clean_worker_registry(queue)
clean_intermediate_queue(self, queue)
queue.release_maintenance_lock()
self.last_cleaned_at = utcnow()
self.last_cleaned_at = now()

def get_redis_server_version(self):
"""Return Redis server version of connection"""
Expand Down Expand Up @@ -819,9 +819,9 @@ def register_birth(self):
queues = ','.join(self.queue_names())
with self.connection.pipeline() as p:
p.delete(key)
now = utcnow()
now_in_string = utcformat(now)
self.birth_date = now
right_now = now()
now_in_string = utcformat(right_now)
self.birth_date = right_now

mapping = {
'birth': now_in_string,
Expand All @@ -846,7 +846,7 @@ def register_death(self):
# We cannot use self.state = 'dead' here, because that would
# rollback the pipeline
worker_registration.unregister(self, p)
p.hset(self.key, 'death', utcformat(utcnow()))
p.hset(self.key, 'death', utcformat(now()))
p.expire(self.key, 60)
p.execute()

Expand Down Expand Up @@ -993,7 +993,7 @@ def dequeue_job_and_maintain_ttl(
self.procline('Listening on ' + qnames)
self.log.debug('*** Listening on %s...', green(qnames))
connection_wait_time = 1.0
idle_since = utcnow()
idle_since = now()
idle_time_left = max_idle_time
while True:
try:
Expand Down Expand Up @@ -1027,7 +1027,7 @@ def dequeue_job_and_maintain_ttl(
break
except DequeueTimeout:
if max_idle_time is not None:
idle_for = (utcnow() - idle_since).total_seconds()
idle_for = (now() - idle_since).total_seconds()
idle_time_left = math.ceil(max_idle_time - idle_for)
if idle_time_left <= 0:
break
Expand Down Expand Up @@ -1062,7 +1062,7 @@ def heartbeat(self, timeout: Optional[int] = None, pipeline: Optional['Pipeline'
timeout = timeout or self.worker_ttl + 60
connection: Union[Redis, 'Pipeline'] = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
connection.hset(self.key, 'last_heartbeat', utcformat(now()))
self.log.debug('Sent heartbeat to prevent worker timeout. Next one should arrive in %s seconds.', timeout)

def teardown(self):
Expand Down Expand Up @@ -1207,7 +1207,7 @@ def request_force_stop(self, signum: int, frame: Optional[FrameType]):
# One is sent by the pool when it calls `pool.stop_worker()` and another is sent by the OS
# when user hits Ctrl+C. In this case if we receive the second signal within 1 second,
# we ignore it.
if (utcnow() - self._shutdown_requested_date) < timedelta(seconds=1): # type: ignore
if (now() - self._shutdown_requested_date) < timedelta(seconds=1): # type: ignore
self.log.debug('Shutdown signal ignored, received twice in less than 1 second')
return

Expand All @@ -1229,7 +1229,7 @@ def request_stop(self, signum, frame):
frame (Any): Frame
"""
self.log.debug('Got signal %s', signal_name(signum))
self._shutdown_requested_date = utcnow()
self._shutdown_requested_date = now()

signal.signal(signal.SIGINT, self.request_force_stop)
signal.signal(signal.SIGTERM, self.request_force_stop)
Expand Down Expand Up @@ -1297,7 +1297,7 @@ def monitor_work_horse(self, job: 'Job', queue: 'Queue'):
queue (Queue): _description_
"""
retpid = ret_val = rusage = None
job.started_at = utcnow()
job.started_at = now()
while True:
try:
with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
Expand All @@ -1306,7 +1306,7 @@ def monitor_work_horse(self, job: 'Job', queue: 'Queue'):
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
self.set_current_job_working_time((now() - job.started_at).total_seconds())

# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60): # type: ignore
Expand Down Expand Up @@ -1346,7 +1346,7 @@ def monitor_work_horse(self, job: 'Job', queue: 'Queue'):
self.handle_job_failure(job, queue=queue, exc_string='Job stopped by user, work-horse terminated.')
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()
job.ended_at = now()

# Unhandled failure: move the job to the failed queue
signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ''
Expand Down Expand Up @@ -1376,7 +1376,7 @@ def maintain_heartbeats(self, job: 'Job'):

self.execution.heartbeat(job.started_job_registry, ttl, pipeline=pipeline) # type: ignore
# After transition to job execution is complete, `job.heartbeat()` is no longer needed
job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True)
job.heartbeat(now(), ttl, pipeline=pipeline, xx=True)
results = pipeline.execute()

# If job was enqueued with `result_ttl=0` (job is deleted as soon as it finishes),
Expand Down Expand Up @@ -1434,7 +1434,7 @@ def prepare_job_execution(self, job: 'Job', remove_from_intermediate_queue: bool

heartbeat_ttl = self.get_heartbeat_ttl(job)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
job.heartbeat(now(), heartbeat_ttl, pipeline=pipeline)

job.prepare_for_execution(self.name, pipeline=pipeline)
if remove_from_intermediate_queue:
Expand Down Expand Up @@ -1504,8 +1504,8 @@ def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: S

def handle_execution_ended(self, job: 'Job', queue: 'Queue', heartbeat_ttl: int):
"""Called after job has finished execution."""
job.ended_at = utcnow()
job.heartbeat(utcnow(), heartbeat_ttl)
job.ended_at = now()
job.heartbeat(now(), heartbeat_ttl)

def perform_job(self, job: 'Job', queue: 'Queue') -> bool:
"""Performs the actual work of a job. Will/should only be called
Expand All @@ -1525,7 +1525,7 @@ def perform_job(self, job: 'Job', queue: 'Queue') -> bool:
remove_from_intermediate_queue = len(self.queues) == 1
self.prepare_job_execution(job, remove_from_intermediate_queue)

job.started_at = utcnow()
job.started_at = now()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
self.log.debug('Performing Job...')
Expand Down
8 changes: 4 additions & 4 deletions tests/test_executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from rq.executions import Execution, ExecutionRegistry
from rq.queue import Queue
from rq.utils import current_timestamp, utcnow
from rq.utils import current_timestamp, now
from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import long_running_job, say_hello, start_worker_process
Expand Down Expand Up @@ -37,9 +37,9 @@ def test_add_delete_executions(self):
self.assertTrue(self.connection.ttl(execution.key) <= 100)

execution = Execution.fetch(id=execution.id, job_id=job.id, connection=self.connection)
self.assertEqual(execution.created_at, created_at)
self.assertEqual(execution.created_at.timestamp(), created_at.timestamp())
self.assertEqual(execution.composite_key, composite_key)
self.assertEqual(execution.last_heartbeat, created_at)
self.assertEqual(execution.last_heartbeat.timestamp(), created_at.timestamp())

execution.delete(job=job, pipeline=pipeline)
pipeline.execute()
Expand Down Expand Up @@ -158,7 +158,7 @@ def test_execution_added_to_started_job_registry(self):
self.assertIn(execution.composite_key, job.started_job_registry.get_job_ids())

last_heartbeat = execution.last_heartbeat
last_heartbeat = utcnow()
last_heartbeat = now()
self.assertTrue(30 < self.connection.ttl(execution.key) < 200)

sleep(2)
Expand Down
10 changes: 5 additions & 5 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
StartedJobRegistry,
)
from rq.serializers import JSONSerializer
from rq.utils import as_text, get_version, utcformat, utcnow
from rq.utils import as_text, get_version, now, utcformat
from rq.worker import Worker
from tests import RQTestCase, fixtures

Expand Down Expand Up @@ -255,7 +255,7 @@ def test_persistence_of_typical_jobs(self):
self.assertEqual(job.last_heartbeat, None)
self.assertEqual(job.last_heartbeat, None)

ts = utcnow()
ts = now()
job.heartbeat(ts, 0)
self.assertEqual(job.last_heartbeat, ts)

Expand Down Expand Up @@ -1167,12 +1167,12 @@ def test_dependencies_finished_returns_true_if_all_dependencies_finished(self):
dependent_job._dependency_ids = [job.id for job in dependency_jobs]
dependent_job.register_dependency()

now = utcnow()
right_now = now()

# Set ended_at timestamps
for i, job in enumerate(dependency_jobs):
job._status = JobStatus.FINISHED
job.ended_at = now - timedelta(seconds=i)
job.ended_at = right_now - timedelta(seconds=i)
job.save()

dependencies_finished = dependent_job.dependencies_are_met()
Expand All @@ -1183,7 +1183,7 @@ def test_dependencies_finished_returns_false_if_unfinished_job(self):
dependency_jobs = [Job.create(fixtures.say_hello, connection=self.connection) for _ in range(2)]

dependency_jobs[0]._status = JobStatus.FINISHED
dependency_jobs[0].ended_at = utcnow()
dependency_jobs[0].ended_at = now()
dependency_jobs[0].save()

dependency_jobs[1]._status = JobStatus.STARTED
Expand Down
Loading

0 comments on commit bab0061

Please sign in to comment.