Skip to content

Commit

Permalink
opt playwright mualti pages
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexCXC committed Dec 4, 2024
1 parent 7982d2d commit 7e6c815
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(self, db_session_class):
def run(self):
sched = BlockingScheduler()
# fire at every hour in every day of week
@sched.scheduled_job('cron', day_of_week='*', hour='*', minute='8', misfire_grace_time=600)
@sched.scheduled_job('cron', day_of_week='*', hour='*', minute='32', misfire_grace_time=600)
def timed_job():
logging.info('Starts to scan automation rules...')

Expand Down
135 changes: 70 additions & 65 deletions dtable_events/convert_page/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from threading import Thread

from playwright.async_api import async_playwright
from playwright._impl._errors import TimeoutError

from dtable_events.app.config import INNER_DTABLE_DB_URL, DTABLE_WEB_SERVICE_URL
from dtable_events.convert_page.utils import get_pdf_print_options
from dtable_events.utils import get_inner_dtable_server_url, get_opt_from_conf_or_env, uuid_str_to_36_chars
from dtable_events.utils.dtable_db_api import DTableDBAPI
from dtable_events.utils.dtable_server_api import DTableServerAPI, NotFoundException
Expand All @@ -17,13 +19,14 @@

class BrowserWorker(Thread):

def __init__(self, index, task_queue: Queue):
def __init__(self, index, task_queue: Queue, pages=10):
super().__init__()
self.thread_id = index
self.task_queue = task_queue
self.playwright = None
self.browser = None
self.context = None
self.pages = pages

self.loop = asyncio.new_event_loop() # each thread has own event loop

Expand Down Expand Up @@ -99,6 +102,25 @@ def check_resources(self, dtable_uuid, plugin_type, page_id, table_id, target_co
'row_ids': row_ids
}, None

async def row_page_to_pdf(self, url, context, row_id, action_type, per_converted_callbacks):
page = await context.new_page()
page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}"))
page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}"))
page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}"))
try:
await page.goto(url, wait_until="load")
await page.wait_for_load_state('networkidle', timeout=180*1000)
content = await page.pdf(**get_pdf_print_options())
except TimeoutError:
content = await page.pdf(**get_pdf_print_options())
await page.close()
if action_type == 'convert_page_to_pdf':
for callback in per_converted_callbacks:
try:
callback(row_id, content)
except Exception as e:
logger.exception(e)

async def convert_with_rows(self, task_info, resources):
dtable_uuid = task_info.get('dtable_uuid')
plugin_type = task_info.get('plugin_type')
Expand All @@ -115,48 +137,35 @@ async def convert_with_rows(self, task_info, resources):
context = await self.get_context()

# convert
# open all tabs of rows step by step
# open all tabs of rows pages by pages
# wait render and convert to pdf one by one
step = 10
for i in range(0, len(row_ids), step):
try:
step_row_ids = row_ids[i: i+step]
# open rows
for row_id in step_row_ids:
url = ''
if plugin_type == 'page-design':
url = DTABLE_WEB_SERVICE_URL.strip('/') + '/dtable/%s/page-design/%s/row/%s/' % (uuid_str_to_36_chars(dtable_uuid), page_id, row_id)
if not url:
continue
dtable_server_api = DTableServerAPI('dtable-events', dtable_uuid, dtable_server_url)
url += '?access-token=%s&need_convert=%s' % (dtable_server_api.internal_access_token, 0)
page = await context.new_page()
page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}"))
page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}"))
page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}"))
await page.goto(url, wait_until="load")
await page.wait_for_load_state('networkidle')
pdf_content = await page.pdf(format='A4')
if action_type == 'convert_page_to_pdf':
for callback in per_converted_callbacks:
try:
callback(row_id, pdf_content)
except Exception as e:
logging.exception(e)
except Exception as e:
logger.exception('convert task: %s error: %s', task_info, e)
continue
finally:
for page in self.context.pages:
await page.close()
pages = self.pages
dtable_server_api = DTableServerAPI('dtable-events', dtable_uuid, dtable_server_url)
for i in range(0, len(row_ids), pages):
tasks = []
# open rows
for row_id in row_ids[i: i+pages]:
url = ''
if plugin_type == 'page-design':
url = DTABLE_WEB_SERVICE_URL.strip('/') + '/dtable/%s/page-design/%s/row/%s/' % (uuid_str_to_36_chars(dtable_uuid), page_id, row_id)
if not url:
continue
url += '?access-token=%s&need_convert=%s' % (dtable_server_api.internal_access_token, 0)

tasks.append(self.row_page_to_pdf(url, context, row_id, action_type, per_converted_callbacks))

results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
logger.exception(result)

# callbacks
if action_type == 'convert_page_to_pdf':
for callback in all_converted_callbacks:
try:
callback(table, target_column)
except Exception as e:
logging.exception(e)
logger.exception(e)

async def convert_without_rows(self, task_info):
dtable_uuid = task_info.get('dtable_uuid')
Expand All @@ -175,26 +184,24 @@ async def convert_without_rows(self, task_info):
url += '?access-token=%s&need_convert=%s' % (dtable_server_api.access_token, 0)

context = await self.get_context()
page = await context.new_page()
page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}"))
page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}"))
page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}"))
try:
page = await context.new_page()
page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}"))
page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}"))
page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}"))
await page.goto(url, wait_until="load")
await page.wait_for_load_state('networkidle')
pdf_content = await page.pdf(format='A4')

if action_type == 'convert_document_to_pdf_and_send':
for callback in per_converted_callbacks:
try:
callback(pdf_content)
except Exception as e:
logging.exception(e)
except Exception as e:
logger.exception('convert task: %s error: %s', task_info, e)
finally:
for page in self.context.pages:
await page.close()
await page.wait_for_load_state('networkidle', timeout=180*1000)
pdf_content = await page.pdf(**get_pdf_print_options())
except TimeoutError:
pdf_content = await page.pdf(**get_pdf_print_options())

if action_type == 'convert_document_to_pdf_and_send':
for callback in per_converted_callbacks:
try:
callback(pdf_content)
except Exception as e:
logger.exception(e)
await page.close()

async def _do_convert(self, task_info):
dtable_uuid = task_info.get('dtable_uuid')
Expand Down Expand Up @@ -228,12 +235,12 @@ async def do_convert(self, task_info):
except Exception as e:
logger.exception(f'do convert Thread-{self.thread_id} Exception in loop.run_until_complete - {e}')
try:
if self.context:
await self.context.close()
await self.browser.close()
except Exception as e:
logger.exception(f'do convert Thread-{self.thread_id} close context error: {e}')
finally:
self.context = None
self.browser = None

def run(self):
asyncio.set_event_loop(self.loop)
Expand All @@ -251,29 +258,27 @@ class ConvertPageToPDFManager:
def __init__(self):
self.max_workers = 2
self.max_queue = 1000
self.pages = 10

def init(self, config):
section_name = 'CONERT-PAGE-TO-PDF'
key_max_workers = 'max_workers'
key_max_queue = 'max_queue'
key_pages = 'pages'

self.config = config

if config.has_section('CONERT-PAGE-TO-PDF'):
try:
self.max_workers = int(get_opt_from_conf_or_env(config, section_name, key_max_workers, default=self.max_workers))
except:
pass
try:
self.max_queue = int(get_opt_from_conf_or_env(config, section_name, key_max_queue, default=self.max_queue))
except:
pass
self.max_workers = int(get_opt_from_conf_or_env(config, section_name, key_max_workers, default=self.max_workers))
self.max_queue = int(get_opt_from_conf_or_env(config, section_name, key_max_queue, default=self.max_queue))
self.pages = int(get_opt_from_conf_or_env(config, section_name, key_pages, default=self.pages))

self.queue = Queue(self.max_queue) # element in queue is a dict about task

def start(self):
logger.debug('convert page to pdf max workers: %s max queue: %s', self.max_workers, self.max_queue)
logger.debug('convert page to pdf max workers: %s max queue: %s pages: %s', self.max_workers, self.max_queue, self.pages)
for i in range(self.max_workers):
t = BrowserWorker(i, self.queue)
t = BrowserWorker(i, self.queue, self.pages)
t.start()

def add_task(self, task_info):
Expand Down
7 changes: 7 additions & 0 deletions dtable_events/convert_page/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
def get_pdf_print_options():
return {
'landscape': False,
'display_header_footer': False,
'print_background': True,
'prefer_css_page_size': True
}
9 changes: 7 additions & 2 deletions dtable_events/dtable_io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import requests
from datetime import datetime
from playwright.async_api import async_playwright
from playwright._impl._errors import TimeoutError

from seaserv import seafile_api

Expand Down Expand Up @@ -35,6 +36,7 @@
from dtable_events.utils.email_sender import EmailSender
from dtable_events.dtable_io.utils import clear_tmp_dir, clear_tmp_file, clear_tmp_files_and_dirs
from dtable_events.app.log import setup_logger
from dtable_events.convert_page.utils import get_pdf_print_options

dtable_io_logger = setup_logger('dtable_events_io.log')
dtable_message_logger = setup_logger('dtable_events_message.log')
Expand Down Expand Up @@ -696,8 +698,11 @@ async def access_and_save():
page.on("response", lambda response: dtable_io_logger.debug(f"Response: {response.status} {response.url}"))
page.on("console", lambda msg: dtable_io_logger.debug(f"Console [{msg.type}]: {msg.text}"))
await page.goto(url, wait_until="load")
await page.wait_for_load_state('networkidle')
await page.pdf(path=target_path, format='A4')
await page.wait_for_load_state('networkidle', timeout=180*1000)
await page.pdf(path=target_path, **get_pdf_print_options())
except TimeoutError:
dtable_io_logger.exception('dtable: %s plugin: %s page: %s row: %s timeout', dtable_uuid, plugin_type, page_id, row_id)
await page.pdf(path=target_path, **get_pdf_print_options())
except Exception as e:
dtable_io_logger.exception('dtable: %s plugin: %s page: %s row: %s error: %s', dtable_uuid, plugin_type, page_id, row_id, e)

Expand Down

0 comments on commit 7e6c815

Please sign in to comment.