From 0f47fc3c16689f97f62cc3d7bb9c6b285f2d2586 Mon Sep 17 00:00:00 2001
From: r350178982 <32759763+r350178982@users.noreply.github.com>
Date: Thu, 29 Dec 2022 17:48:36 +0800
Subject: [PATCH 1/5] initiate

---
 dtable_events/app/app.py                          |  3 +
 dtable_events/big_data/__init__.py                |  1 +
 dtable_events/big_data/auto_archive_utils.py      | 89 +++++++++++++++++
 .../big_data/big_data_auto_archive_scanner.py     | 95 ++++++++++++++++++
 dtable_events/utils/dtable_server_api.py          |  9 ++
 dtable_events/utils/sql_generator.py              |  8 +-
 6 files changed, 204 insertions(+), 1 deletion(-)
 create mode 100644 dtable_events/big_data/__init__.py
 create mode 100644 dtable_events/big_data/auto_archive_utils.py
 create mode 100644 dtable_events/big_data/big_data_auto_archive_scanner.py

diff --git a/dtable_events/app/app.py b/dtable_events/app/app.py
index 9a4008a2..b8c9a7ca 100644
--- a/dtable_events/app/app.py
+++ b/dtable_events/app/app.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 from dtable_events.activities.handlers import MessageHandler
+from dtable_events.big_data.big_data_auto_archive_scanner import DTableAutoArchiveTaskScanner
 from dtable_events.statistics.counter import UserActivityCounter
 from dtable_events.dtable_io.dtable_io_server import DTableIOServer
 from dtable_events.tasks.instant_notices_sender import InstantNoticeSender
@@ -51,6 +52,7 @@ def __init__(self, config, task_mode):
         self._data_syncr = DataSyncer(config)
         self._workflow_schedule_scanner = WorkflowSchedulesScanner(config)
         self._dtable_asset_trash_cleaner = DTableAssetTrashCleaner(config)
+        self._dtable_auto_archive_scanner = DTableAutoArchiveTaskScanner(config)
 
     def serve_forever(self):
         if self._enable_foreground_tasks:
@@ -78,3 +80,4 @@ def serve_forever(self):
         self._data_syncr.start()                       # default True
         self._workflow_schedule_scanner.start()        # default True
         self._dtable_asset_trash_cleaner.start()       # always True
+        self._dtable_auto_archive_scanner.start()      # default True
diff --git a/dtable_events/big_data/__init__.py b/dtable_events/big_data/__init__.py
new file mode 100644
index 00000000..40a96afc
--- /dev/null
+++ b/dtable_events/big_data/__init__.py
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-
diff --git a/dtable_events/big_data/auto_archive_utils.py b/dtable_events/big_data/auto_archive_utils.py
new file mode 100644
index 00000000..6c6122a4
--- /dev/null
+++ b/dtable_events/big_data/auto_archive_utils.py
@@ -0,0 +1,89 @@
+import logging
+import json
+from datetime import datetime
+from dtable_events.utils import get_inner_dtable_server_url
+from dtable_events.utils.dtable_server_api import DTableServerAPI
+from dtable_events.utils.sql_generator import BaseSQLGenerator
+
+logger = logging.getLogger(__name__)
+def update_last_run_time(task_id, db_session):
+
+    cmd = "UPDATE dtable_auto_archive_task SET last_run_time=:new_time WHERE id=:task_id"
+    db_session.execute(cmd, {'new_time': datetime.utcnow(), 'task_id': task_id})
+
+def set_invalid(task_id, db_session):
+    sql = "UPDATE dtable_auto_archive_task SET is_valid=:is_valid WHERE id=:task_id"
+    try:
+        db_session.execute(sql, {'is_valid': 0, 'task_id': task_id})
+    except Exception as e:
+        logger.error(e)
+
+def meet_condition(run_condition, details):
+    if run_condition == 'per_day':
+        run_hour = details.get('run_hour', None)
+        cur_datetime = datetime.now()
+        cur_hour = int(cur_datetime.hour)
+        try:
+            if int(run_hour) == cur_hour:
+                return True
+        except:
+            return False
+
+
+    return False
+
+
+def run_dtable_auto_archive_task(task, db_session):
+
+    task_id = task[0]
+    run_condition = task[1]
+    table_id = task[2]
+    view_id = task[3]
+    last_run_time = task[4]
+    dtable_uuid = task[5]
+    details = task[6]
+    creator = task[7]
+    try:
+        details = json.loads(details)
+        if not meet_condition(run_condition, details):
+            return
+
+        dtable_server_url = get_inner_dtable_server_url()
+        seatable = DTableServerAPI(creator, dtable_uuid, dtable_server_url)
+        current_table, current_view = None, None
+        metadata = seatable.get_metadata()
+        for table in metadata['tables']:
+            if table.get('_id') == table_id:
+                current_table = table
+                break
+
+        if not current_table:
+            set_invalid(task_id, db_session)
+            return
+
+        for view in current_table['views']:
+            if view.get('_id') == view_id:
+                current_view = view
+                break
+
+        if not current_view:
+            set_invalid(task_id, db_session)
+            return
+
+        table_name = current_table.get('name')
+        filter_conditions = {
+            "filters": current_view.get('filters') or [],
+            "filter_conjunction": current_view.get('filter_conjunction') or 'And',
+        }
+
+        columns = seatable.list_columns(table_name)
+        sql_generator = BaseSQLGenerator(table_name, columns, filter_conditions=filter_conditions)
+        where_clause = sql_generator.get_where_clause()
+        seatable.archive_view(table_name, where_clause)
+
+    except Exception as e:
+        logger.error(e)
+        set_invalid(task_id, db_session)
+        return
+
+    update_last_run_time(task_id, db_session)
diff --git a/dtable_events/big_data/big_data_auto_archive_scanner.py b/dtable_events/big_data/big_data_auto_archive_scanner.py
new file mode 100644
index 00000000..42374fd1
--- /dev/null
+++ b/dtable_events/big_data/big_data_auto_archive_scanner.py
@@ -0,0 +1,95 @@
+import logging
+from datetime import datetime, timedelta
+from threading import Thread
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+from dtable_events.big_data.auto_archive_utils import run_dtable_auto_archive_task
+from dtable_events.db import init_db_session_class
+from dtable_events.utils import get_opt_from_conf_or_env, parse_bool
+
+
+__all__ = [
+    'DTableAutoArchiveTaskScanner',
+]
+
+
+class DTableAutoArchiveTaskScanner(object):
+
+    def __init__(self, config):
+        self._enabled = True
+        self._parse_config(config)
+        self._db_session_class = init_db_session_class(config)
+
+    def _parse_config(self, config):
+        """parse auto archive scanner options from config file
+        """
+        section_name = 'AUTOARCHIVE-SCANNER'
+        key_enabled = 'enabled'
+
+        if not config.has_section(section_name):
+            section_name = 'AUTOARCHIVE SCANNER'
+            if not config.has_section(section_name):
+                return
+
+        # enabled
+        enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True)
+        enabled = parse_bool(enabled)
+        self._enabled = enabled
+
+    def start(self):
+        if not self.is_enabled():
+            logging.warning('Cannot start big data auto archive scanner: it is not enabled!')
+            return
+
+        logging.info('Start big data auto archive scanner')
+
+        DTableAutoArchiveTaskScannerTimer(self._db_session_class).start()
+
+    def is_enabled(self):
+        return self._enabled
+
+
+def scan_auto_archive_tasks(db_session):
+    sql = '''
+        SELECT `bdar`.`id`, `run_condition`, `table_id`, `view_id`, `last_run_time`, `dtable_uuid`, `details`, bdar.`creator` FROM dtable_auto_archive_task bdar
+        JOIN dtables d ON bdar.dtable_uuid=d.uuid
+        WHERE ((run_condition='per_day' AND (last_run_time<:per_day_check_time OR last_run_time IS NULL))
+        AND bdar.is_valid=1 AND d.deleted=0
+    '''
+    per_day_check_time = datetime.utcnow() - timedelta(hours=23)
+
+    tasks = db_session.execute(sql, {
+        'per_day_check_time': per_day_check_time,
+    })
+
+    for task in tasks:
+        try:
+            run_dtable_auto_archive_task(task, db_session)
+        except Exception as e:
+            logging.exception(e)
+            logging.error(f'run auto archive task failed: {task}, error: {e}')
+    db_session.commit()
+
+
+class DTableAutoArchiveTaskScannerTimer(Thread):
+
+    def __init__(self, db_session_class):
+        super(DTableAutoArchiveTaskScannerTimer, self).__init__()
+        self.db_session_class = db_session_class
+
+    def run(self):
+        sched = BlockingScheduler()
+        # fire every hour, every day of the week
+        @sched.scheduled_job('cron', day_of_week='*', hour='*')
+        def timed_job():
+            logging.info('Starting auto archive tasks...')
+
+            db_session = self.db_session_class()
+            try:
+                scan_auto_archive_tasks(db_session)
+            except Exception as e:
+                logging.exception('error when scanning big data auto archives: %s', e)
+            finally:
+                db_session.close()
+
+        sched.start()
diff --git a/dtable_events/utils/dtable_server_api.py b/dtable_events/utils/dtable_server_api.py
index c2c8d851..756bb931 100644
--- a/dtable_events/utils/dtable_server_api.py
+++ b/dtable_events/utils/dtable_server_api.py
@@ -411,3 +411,12 @@ def batch_send_notification(self, user_msg_list):
         }
         response = requests.post(url, json=body, headers=self.headers)
         return parse_response(response)
+
+    def archive_view(self, table_name, where_clause=""):
+        url = self.dtable_server_url + '/api/v1/dtables/' + self.dtable_uuid + '/archive-view/?from=dtable_events'
+        json_data = {
+            'table_name': table_name,
+            'where': where_clause,
+        }
+        response = requests.post(url, json=json_data, headers=self.headers, timeout=self.timeout)
+        return parse_response(response)
diff --git a/dtable_events/utils/sql_generator.py b/dtable_events/utils/sql_generator.py
index fd4098f6..bc1f4530 100644
--- a/dtable_events/utils/sql_generator.py
+++ b/dtable_events/utils/sql_generator.py
@@ -1655,7 +1655,7 @@ def _groupfilter2sql(self):
             group_conjunction_split.join(group_string_list)
         )
 
-    def _filter2sql(self):
+    def _filter2sql(self, include_where_string=True):
         filter_conditions = self.filter_conditions
         filters = filter_conditions.get('filters', [])
         filter_conjunction = filter_conditions.get('filter_conjunction', 'And')
@@ -1692,6 +1692,9 @@ def _filter2sql(self):
             )
         else:
             return ''
+
+        if not include_where_string:
+            return "%s" % filter_content
         return "%s%s" % (
             filter_header,
             filter_content
@@ -1733,6 +1736,9 @@ def to_sql(self, by_group=False):
             sql = "%s %s" % (sql, limit_clause)
         return sql
 
+    def get_where_clause(self, include_where_string=False):
+        return self._filter2sql(include_where_string=include_where_string)
+
 
 class LinkRecordsSQLGenerator(object):
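
Note on how patch 1 fits together (commentary, not part of the patch): run_dtable_auto_archive_task resolves the task's table and view from dtable-server metadata, turns the view's stored filters into a bare filter expression (get_where_clause defaults to include_where_string=False, so the leading WHERE keyword is dropped), and posts it to the new archive-view endpoint. A minimal sketch of that data flow, with a toy stand-in for BaseSQLGenerator that models only a "contains" text filter; the field names in the filter payload are illustrative, not the generator's exact schema:

    # Toy stand-in for BaseSQLGenerator: builds the bare filter expression that
    # get_where_clause(include_where_string=False) would return. Only a
    # "contains" text filter is modeled; the real generator in
    # dtable_events/utils/sql_generator.py handles every column type.
    def toy_where_clause(filter_conditions):
        conjunction = ' %s ' % filter_conditions.get('filter_conjunction', 'And').upper()
        parts = []
        for f in filter_conditions.get('filters', []):
            if f['filter_predicate'] == 'contains':
                parts.append("`%s` LIKE '%%%s%%'" % (f['column_name'], f['filter_term']))
        return conjunction.join(parts)

    # A view whose rows should be archived once Status contains "done"
    # (hypothetical payload shaped like the view filters read above).
    filter_conditions = {
        'filters': [{'column_name': 'Status', 'filter_predicate': 'contains', 'filter_term': 'done'}],
        'filter_conjunction': 'And',
    }
    print(toy_where_clause(filter_conditions))   # `Status` LIKE '%done%'
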
From 63685e8e2543c170253265a4b45bbd12b275e79a Mon Sep 17 00:00:00 2001
From: r350178982 <32759763+r350178982@users.noreply.github.com>
Date: Tue, 3 Jan 2023 18:17:44 +0800
Subject: [PATCH 2/5] update

---
 dtable_events/big_data/auto_archive_utils.py            | 4 ++--
 dtable_events/big_data/big_data_auto_archive_scanner.py | 8 +++++++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/dtable_events/big_data/auto_archive_utils.py b/dtable_events/big_data/auto_archive_utils.py
index 6c6122a4..b0d912fe 100644
--- a/dtable_events/big_data/auto_archive_utils.py
+++ b/dtable_events/big_data/auto_archive_utils.py
@@ -1,7 +1,7 @@
 import logging
 import json
 from datetime import datetime
-from dtable_events.utils import get_inner_dtable_server_url
+from dtable_events.utils import get_inner_dtable_server_url, uuid_str_to_36_chars
 from dtable_events.utils.dtable_server_api import DTableServerAPI
 from dtable_events.utils.sql_generator import BaseSQLGenerator
 
@@ -47,7 +47,7 @@ def run_dtable_auto_archive_task(task, db_session):
         details = json.loads(details)
         if not meet_condition(run_condition, details):
             return
-
+        dtable_uuid = uuid_str_to_36_chars(dtable_uuid)
         dtable_server_url = get_inner_dtable_server_url()
         seatable = DTableServerAPI(creator, dtable_uuid, dtable_server_url)
         current_table, current_view = None, None
diff --git a/dtable_events/big_data/big_data_auto_archive_scanner.py b/dtable_events/big_data/big_data_auto_archive_scanner.py
index 42374fd1..374c317b 100644
--- a/dtable_events/big_data/big_data_auto_archive_scanner.py
+++ b/dtable_events/big_data/big_data_auto_archive_scanner.py
@@ -54,12 +54,18 @@ def scan_auto_archive_tasks(db_session):
         SELECT `bdar`.`id`, `run_condition`, `table_id`, `view_id`, `last_run_time`, `dtable_uuid`, `details`, bdar.`creator` FROM dtable_auto_archive_task bdar
         JOIN dtables d ON bdar.dtable_uuid=d.uuid
         WHERE ((run_condition='per_day' AND (last_run_time<:per_day_check_time OR last_run_time IS NULL))
+        OR (run_condition='per_week' AND (last_run_time<:per_week_check_time OR last_run_time IS NULL))
+        OR (run_condition='per_month' AND (last_run_time<:per_month_check_time OR last_run_time IS NULL)))
         AND bdar.is_valid=1 AND d.deleted=0
     '''
-    per_day_check_time = datetime.utcnow() - timedelta(hours=23)
+    per_day_check_time = datetime.now() - timedelta(hours=23)
+    per_week_check_time = datetime.now() - timedelta(days=6)
+    per_month_check_time = datetime.now() - timedelta(days=29)
 
     tasks = db_session.execute(sql, {
         'per_day_check_time': per_day_check_time,
+        'per_week_check_time': per_week_check_time,
+        'per_month_check_time': per_month_check_time
     })
 
     for task in tasks:
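
Note on the check times in patch 2 (commentary, not part of the patch): the windows are one step short of a full period (23 hours, 6 days, 29 days) so that a task which last ran at hour H of the previous period is already eligible when the hourly cron reaches hour H again; meet_condition then decides whether this is actually the configured hour and day. A SQL-free restatement of the eligibility filter, with window lengths copied from scan_auto_archive_tasks (is_due is a hypothetical helper, not in the patch):

    from datetime import datetime, timedelta

    # Windows mirror scan_auto_archive_tasks: one period minus one step of slack.
    WINDOWS = {
        'per_day': timedelta(hours=23),
        'per_week': timedelta(days=6),
        'per_month': timedelta(days=29),
    }

    def is_due(run_condition, last_run_time, now=None):
        # Mirrors the WHERE clause: never ran, or ran before the window start.
        now = now or datetime.now()
        return last_run_time is None or last_run_time < now - WINDOWS[run_condition]

    print(is_due('per_day', datetime.now() - timedelta(hours=24)))  # True: due again
    print(is_due('per_day', datetime.now() - timedelta(hours=1)))   # False: ran this period
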
From a04750dc419e8bf26137d9eaaaa1600fc89752c0 Mon Sep 17 00:00:00 2001
From: r350178982 <32759763+r350178982@users.noreply.github.com>
Date: Wed, 4 Jan 2023 16:10:48 +0800
Subject: [PATCH 3/5] Update auto_archive_utils.py

---
 dtable_events/big_data/auto_archive_utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dtable_events/big_data/auto_archive_utils.py b/dtable_events/big_data/auto_archive_utils.py
index b0d912fe..caade809 100644
--- a/dtable_events/big_data/auto_archive_utils.py
+++ b/dtable_events/big_data/auto_archive_utils.py
@@ -9,7 +9,7 @@
 def update_last_run_time(task_id, db_session):
 
     cmd = "UPDATE dtable_auto_archive_task SET last_run_time=:new_time WHERE id=:task_id"
-    db_session.execute(cmd, {'new_time': datetime.utcnow(), 'task_id': task_id})
+    db_session.execute(cmd, {'new_time': datetime.now(), 'task_id': task_id})
 
 def set_invalid(task_id, db_session):
     sql = "UPDATE dtable_auto_archive_task SET is_valid=:is_valid WHERE id=:task_id"
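
Note on patch 3 (commentary, not part of the patch): patch 2 moved the scanner's window math to datetime.now(), so last_run_time must be written with now() as well; stamping rows with utcnow() while comparing against now()-based cutoffs would shift the 23-hour window by the server's UTC offset. A quick illustration of the skew, assuming a non-UTC server timezone:

    from datetime import datetime

    # On a UTC+8 host a utcnow() stamp looks ~8 hours older than a now() stamp
    # when compared against a now()-based cutoff; on a UTC host the skew is 0.
    skew = datetime.now() - datetime.utcnow()
    print(round(skew.total_seconds() / 3600))
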
From 3685930b04382bc6d61dd8912ec62d77f7a7cc16 Mon Sep 17 00:00:00 2001
From: r350178982 <32759763+r350178982@users.noreply.github.com>
Date: Thu, 5 Jan 2023 10:13:56 +0800
Subject: [PATCH 4/5] Update auto_archive_utils.py

---
 dtable_events/big_data/auto_archive_utils.py | 24 ++++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/dtable_events/big_data/auto_archive_utils.py b/dtable_events/big_data/auto_archive_utils.py
index caade809..617b08ff 100644
--- a/dtable_events/big_data/auto_archive_utils.py
+++ b/dtable_events/big_data/auto_archive_utils.py
@@ -19,16 +19,36 @@ def set_invalid(task_id, db_session):
         logger.error(e)
 
 def meet_condition(run_condition, details):
+    cur_datetime = datetime.now()
+    cur_hour = int(cur_datetime.hour)
+    cur_week_day = cur_datetime.isoweekday()
+    cur_month_day = cur_datetime.day
     if run_condition == 'per_day':
         run_hour = details.get('run_hour', None)
-        cur_datetime = datetime.now()
-        cur_hour = int(cur_datetime.hour)
         try:
             if int(run_hour) == cur_hour:
                 return True
         except:
             return False
 
+    if run_condition == 'per_week':
+        run_week_day = details.get('run_week_day', None)
+        run_week_hour = details.get('run_week_hour', None)
+        try:
+            if (int(run_week_hour) == cur_hour) and (int(run_week_day) == cur_week_day):
+                return True
+        except:
+            return False
+
+    if run_condition == 'per_month':
+        run_month_day = details.get('run_month_day', None)
+        run_month_hour = details.get('run_month_hour', None)
+        try:
+            if (int(run_month_hour) == cur_hour) and (int(run_month_day) == cur_month_day):
+                return True
+        except:
+            return False
+
 
     return False
 
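
Note on patch 4 (commentary, not part of the patch): details now carries one schedule per run condition; run_week_day is matched against isoweekday() (Monday=1 ... Sunday=7) and run_month_day against the plain calendar day, so a run_month_day of 29-31 simply never fires in shorter months. Example payloads the three branches accept (hypothetical values; the JSON comes from the details column the scanner reads):

    import json

    # One payload per run condition; string values also work, since
    # meet_condition coerces with int().
    details_per_day = {'run_hour': 2}                               # every day at 02:00
    details_per_week = {'run_week_day': 5, 'run_week_hour': 2}      # Friday at 02:00
    details_per_month = {'run_month_day': 1, 'run_month_hour': 2}   # the 1st at 02:00

    print(json.dumps(details_per_week))  # what the details column would hold
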
From 287b39bba94dc106fb55162aaa430b1516965b37 Mon Sep 17 00:00:00 2001
From: r350178982 <32759763+r350178982@users.noreply.github.com>
Date: Thu, 5 Jan 2023 11:29:46 +0800
Subject: [PATCH 5/5] Update auto_archive_utils.py

---
 dtable_events/big_data/auto_archive_utils.py | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/dtable_events/big_data/auto_archive_utils.py b/dtable_events/big_data/auto_archive_utils.py
index 617b08ff..a696ac3f 100644
--- a/dtable_events/big_data/auto_archive_utils.py
+++ b/dtable_events/big_data/auto_archive_utils.py
@@ -6,6 +6,17 @@
 from dtable_events.utils.sql_generator import BaseSQLGenerator
 
 logger = logging.getLogger(__name__)
+
+PER_DAY = 'per_day'
+PER_WEEK = 'per_week'
+PER_MONTH = 'per_month'
+
+VALID_RUN_CONDITIONS = [
+    PER_DAY,
+    PER_WEEK,
+    PER_MONTH
+]
+
 def update_last_run_time(task_id, db_session):
 
     cmd = "UPDATE dtable_auto_archive_task SET last_run_time=:new_time WHERE id=:task_id"
@@ -23,7 +34,7 @@ def meet_condition(run_condition, details):
     cur_hour = int(cur_datetime.hour)
     cur_week_day = cur_datetime.isoweekday()
     cur_month_day = cur_datetime.day
-    if run_condition == 'per_day':
+    if run_condition == PER_DAY:
         run_hour = details.get('run_hour', None)
         try:
             if int(run_hour) == cur_hour:
@@ -31,7 +42,7 @@ def meet_condition(run_condition, details):
         except:
             return False
 
-    if run_condition == 'per_week':
+    if run_condition == PER_WEEK:
         run_week_day = details.get('run_week_day', None)
         run_week_hour = details.get('run_week_hour', None)
         try:
@@ -40,7 +51,7 @@ def meet_condition(run_condition, details):
         except:
             return False
 
-    if run_condition == 'per_month':
+    if run_condition == PER_MONTH:
        run_month_day = details.get('run_month_day', None)
        run_month_hour = details.get('run_month_hour', None)
        try:
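
Note on patch 5 (commentary, not part of the patch): the PER_* constants keep meet_condition's branches in sync with the string literals the scanner SQL matches on. VALID_RUN_CONDITIONS is defined but not yet referenced in this series; a guard like the following sketch (check_run_condition is a hypothetical helper) shows the intended use when tasks are created or loaded:

    PER_DAY = 'per_day'
    PER_WEEK = 'per_week'
    PER_MONTH = 'per_month'

    VALID_RUN_CONDITIONS = [PER_DAY, PER_WEEK, PER_MONTH]

    def check_run_condition(run_condition):
        # Reject conditions the scanner SQL and meet_condition would ignore.
        if run_condition not in VALID_RUN_CONDITIONS:
            raise ValueError('invalid run_condition: %s' % run_condition)
        return run_condition

    check_run_condition(PER_WEEK)       # ok
    # check_run_condition('per_year')   # would raise ValueError
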