Skip to content

Commit 1bfbf08

Browse files
committed
Merge PR #757 into 18.0
Signed-off-by sbidoul
2 parents f832ffd + 2aaeced commit 1bfbf08

File tree

1 file changed

+35
-6
lines changed

1 file changed

+35
-6
lines changed

queue_job/jobrunner/runner.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,17 @@
159159

160160
SELECT_TIMEOUT = 60
161161
ERROR_RECOVERY_DELAY = 5
162+
PG_ADVISORY_LOCK_ID = 2293787760715711918
162163

163164
_logger = logging.getLogger(__name__)
164165

165166
select = selectors.DefaultSelector
166167

167168

169+
class MasterElectionLost(Exception):
170+
pass
171+
172+
168173
# Unfortunately, it is not possible to extend the Odoo
169174
# server command line arguments, so we resort to environment variables
170175
# to configure the runner (channels mostly).
@@ -262,10 +267,15 @@ def __init__(self, db_name):
262267
self.db_name = db_name
263268
connection_info = _connection_info_for(db_name)
264269
self.conn = psycopg2.connect(**connection_info)
265-
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
266-
self.has_queue_job = self._has_queue_job()
267-
if self.has_queue_job:
268-
self._initialize()
270+
try:
271+
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
272+
self.has_queue_job = self._has_queue_job()
273+
if self.has_queue_job:
274+
self._acquire_master_lock()
275+
self._initialize()
276+
except BaseException:
277+
self.close()
278+
raise
269279

270280
def close(self):
271281
# pylint: disable=except-pass
@@ -278,6 +288,14 @@ def close(self):
278288
pass
279289
self.conn = None
280290

291+
def _acquire_master_lock(self):
292+
"""Acquire the master runner lock or raise MasterElectionLost"""
293+
with closing(self.conn.cursor()) as cr:
294+
cr.execute("SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,))
295+
if not cr.fetchone()[0]:
296+
msg = f"could not acquire master runner lock on {self.db_name}"
297+
raise MasterElectionLost(msg)
298+
281299
def _has_queue_job(self):
282300
with closing(self.conn.cursor()) as cr:
283301
cr.execute(
@@ -413,14 +431,17 @@ def close_databases(self, remove_jobs=True):
413431
self.db_by_name = {}
414432

415433
def initialize_databases(self):
416-
for db_name in self.get_db_names():
434+
for db_name in sorted(self.get_db_names()):
435+
# sorting is important to avoid deadlocks in acquiring the master lock
417436
db = Database(db_name)
418437
if db.has_queue_job:
419438
self.db_by_name[db_name] = db
420439
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
421440
for job_data in cr:
422441
self.channel_manager.notify(db_name, *job_data)
423442
_logger.info("queue job runner ready for db %s", db_name)
443+
else:
444+
db.close()
424445

425446
def run_jobs(self):
426447
now = _odoo_now()
@@ -507,7 +528,7 @@ def run(self):
507528
while not self._stop:
508529
# outer loop does exception recovery
509530
try:
510-
_logger.info("initializing database connections")
531+
_logger.debug("initializing database connections")
511532
# TODO: how to detect new databases or databases
512533
# on which queue_job is installed after server start?
513534
self.initialize_databases()
@@ -522,6 +543,14 @@ def run(self):
522543
except InterruptedError:
523544
# Interrupted system call, i.e. KeyboardInterrupt during select
524545
self.stop()
546+
except MasterElectionLost as e:
547+
_logger.debug(
548+
"master election lost: %s, sleeping %ds and retrying",
549+
e,
550+
ERROR_RECOVERY_DELAY,
551+
)
552+
self.close_databases()
553+
time.sleep(ERROR_RECOVERY_DELAY)
525554
except Exception:
526555
_logger.exception(
527556
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY

0 commit comments

Comments
 (0)