diff --git a/dtable_events/automations/dtable_automation_rules_scanner.py b/dtable_events/automations/dtable_automation_rules_scanner.py index fc93610e..f0ed3312 100644 --- a/dtable_events/automations/dtable_automation_rules_scanner.py +++ b/dtable_events/automations/dtable_automation_rules_scanner.py @@ -91,7 +91,7 @@ def __init__(self, db_session_class): def run(self): sched = BlockingScheduler() # fire at every hour in every day of week - @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='8', misfire_grace_time=600) + @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='32', misfire_grace_time=600) def timed_job(): logging.info('Starts to scan automation rules...') diff --git a/dtable_events/convert_page/manager.py b/dtable_events/convert_page/manager.py index b34e72ef..61176c72 100644 --- a/dtable_events/convert_page/manager.py +++ b/dtable_events/convert_page/manager.py @@ -4,8 +4,10 @@ from threading import Thread from playwright.async_api import async_playwright +from playwright._impl._errors import TimeoutError from dtable_events.app.config import INNER_DTABLE_DB_URL, DTABLE_WEB_SERVICE_URL +from dtable_events.convert_page.utils import get_pdf_print_options from dtable_events.utils import get_inner_dtable_server_url, get_opt_from_conf_or_env, uuid_str_to_36_chars from dtable_events.utils.dtable_db_api import DTableDBAPI from dtable_events.utils.dtable_server_api import DTableServerAPI, NotFoundException @@ -17,13 +19,14 @@ class BrowserWorker(Thread): - def __init__(self, index, task_queue: Queue): + def __init__(self, index, task_queue: Queue, pages=10): super().__init__() self.thread_id = index self.task_queue = task_queue self.playwright = None self.browser = None self.context = None + self.pages = pages self.loop = asyncio.new_event_loop() # each thread has own event loop @@ -99,6 +102,25 @@ def check_resources(self, dtable_uuid, plugin_type, page_id, table_id, target_co 'row_ids': row_ids }, None + async def row_page_to_pdf(self, url, context, row_id, action_type, per_converted_callbacks): + page = await context.new_page() + page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}")) + page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}")) + page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}")) + try: + await page.goto(url, wait_until="load") + await page.wait_for_load_state('networkidle', timeout=180*1000) + content = await page.pdf(**get_pdf_print_options()) + except TimeoutError: + content = await page.pdf(**get_pdf_print_options()) + await page.close() + if action_type == 'convert_page_to_pdf': + for callback in per_converted_callbacks: + try: + callback(row_id, content) + except Exception as e: + logger.exception(e) + async def convert_with_rows(self, task_info, resources): dtable_uuid = task_info.get('dtable_uuid') plugin_type = task_info.get('plugin_type') @@ -115,40 +137,27 @@ async def convert_with_rows(self, task_info, resources): context = await self.get_context() # convert - # open all tabs of rows step by step + # open all tabs of rows pages by pages # wait render and convert to pdf one by one - step = 10 - for i in range(0, len(row_ids), step): - try: - step_row_ids = row_ids[i: i+step] - # open rows - for row_id in step_row_ids: - url = '' - if plugin_type == 'page-design': - url = DTABLE_WEB_SERVICE_URL.strip('/') + '/dtable/%s/page-design/%s/row/%s/' % (uuid_str_to_36_chars(dtable_uuid), page_id, row_id) - if not url: - continue - dtable_server_api = DTableServerAPI('dtable-events', dtable_uuid, dtable_server_url) - url += '?access-token=%s&need_convert=%s' % (dtable_server_api.internal_access_token, 0) - page = await context.new_page() - page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}")) - page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}")) - page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}")) - await page.goto(url, wait_until="load") - await page.wait_for_load_state('networkidle') - pdf_content = await page.pdf(format='A4') - if action_type == 'convert_page_to_pdf': - for callback in per_converted_callbacks: - try: - callback(row_id, pdf_content) - except Exception as e: - logging.exception(e) - except Exception as e: - logger.exception('convert task: %s error: %s', task_info, e) - continue - finally: - for page in self.context.pages: - await page.close() + pages = self.pages + dtable_server_api = DTableServerAPI('dtable-events', dtable_uuid, dtable_server_url) + for i in range(0, len(row_ids), pages): + tasks = [] + # open rows + for row_id in row_ids[i: i+pages]: + url = '' + if plugin_type == 'page-design': + url = DTABLE_WEB_SERVICE_URL.strip('/') + '/dtable/%s/page-design/%s/row/%s/' % (uuid_str_to_36_chars(dtable_uuid), page_id, row_id) + if not url: + continue + url += '?access-token=%s&need_convert=%s' % (dtable_server_api.internal_access_token, 0) + + tasks.append(self.row_page_to_pdf(url, context, row_id, action_type, per_converted_callbacks)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + logger.exception(result) # callbacks if action_type == 'convert_page_to_pdf': @@ -156,7 +165,7 @@ async def convert_with_rows(self, task_info, resources): try: callback(table, target_column) except Exception as e: - logging.exception(e) + logger.exception(e) async def convert_without_rows(self, task_info): dtable_uuid = task_info.get('dtable_uuid') @@ -175,26 +184,24 @@ async def convert_without_rows(self, task_info): url += '?access-token=%s&need_convert=%s' % (dtable_server_api.access_token, 0) context = await self.get_context() + page = await context.new_page() + page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}")) + page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}")) + page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}")) try: - page = await context.new_page() - page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}")) - page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}")) - page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}")) await page.goto(url, wait_until="load") - await page.wait_for_load_state('networkidle') - pdf_content = await page.pdf(format='A4') - - if action_type == 'convert_document_to_pdf_and_send': - for callback in per_converted_callbacks: - try: - callback(pdf_content) - except Exception as e: - logging.exception(e) - except Exception as e: - logger.exception('convert task: %s error: %s', task_info, e) - finally: - for page in self.context.pages: - await page.close() + await page.wait_for_load_state('networkidle', timeout=180*1000) + pdf_content = await page.pdf(**get_pdf_print_options()) + except TimeoutError: + pdf_content = await page.pdf(**get_pdf_print_options()) + + if action_type == 'convert_document_to_pdf_and_send': + for callback in per_converted_callbacks: + try: + callback(pdf_content) + except Exception as e: + logger.exception(e) + await page.close() async def _do_convert(self, task_info): dtable_uuid = task_info.get('dtable_uuid') @@ -228,12 +235,12 @@ async def do_convert(self, task_info): except Exception as e: logger.exception(f'do convert Thread-{self.thread_id} Exception in loop.run_until_complete - {e}') try: - if self.context: - await self.context.close() + await self.browser.close() except Exception as e: logger.exception(f'do convert Thread-{self.thread_id} close context error: {e}') finally: self.context = None + self.browser = None def run(self): asyncio.set_event_loop(self.loop) @@ -251,29 +258,27 @@ class ConvertPageToPDFManager: def __init__(self): self.max_workers = 2 self.max_queue = 1000 + self.pages = 10 def init(self, config): section_name = 'CONERT-PAGE-TO-PDF' key_max_workers = 'max_workers' key_max_queue = 'max_queue' + key_pages = 'pages' self.config = config if config.has_section('CONERT-PAGE-TO-PDF'): - try: - self.max_workers = int(get_opt_from_conf_or_env(config, section_name, key_max_workers, default=self.max_workers)) - except: - pass - try: - self.max_queue = int(get_opt_from_conf_or_env(config, section_name, key_max_queue, default=self.max_queue)) - except: - pass + self.max_workers = int(get_opt_from_conf_or_env(config, section_name, key_max_workers, default=self.max_workers)) + self.max_queue = int(get_opt_from_conf_or_env(config, section_name, key_max_queue, default=self.max_queue)) + self.pages = int(get_opt_from_conf_or_env(config, section_name, key_pages, default=self.pages)) + self.queue = Queue(self.max_queue) # element in queue is a dict about task def start(self): - logger.debug('convert page to pdf max workers: %s max queue: %s', self.max_workers, self.max_queue) + logger.debug('convert page to pdf max workers: %s max queue: %s pages: %s', self.max_workers, self.max_queue, self.pages) for i in range(self.max_workers): - t = BrowserWorker(i, self.queue) + t = BrowserWorker(i, self.queue, self.pages) t.start() def add_task(self, task_info): diff --git a/dtable_events/convert_page/utils.py b/dtable_events/convert_page/utils.py new file mode 100644 index 00000000..2931b91d --- /dev/null +++ b/dtable_events/convert_page/utils.py @@ -0,0 +1,7 @@ +def get_pdf_print_options(): + return { + 'landscape': False, + 'display_header_footer': False, + 'print_background': True, + 'prefer_css_page_size': True + } diff --git a/dtable_events/dtable_io/__init__.py b/dtable_events/dtable_io/__init__.py index dd39f8c2..a8f033d3 100644 --- a/dtable_events/dtable_io/__init__.py +++ b/dtable_events/dtable_io/__init__.py @@ -6,6 +6,7 @@ import requests from datetime import datetime from playwright.async_api import async_playwright +from playwright._impl._errors import TimeoutError from seaserv import seafile_api @@ -35,6 +36,7 @@ from dtable_events.utils.email_sender import EmailSender from dtable_events.dtable_io.utils import clear_tmp_dir, clear_tmp_file, clear_tmp_files_and_dirs from dtable_events.app.log import setup_logger +from dtable_events.convert_page.utils import get_pdf_print_options dtable_io_logger = setup_logger('dtable_events_io.log') dtable_message_logger = setup_logger('dtable_events_message.log') @@ -696,8 +698,11 @@ async def access_and_save(): page.on("response", lambda response: dtable_io_logger.debug(f"Response: {response.status} {response.url}")) page.on("console", lambda msg: dtable_io_logger.debug(f"Console [{msg.type}]: {msg.text}")) await page.goto(url, wait_until="load") - await page.wait_for_load_state('networkidle') - await page.pdf(path=target_path, format='A4') + await page.wait_for_load_state('networkidle', timeout=180*1000) + await page.pdf(path=target_path, **get_pdf_print_options()) + except TimeoutError: + dtable_io_logger.exception('dtable: %s plugin: %s page: %s row: %s timeout', dtable_uuid, plugin_type, page_id, row_id) + await page.pdf(path=target_path, **get_pdf_print_options()) except Exception as e: dtable_io_logger.exception('dtable: %s plugin: %s page: %s row: %s error: %s', dtable_uuid, plugin_type, page_id, row_id, e)