Skip to content

Commit

Permalink
support connect_tab for ChromeEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
ClericPy committed Oct 12, 2020
1 parent e4e598e commit aad79cf
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 35 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,38 @@ print(r.text)


## ChromeEngine normal usage

### Connect tab and do something
```python
import asyncio

from ichrome.pool import ChromeEngine


def test_chrome_engine_connect_tab():

async def _test_chrome_engine_connect_tab():

async with ChromeEngine(headless=True, disable_image=True) as ce:
async with ce.connect_tab() as tab:
await tab.goto('http://pypi.org')
print(await tab.title)

asyncio.get_event_loop().run_until_complete(
_test_chrome_engine_connect_tab())


if __name__ == "__main__":
test_chrome_engine_connect_tab()
# INFO 2020-10-13 00:43:37 [ichrome] pool.py(438): [enqueue](0) ChromeTask(<1>, PENDING) with timeout=None, tab_index=None, data=<ichrome.pool._TabWorker object at 0x0000018E44F7E970>
# INFO 2020-10-13 00:43:39 [ichrome] pool.py(168): [online] ChromeWorker(<9345>, 0/5, 1 todos) is online.
# INFO 2020-10-13 00:43:39 [ichrome] pool.py(192): ChromeWorker(<9345>, 0/5, 0 todos) get a new task ChromeTask(<1>, PENDING).
# PyPI · The Python Package Index
# INFO 2020-10-13 00:43:42 [ichrome] pool.py(178): [offline] ChromeWorker(<9345>, 0/5, 0 todos) is offline.
# INFO 2020-10-13 00:43:42 [ichrome] pool.py(227): [finished](0) ChromeTask(<1>, PENDING)
```

### Batch Tasks
```python
import asyncio
from inspect import getsource
Expand Down
6 changes: 6 additions & 0 deletions examples_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ def test_drc(tab, data_dict):
title = await tab.current_title
assert TEST_DRC_OK, 'test default_recv_callback failed'
assert 'GitHub' in title
tab.default_recv_callback.clear()
logger.info('test init tab from chromed OK.')
# test on_startup
assert chromed.started
Expand Down Expand Up @@ -397,6 +398,11 @@ async def tab_callback2(self, tab, url, timeout):
results = [await task for task in tasks]
assert 1000 < len(results[0]['tags'][0]) < len(results[1]['html'])

# test connect_tab
async with ce.connect_tab() as tab:
await tab.goto('http://pypi.org')
title = await tab.title
assert 'PyPI' in title, title
logger.critical('test_chrome_engine OK')

# asyncio.run will raise aiohttp issue: https://github.com/aio-libs/aiohttp/issues/4324
Expand Down
2 changes: 1 addition & 1 deletion ichrome/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .pool import ChromeEngine
from .sync_utils import Chrome, Tab

__version__ = "2.5.0"
__version__ = "2.5.1"
__tips__ = "[github]: https://github.com/ClericPy/ichrome\n[cdp]: https://chromedevtools.github.io/devtools-protocol/\n[cmd args]: https://peter.sh/experiments/chromium-command-line-switches/"
__all__ = [
'Chrome', 'ChromeDaemon', 'Tab', 'Tag', 'AsyncChrome', 'AsyncTab', 'logger',
Expand Down
9 changes: 5 additions & 4 deletions ichrome/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ def ensure_callback_type(_default_recv_callback):
for func in _default_recv_callback:
if not callable(func):
raise ChromeTypeError(
f'callback function ({getattr(func, "__name__", func)}) should be callable')
f'callback function ({getattr(func, "__name__", func)}) should be callable'
)
if not inspect.isbuiltin(func) and len(
inspect.signature(func).parameters) != 2:
raise ChromeTypeError(
Expand Down Expand Up @@ -1021,7 +1022,7 @@ async def get_history_entry(self,
return result['entries'][index]
else:
raise ChromeValueError(
f'index and relative_index should not be both None.')
'index and relative_index should not be both None.')

async def history_back(self, timeout=NotSet):
return await self.goto_history_relative(relative_index=-1,
Expand Down Expand Up @@ -1807,10 +1808,10 @@ async def mouse_move(self,
start_y,
steps_count=steps_count)
else:
interval = 0
steps = [(target_x, target_y)]
for x, y in steps:
if duration:
await asyncio.sleep(interval)
await asyncio.sleep(interval)
await self.send('Input.dispatchMouseEvent',
type="mouseMoved",
x=int(round(x)),
Expand Down
2 changes: 1 addition & 1 deletion ichrome/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from asyncio import get_running_loop
from inspect import isawaitable
from pathlib import Path
from typing import Awaitable, List
from typing import List

import psutil
from torequests.utils import get_readable_size
Expand Down
114 changes: 85 additions & 29 deletions ichrome/pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import random
import time
from tkinter import EXCEPTION
import typing
from base64 import b64decode
from copy import deepcopy
Expand All @@ -15,7 +14,7 @@
class ChromeTask(asyncio.Future):
"""ExpireFuture"""
_ID = 0
MAX_TIMEOUT = 60
MAX_TIMEOUT = 60 * 5
MAX_TRIES = 5
EXEC_GLOBALS: typing.Dict[str, typing.Any] = {}
STOP_SIG = object()
Expand Down Expand Up @@ -64,11 +63,16 @@ async def run(self, tab: AsyncTab):
result = None
try:
result = await self._running_task
self.set_result(result)
except ChromeException as error:
raise error
except Exception as error:
logger.error(f'{self} catch an error while running task, {error!r}')
finally:
if self._state == 'PENDING':
self.set_result(result)
self.set_result(result)

def set_result(self, result):
if self._state == 'PENDING':
super().set_result(result)

@classmethod
def get_id(cls):
Expand Down Expand Up @@ -185,42 +189,66 @@ async def future_consumer(self, index=None):
self._need_restart.set()
await self._chrome_daemon_ready.wait()
future: ChromeTask = await self.q.get()
logger.info(f'{self} get a new task {future}.')
if future.data is ChromeTask.STOP_SIG:
await self.q.put(future)
break
if future.done() or future.expire_time < time.time():
# overdue task, skip
continue
if await self.chrome_daemon._check_chrome_connection():
# should not auto_close for int index (existing tab).
auto_close = not isinstance(future.tab_index, int)
async with self.chrome_daemon.connect_tab(
index=future.tab_index, auto_close=True) as tab:
try:
self._running_futures.add(future)
await future.run(tab)
continue
except ChromeEngine.ERRORS_NOT_HANDLED as error:
raise error
except asyncio.CancelledError:
continue
except ChromeException as error:
logger.error(f'{self} restarting for error {error!r}')
if not self._need_restart.is_set():
self._need_restart.set()
except Exception as error:
logger.error(
f'{self} catch an error {error!r} for {future}')
finally:
self._running_futures.discard(future)
if not future.done():
# retry
future.cancel_task()
await self.q.put(future)
index=future.tab_index, auto_close=auto_close) as tab:
if isinstance(future.data, _TabWorker):
await self.handle_tab_worker_future(tab, future)
else:
await self.handle_default_future(tab, future)
else:
if not self._need_restart.is_set():
self._need_restart.set()
await self.q.put(future)
return f'{self} future_consumer[{index}] done.'

async def handle_tab_worker_future(self, tab, future):
try:
tab_worker: _TabWorker = future.data
tab_worker.tab_future.set_result(tab)
return await asyncio.wait_for(tab_worker._done.wait(),
timeout=future.timeout)
except (asyncio.CancelledError, asyncio.TimeoutError):
return
except ChromeException as error:
logger.error(f'{self} restarting for error {error!r}')
if not self._need_restart.is_set():
self._need_restart.set()
finally:
logger.info(f'[finished]({self.todos}) {future}')
del future

async def handle_default_future(self, tab, future):
try:
self._running_futures.add(future)
await future.run(tab)
except ChromeEngine.ERRORS_NOT_HANDLED as error:
raise error
except asyncio.CancelledError:
pass
except ChromeException as error:
logger.error(f'{self} restarting for error {error!r}')
if not self._need_restart.is_set():
self._need_restart.set()
except Exception as error:
# other errors may give a retry
logger.error(f'{self} catch an error {error!r} for {future}')
finally:
self._running_futures.discard(future)
if not future.done():
# retry
future.cancel_task()
await self.q.put(future)

def start_tab_worker(self):
asyncio.create_task(self._start_chrome_daemon())

Expand All @@ -245,7 +273,7 @@ def __repr__(self) -> str:

class ChromeEngine:
START_PORT = 9345
DEFAULT_WORKERS_AMOUNT = 2
DEFAULT_WORKERS_AMOUNT = 1
ERRORS_NOT_HANDLED = (KeyboardInterrupt,)
SHORTEN_DATA_LENGTH = 150

Expand Down Expand Up @@ -305,10 +333,10 @@ async def do(self,
tab_callback,
timeout=timeout,
tab_index=tab_index)
await self.q.put(future)
logger.info(
f'[enqueue]({self.todos}) {future} with timeout={timeout}, tab_index={tab_index}, data={self.shorten_data(data)}'
)
await self.q.put(future)
try:
return await asyncio.wait_for(future, timeout=future.timeout)
except asyncio.TimeoutError:
Expand Down Expand Up @@ -404,6 +432,34 @@ async def js(self,
timeout=timeout,
tab_index=None)

def connect_tab(self, tab_index=None, timeout: float = None):
data = _TabWorker()
future = ChromeTask(data, timeout=timeout, tab_index=tab_index)
logger.info(
f'[enqueue]({self.todos}) {future} with timeout={timeout}, tab_index={tab_index}, data={self.shorten_data(data)}'
)
self.q.put_nowait(future)
return data


class _TabWorker:
"""
Used with `async with` context for ChromeEngine.
"""

def __init__(self):
pass

async def __aenter__(self) -> AsyncTab:
self._done = asyncio.Event()
self.tab_future: typing.Any = asyncio.Future()
# waiting for a tab
await self.tab_future
return self.tab_future.result()

async def __aexit__(self, *_):
self._done.set()


class CommonUtils:
"""Some frequently-used callback functions."""
Expand Down

0 comments on commit aad79cf

Please sign in to comment.