Skip to content

Commit

Permalink
Fix database connection fallback logic in JobScheduler and UI
Browse files (browse the repository at this point in the history)
  • Loading branch information
TheophileDiot committed May 22, 2024
1 parent caaa62b commit 5ee348c
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 71 deletions.
94 changes: 58 additions & 36 deletions src/common/db/Database.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def __init__(
"""Initialize the database"""
self.logger = logger
self.readonly = False
self.fallback_readonly = False

if pool:
self.logger.warning("The pool parameter is deprecated, it will be removed in the next version")
Expand Down Expand Up @@ -154,26 +153,30 @@ def __init__(

while not_connected:
try:
if not self.readonly:
if self.readonly:
with self.sql_engine.connect() as conn:
conn.execute(text("CREATE TABLE IF NOT EXISTS test (id INT)"))
conn.execute(text("DROP TABLE test"))
conn.execute(text("SELECT 1"))
else:
with self.sql_engine.connect() as conn:
conn.execute(text("SELECT 1"))
conn.execute(text("CREATE TABLE IF NOT EXISTS test (id INT)"))
conn.execute(text("DROP TABLE test"))

not_connected = False
except (OperationalError, DatabaseError) as e:
if retries <= 0:
if not self.readonly and "attempt to write a readonly database" in str(e):
self.logger.warning("The database is read-only, trying one last time to connect in read-only mode")
self.sql_engine.dispose(close=True)
self.sql_engine = create_engine(sqlalchemy_string_readonly, **self._engine_kwargs)
self.readonly = True
self.fallback_readonly = True
continue
self.logger.error(f"Can't connect to database : {format_exc()}")
_exit(1)
if "attempt to write a readonly database" in str(e):
if not self.readonly:
self.logger.warning("The database is read-only, trying one last time to connect in read-only mode")
self.readonly = True
elif self.database_uri_readonly and sqlalchemy_string != self.database_uri_readonly:
self.logger.warning("Can't connect to the database in read-only mode, falling back to read-only one")
sqlalchemy_string = self.database_uri_readonly
else:
self.logger.error(f"Can't connect to database : {format_exc()}")
_exit(1)
else:
self.logger.error(f"Can't connect to database : {format_exc()}")
_exit(1)

if "attempt to write a readonly database" in str(e):
if log:
Expand Down Expand Up @@ -203,24 +206,25 @@ def __del__(self) -> None:
if self.sql_engine:
self.sql_engine.dispose()

def retry_connection(self) -> None:
def retry_connection(self, *, readonly: bool = False, fallback: bool = False) -> None:
"""Retry the connection to the database"""

assert self.sql_engine is not None

try:
self.sql_engine.dispose(close=True)
self.sql_engine = create_engine(self.database_uri, **self._engine_kwargs)
self.fallback_readonly = False
self.readonly = False
except (OperationalError, DatabaseError) as e:
if self.database_uri_readonly and "attempt to write a readonly database" in str(e):
self.sql_engine.dispose(close=True)
self.sql_engine = create_engine(self.database_uri_readonly, **self._engine_kwargs)
self.fallback_readonly = True
self.readonly = True
return
raise e
if fallback and not self.database_uri_readonly:
raise ValueError("The fallback parameter is set to True but the read-only database URI is not set")

self.sql_engine.dispose(close=True)
self.sql_engine = create_engine(self.database_uri_readonly if fallback else self.database_uri, **self._engine_kwargs)

if fallback or readonly:
with self.sql_engine.connect() as conn:
conn.execute(text("SELECT 1"))
return

with self.sql_engine.connect() as conn:
conn.execute(text("CREATE TABLE IF NOT EXISTS test (id INT)"))
conn.execute(text("DROP TABLE test"))

@contextmanager
def __db_session(self) -> Any:
Expand All @@ -230,10 +234,20 @@ def __db_session(self) -> Any:
self.logger.error("The database engine is not initialized")
_exit(1)

if self.fallback_readonly:
if self.database_uri and self.readonly:
# ? If the database is forced to be read-only, we try to connect as a non read-only user every time until the database is writable
with suppress(OperationalError, DatabaseError):
try:
self.retry_connection()
self.readonly = False
self.logger.info("The database is no longer read-only, defaulting to read-write mode")
except (OperationalError, DatabaseError):
try:
self.retry_connection(readonly=True)
except (OperationalError, DatabaseError):
if self.database_uri_readonly:
with suppress(OperationalError, DatabaseError):
self.retry_connection(fallback=True)
self.readonly = True

with self.sql_engine.connect() as conn:
session_factory = sessionmaker(bind=conn, autoflush=True, expire_on_commit=False)
Expand All @@ -243,13 +257,21 @@ def __db_session(self) -> Any:
except BaseException as e:
session.rollback()

if self.database_uri_readonly and "attempt to write a readonly database" in str(e):
self.sql_engine.dispose(close=True)
self.sql_engine = create_engine(self.database_uri_readonly, **self._engine_kwargs)
self.fallback_readonly = True
if "attempt to write a readonly database" in str(e):
self.logger.warning("The database is read-only, retrying in read-only mode ...")
try:
self.retry_connection(readonly=True)
except (OperationalError, DatabaseError):
if self.database_uri_readonly:
self.logger.warning("Can't connect to the database in read-only mode, falling back to read-only one")
with suppress(OperationalError, DatabaseError):
self.retry_connection(fallback=True)

self.readonly = True
self.logger.warning("The database is read-only, falling back to read-only mode")
return
elif isinstance(e, ConnectionRefusedError) and self.database_uri_readonly:
self.logger.warning("Can't connect to the database, falling back to read-only one ...")
with suppress(OperationalError, DatabaseError):
self.retry_connection(fallback=True)

raise
finally:
Expand Down
51 changes: 39 additions & 12 deletions src/scheduler/JobScheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,19 @@ def setup(self):
self.__logger.error(f"Exception while scheduling jobs for plugin {plugin} : {format_exc()}")

def run_pending(self) -> bool:
if self.db.readonly:
if self.db.fallback_readonly:
with suppress(BaseException):
self.db.retry_connection()
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection()
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True)
self.db.readonly = True

if self.db.readonly:
self.__logger.error("Database is in read-only mode, jobs will not be executed")
Expand Down Expand Up @@ -277,10 +286,19 @@ def run_pending(self) -> bool:
return success

def run_once(self) -> bool:
if self.db.readonly:
if self.db.fallback_readonly:
with suppress(BaseException):
self.db.retry_connection()
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection()
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True)
self.db.readonly = True

if self.db.readonly:
self.__logger.error("Database is in read-only mode, jobs will not be executed")
Expand Down Expand Up @@ -309,10 +327,19 @@ def run_once(self) -> bool:
return ret

def run_single(self, job_name: str) -> bool:
if self.db.readonly:
if self.db.fallback_readonly:
with suppress(BaseException):
self.db.retry_connection()
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection()
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True)
self.db.readonly = True

if self.db.readonly:
self.__logger.error("Database is in read-only mode, jobs will not be executed")
Expand Down
36 changes: 25 additions & 11 deletions src/ui/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ def get_ui_data():
return ui_data


def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: bool = False, was_draft: bool = False, threaded: bool = False):
def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: bool = False, was_draft: bool = False, threaded: bool = False) -> int:
# Do the operation
error = False
error = 0
ui_data = get_ui_data()
operation = ""

if "TO_FLASH" not in ui_data:
ui_data["TO_FLASH"] = []
Expand All @@ -225,7 +226,7 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
app.logger.error(f"Couldn't set the changes to checked in the database: {ret}")
ui_data["TO_FLASH"].append({"content": f"An error occurred when setting the changes to checked in the database : {ret}", "type": "error"})
elif method == "global_config":
operation = app.config["CONFIG"].edit_global_conf(args[0])
operation, error = app.config["CONFIG"].edit_global_conf(args[0])

if operation == "reload":
operation = app.config["INSTANCES"].reload_instance(args[0])
Expand All @@ -237,14 +238,12 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
operation = app.config["INSTANCES"].restart_instance(args[0])
elif not error:
operation = "The scheduler will be in charge of reloading the instances."
else:
operation = ""

if operation:
if isinstance(operation, list):
for op in operation:
ui_data["TO_FLASH"].append({"content": f"Reload failed for the instance {op}", "type": "error"})
elif operation.startswith("Can't"):
elif operation.startswith(("Can't", "The database is read-only")):
ui_data["TO_FLASH"].append({"content": operation, "type": "error"})
else:
ui_data["TO_FLASH"].append({"content": operation, "type": "success"})
Expand All @@ -262,6 +261,8 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")

return error


# UTILS
def run_action(plugin: str, function_name: str = ""):
Expand Down Expand Up @@ -391,9 +392,19 @@ def inject_variables():
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")

if db.readonly and db.fallback_readonly:
with suppress(BaseException):
if db.database_uri and db.readonly:
try:
db.retry_connection()
db.readonly = False
app.logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
db.retry_connection(readonly=True)
except BaseException:
if db.database_uri_readonly:
with suppress(BaseException):
db.retry_connection(fallback=True)
db.readonly = True

# check that the value is in the tuple
return dict(
Expand Down Expand Up @@ -720,15 +731,18 @@ def account():
metadata["last_pro_check"] = None
db.set_pro_metadata(metadata)

flash("Checking license key to upgrade.", "success")

curr_changes = db.check_changes()

# Reload instances
def update_global_config(threaded: bool = False):
wait_applying()

manage_bunkerweb("global_config", variable, threaded=threaded)
if not manage_bunkerweb("global_config", variable, threaded=threaded):
message = "Checking license key to upgrade."
if threaded:
ui_data["TO_FLASH"].append({"content": message, "type": "success"})
else:
flash(message)

ui_data = get_ui_data()
ui_data["PRO_LOADING"] = True
Expand Down
29 changes: 17 additions & 12 deletions src/ui/src/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ def __gen_conf(self, global_conf: dict, services_conf: list[dict], *, check_chan
conf["SERVER_NAME"] = " ".join(servers)
conf["DATABASE_URI"] = self.__db.database_uri

err = self.__db.save_config(conf, "ui", changed=check_changes)

if err:
self.__db.logger.warning(f"Couldn't save config to database : {err}, config may not work as expected")
return self.__db.save_config(conf, "ui", changed=check_changes)

def get_plugins_settings(self) -> dict:
return {
Expand Down Expand Up @@ -139,8 +136,8 @@ def check_variables(self, variables: dict) -> int:

return error

def reload_config(self) -> None:
self.__gen_conf(self.get_config(methods=False), self.get_services(methods=False))
def reload_config(self) -> str:
return self.__gen_conf(self.get_config(methods=False), self.get_services(methods=False))

def new_service(self, variables: dict, is_draft: bool = False) -> Tuple[str, int]:
"""Creates a new service from the given variables
Expand All @@ -167,7 +164,9 @@ def new_service(self, variables: dict, is_draft: bool = False) -> Tuple[str, int
return f"Service {service['SERVER_NAME'].split(' ')[0]} already exists.", 1

services.append(variables | {"IS_DRAFT": "yes" if is_draft else "no"})
self.__gen_conf(self.get_config(methods=False), services, check_changes=not is_draft)
ret = self.__gen_conf(self.get_config(methods=False), services, check_changes=not is_draft)
if ret:
return ret, 1
return f"Configuration for {variables['SERVER_NAME'].split(' ')[0]} has been generated.", 0

def edit_service(self, old_server_name: str, variables: dict, *, check_changes: bool = True, is_draft: bool = False) -> Tuple[str, int]:
Expand Down Expand Up @@ -205,10 +204,12 @@ def edit_service(self, old_server_name: str, variables: dict, *, check_changes:
if k.startswith(old_server_name_splitted[0]):
config.pop(k)

self.__gen_conf(config, services, check_changes=check_changes, changed_service=variables["SERVER_NAME"])
ret = self.__gen_conf(config, services, check_changes=check_changes, changed_service=variables["SERVER_NAME"])
if ret:
return ret, 1
return f"Configuration for {old_server_name_splitted[0]} has been edited.", 0

def edit_global_conf(self, variables: dict) -> str:
def edit_global_conf(self, variables: dict) -> Tuple[str, int]:
"""Edits the global conf
Parameters
Expand All @@ -221,8 +222,10 @@ def edit_global_conf(self, variables: dict) -> str:
str
the confirmation message
"""
self.__gen_conf(self.get_config(methods=False) | variables, self.get_services(methods=False))
return "The global configuration has been edited."
ret = self.__gen_conf(self.get_config(methods=False) | variables, self.get_services(methods=False))
if ret:
return ret, 1
return "The global configuration has been edited.", 0

def delete_service(self, service_name: str, *, check_changes: bool = True) -> Tuple[str, int]:
"""Deletes a service
Expand Down Expand Up @@ -269,5 +272,7 @@ def delete_service(self, service_name: str, *, check_changes: bool = True) -> Tu
if k in service:
service.pop(k)

self.__gen_conf(new_env, new_services, check_changes=check_changes)
ret = self.__gen_conf(new_env, new_services, check_changes=check_changes)
if ret:
return ret, 1
return f"Configuration for {service_name} has been deleted.", 0

0 comments on commit 5ee348c

Please sign in to comment.