diff --git a/dtable_events/app/app.py b/dtable_events/app/app.py
index 9a4008a2..b8c9a7ca 100644
--- a/dtable_events/app/app.py
+++ b/dtable_events/app/app.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 from dtable_events.activities.handlers import MessageHandler
+from dtable_events.big_data.big_data_auto_archive_scanner import DTableAutoArchiveTaskScanner
 from dtable_events.statistics.counter import UserActivityCounter
 from dtable_events.dtable_io.dtable_io_server import DTableIOServer
 from dtable_events.tasks.instant_notices_sender import InstantNoticeSender
@@ -51,6 +52,7 @@ def __init__(self, config, task_mode):
         self._data_syncr = DataSyncer(config)
         self._workflow_schedule_scanner = WorkflowSchedulesScanner(config)
         self._dtable_asset_trash_cleaner = DTableAssetTrashCleaner(config)
+        self._dtable_auto_archive_scanner = DTableAutoArchiveTaskScanner(config)

     def serve_forever(self):
         if self._enable_foreground_tasks:
@@ -78,3 +80,4 @@ def serve_forever(self):
         self._data_syncr.start()                   # default True
         self._workflow_schedule_scanner.start()    # default True
         self._dtable_asset_trash_cleaner.start()   # always True
+        self._dtable_auto_archive_scanner.start()  # default True
diff --git a/dtable_events/big_data/__init__.py b/dtable_events/big_data/__init__.py
new file mode 100644
index 00000000..40a96afc
--- /dev/null
+++ b/dtable_events/big_data/__init__.py
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-
diff --git a/dtable_events/big_data/auto_archive_utils.py b/dtable_events/big_data/auto_archive_utils.py
new file mode 100644
index 00000000..a696ac3f
--- /dev/null
+++ b/dtable_events/big_data/auto_archive_utils.py
@@ -0,0 +1,120 @@
+import logging
+import json
+from datetime import datetime
+
+from dtable_events.utils import get_inner_dtable_server_url, uuid_str_to_36_chars
+from dtable_events.utils.dtable_server_api import DTableServerAPI
+from dtable_events.utils.sql_generator import BaseSQLGenerator
+
+logger = logging.getLogger(__name__)
+
+PER_DAY = 'per_day'
+PER_WEEK = 'per_week'
+PER_MONTH = 'per_month'
+
+VALID_RUN_CONDITIONS = [
+    PER_DAY,
+    PER_WEEK,
+    PER_MONTH
+]
+
+
+def update_last_run_time(task_id, db_session):
+    cmd = "UPDATE dtable_auto_archive_task SET last_run_time=:new_time WHERE id=:task_id"
+    db_session.execute(cmd, {'new_time': datetime.now(), 'task_id': task_id})
+
+
+def set_invalid(task_id, db_session):
+    sql = "UPDATE dtable_auto_archive_task SET is_valid=:is_valid WHERE id=:task_id"
+    try:
+        db_session.execute(sql, {'is_valid': 0, 'task_id': task_id})
+    except Exception as e:
+        logger.error(e)
+
+
+def meet_condition(run_condition, details):
+    cur_datetime = datetime.now()
+    cur_hour = int(cur_datetime.hour)
+    cur_week_day = cur_datetime.isoweekday()
+    cur_month_day = cur_datetime.day
+
+    if run_condition == PER_DAY:
+        run_hour = details.get('run_hour', None)
+        try:
+            if int(run_hour) == cur_hour:
+                return True
+        except Exception:
+            return False
+
+    if run_condition == PER_WEEK:
+        run_week_day = details.get('run_week_day', None)
+        run_week_hour = details.get('run_week_hour', None)
+        try:
+            if (int(run_week_hour) == cur_hour) and (int(run_week_day) == cur_week_day):
+                return True
+        except Exception:
+            return False
+
+    if run_condition == PER_MONTH:
+        run_month_day = details.get('run_month_day', None)
+        run_month_hour = details.get('run_month_hour', None)
+        try:
+            if (int(run_month_hour) == cur_hour) and (int(run_month_day) == cur_month_day):
+                return True
+        except Exception:
+            return False
+
+    return False
+
+
+def run_dtable_auto_archive_task(task, db_session):
+    task_id = task[0]
+    run_condition = task[1]
+    table_id = task[2]
+    view_id = task[3]
+    last_run_time = task[4]
+    dtable_uuid = task[5]
+    details = task[6]
+    creator = task[7]
+
+    try:
+        details = json.loads(details)
+        if not meet_condition(run_condition, details):
+            return
+
+        dtable_uuid = uuid_str_to_36_chars(dtable_uuid)
+        dtable_server_url = get_inner_dtable_server_url()
+        seatable = DTableServerAPI(creator, dtable_uuid, dtable_server_url)
+        current_table, current_view = None, None
+        metadata = seatable.get_metadata()
+
+        for table in metadata['tables']:
+            if table.get('_id') == table_id:
+                current_table = table
+                break
+
+        if not current_table:
+            set_invalid(task_id, db_session)
+            return
+
+        for view in current_table['views']:
+            if view.get('_id') == view_id:
+                current_view = view
+                break
+
+        if not current_view:
+            set_invalid(task_id, db_session)
+            return
+
+        table_name = current_table.get('name')
+        filter_conditions = {
+            "filters": current_view.get('filters') or [],
+            "filter_conjunction": current_view.get('filter_conjunction') or 'And',
+        }
+
+        columns = seatable.list_columns(table_name)
+        sql_generator = BaseSQLGenerator(table_name, columns, filter_conditions=filter_conditions)
+        where_clause = sql_generator.get_where_clause()
+        seatable.archive_view(table_name, where_clause)
+
+    except Exception as e:
+        logger.error(e)
+        set_invalid(task_id, db_session)
+        return
+
+    update_last_run_time(task_id, db_session)
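For reference, a minimal sketch of the details JSON that meet_condition() above expects for each run condition. Only the key names come from the code; the hours and days shown are invented for illustration:

    # Illustrative details payloads (keys taken from meet_condition, values are made up)
    per_day_details = {'run_hour': 2}                               # archive every day at 02:00
    per_week_details = {'run_week_day': 1, 'run_week_hour': 2}      # ISO weekday 1 (Monday) at 02:00
    per_month_details = {'run_month_day': 15, 'run_month_hour': 2}  # on the 15th of each month at 02:00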
diff --git a/dtable_events/big_data/big_data_auto_archive_scanner.py b/dtable_events/big_data/big_data_auto_archive_scanner.py
new file mode 100644
index 00000000..374c317b
--- /dev/null
+++ b/dtable_events/big_data/big_data_auto_archive_scanner.py
@@ -0,0 +1,101 @@
+import logging
+from datetime import datetime, timedelta
+from threading import Thread
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+from dtable_events.big_data.auto_archive_utils import run_dtable_auto_archive_task
+from dtable_events.db import init_db_session_class
+from dtable_events.utils import get_opt_from_conf_or_env, parse_bool
+
+
+__all__ = [
+    'DTableAutoArchiveTaskScanner',
+]
+
+
+class DTableAutoArchiveTaskScanner(object):
+
+    def __init__(self, config):
+        self._enabled = True
+        self._parse_config(config)
+        self._db_session_class = init_db_session_class(config)
+
+    def _parse_config(self, config):
+        """parse auto archive scanner options from config file
+        """
+        section_name = 'AUTOARCHIVE-SCANNER'
+        key_enabled = 'enabled'
+
+        if not config.has_section(section_name):
+            section_name = 'AUTOARCHIVE SCANNER'
+            if not config.has_section(section_name):
+                return
+
+        # enabled
+        enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True)
+        enabled = parse_bool(enabled)
+        self._enabled = enabled
+
+    def start(self):
+        if not self.is_enabled():
+            logging.warning('Cannot start big data auto archive scanner: it is not enabled!')
+            return
+
+        logging.info('Start big data auto archive scanner')
+
+        DTableAutoArchiveTaskScannerTimer(self._db_session_class).start()
+
+    def is_enabled(self):
+        return self._enabled
+
+
+def scan_auto_archive_tasks(db_session):
+    sql = '''
+        SELECT `bdar`.`id`, `run_condition`, `table_id`, `view_id`, `last_run_time`, `dtable_uuid`, `details`, bdar.`creator`
+        FROM dtable_auto_archive_task bdar
+        JOIN dtables d ON bdar.dtable_uuid=d.uuid
+        WHERE ((run_condition='per_day' AND (last_run_time<:per_day_check_time OR last_run_time IS NULL))
+        OR (run_condition='per_week' AND (last_run_time<:per_week_check_time OR last_run_time IS NULL))
+        OR (run_condition='per_month' AND (last_run_time<:per_month_check_time OR last_run_time IS NULL)))
+        AND bdar.is_valid=1 AND d.deleted=0
+    '''
+    per_day_check_time = datetime.now() - timedelta(hours=23)
+    per_week_check_time = datetime.now() - timedelta(days=6)
+    per_month_check_time = datetime.now() - timedelta(days=29)
+
+    tasks = db_session.execute(sql, {
+        'per_day_check_time': per_day_check_time,
+        'per_week_check_time': per_week_check_time,
+        'per_month_check_time': per_month_check_time
+    })
+
+    for task in tasks:
+        try:
+            run_dtable_auto_archive_task(task, db_session)
+        except Exception as e:
+            logging.exception(e)
+            logging.error(f'run auto archive task failed. {task}, error: {e}')
+        db_session.commit()
+
+
+class DTableAutoArchiveTaskScannerTimer(Thread):
+
+    def __init__(self, db_session_class):
+        super(DTableAutoArchiveTaskScannerTimer, self).__init__()
+        self.db_session_class = db_session_class
+
+    def run(self):
+        sched = BlockingScheduler()
+
+        # fire every hour, every day of the week
+        @sched.scheduled_job('cron', day_of_week='*', hour='*')
+        def timed_job():
+            logging.info('Start to scan auto archive tasks...')
+
+            db_session = self.db_session_class()
+            try:
+                scan_auto_archive_tasks(db_session)
+            except Exception as e:
+                logging.exception('error when scanning big data auto archives: %s', e)
+            finally:
+                db_session.close()
+
+        sched.start()
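A minimal sketch of the dtable-events config section that _parse_config() above reads. The section and key names match the code; the value is only an example, and the scanner also defaults to enabled when the section is absent:

    [AUTOARCHIVE-SCANNER]
    enabled = true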
diff --git a/dtable_events/utils/dtable_server_api.py b/dtable_events/utils/dtable_server_api.py
index c2c8d851..756bb931 100644
--- a/dtable_events/utils/dtable_server_api.py
+++ b/dtable_events/utils/dtable_server_api.py
@@ -411,3 +411,12 @@ def batch_send_notification(self, user_msg_list):
         }
         response = requests.post(url, json=body, headers=self.headers)
         return parse_response(response)
+
+    def archive_view(self, table_name, where_clause=""):
+        url = self.dtable_server_url + '/api/v1/dtables/' + self.dtable_uuid + '/archive-view/?from=dtable_events'
+        json_data = {
+            'table_name': table_name,
+            'where': where_clause,
+        }
+        response = requests.post(url, json=json_data, headers=self.headers, timeout=self.timeout)
+        return parse_response(response)
diff --git a/dtable_events/utils/sql_generator.py b/dtable_events/utils/sql_generator.py
index fd4098f6..bc1f4530 100644
--- a/dtable_events/utils/sql_generator.py
+++ b/dtable_events/utils/sql_generator.py
@@ -1655,7 +1655,7 @@ def _groupfilter2sql(self):
             group_conjunction_split.join(group_string_list)
         )

-    def _filter2sql(self):
+    def _filter2sql(self, include_where_string=True):
         filter_conditions = self.filter_conditions
         filters = filter_conditions.get('filters', [])
         filter_conjunction = filter_conditions.get('filter_conjunction', 'And')
@@ -1692,6 +1692,9 @@ def _filter2sql(self):
             )
         else:
             return ''
+
+        if not include_where_string:
+            return "%s" % filter_content
         return "%s%s" % (
             filter_header,
             filter_content
@@ -1733,6 +1736,9 @@ def to_sql(self, by_group=False):
             sql = "%s %s" % (sql, limit_clause)
         return sql

+    def get_where_clause(self, include_where_string=False):
+        return self._filter2sql(include_where_string=include_where_string)
+


 class LinkRecordsSQLGenerator(object):
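Taken together, the two additions are meant to be used roughly as below. This is a sketch only: the table name and the filter entry are invented, the exact filter dict schema accepted by BaseSQLGenerator is assumed rather than shown in this patch, and only the call signatures come from the code above:

    # Hypothetical usage sketch; only the signatures of BaseSQLGenerator,
    # get_where_clause and archive_view are taken from the patch.
    columns = seatable.list_columns('Table1')
    filter_conditions = {
        'filters': [{'column_key': '0000', 'filter_predicate': 'is', 'filter_term': 'Done'}],  # assumed schema
        'filter_conjunction': 'And',
    }
    generator = BaseSQLGenerator('Table1', columns, filter_conditions=filter_conditions)
    where_clause = generator.get_where_clause()    # filter conditions without the WHERE prefix (include_where_string defaults to False)
    seatable.archive_view('Table1', where_clause)  # POSTs {'table_name': ..., 'where': ...} to .../archive-view/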