Skip to content

Commit ec8592b

Browse files
authored
Fix up Python 3.8 loop argument warnings (#246)
* Fix up Python 3.8 loop argument warnings * Remove all "loop=self.loop" expressions. * Rely on the currently running loop in the constructor of Queue. * Assuming that janus.Queue objects are created in the functions or coroutines called by the event loop, rewrite most test cases to be async. - No longer manage the event loop lifecycles by ourselves. - Adopt pytest-asyncio to seamlessly run test cases in an event loop. * Add missing .close() / .wait_closed() calls to the end of many test cases to ensure proper termination of the queues. * Insert asyncio.sleep(0) in the wait_closed() method so that all task-done callbacks for tasks spawned by _notify_async_not_empty(), _notify_async_not_full() internal methods are properly awaited. This eliminates hundreds of resource warnings after finishing the test suite. * Ensure dropping of Python 3.3/3.4 in CI configs. * Add Python 3.7 and 3.8 to CI configs. * Oops * Let tox install pytest-asyncio * Remove PY_33/PY_35 conditional branches as we no longer support Python 3.4 or older versions. * Update gitignore * Let requirements-dev.txt to include pytest-asyncio * Fix up errors for Python 3.5 and 3.6 * Fix too-long-line error * Add changelog and update README
1 parent 2543af6 commit ec8592b

File tree

11 files changed

+427
-301
lines changed

11 files changed

+427
-301
lines changed

.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,8 @@ target/
5858

5959
coverage
6060

61-
.pytest_cache
61+
.pytest_cache/
62+
.mypy_cache/
63+
64+
# pyenv
65+
.python-version

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ language: python
33
python:
44
- "3.5"
55
- "3.6"
6+
- "3.7"
7+
- "3.8"
68

79

810
install:

CHANGES.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
Changes
22
=======
33

4+
to be released
5+
--------------
6+
7+
- Remove explicit loop arguments and forbid creating queues outside event loops #246
8+
49
0.4.0 (2018-07-28)
510
------------------
611

README.rst

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,50 @@ Synchronous is fully compatible with `standard queue
2626
follows `asyncio queue design
2727
<https://docs.python.org/3/library/asyncio-queue.html>`_.
2828

29-
Usage example
30-
=============
29+
Usage example (Python 3.7+)
30+
===========================
31+
32+
.. code:: python
33+
34+
import asyncio
35+
import janus
36+
37+
38+
def threaded(sync_q):
39+
for i in range(100):
40+
sync_q.put(i)
41+
sync_q.join()
42+
43+
44+
async def async_coro(async_q):
45+
for i in range(100):
46+
val = await async_q.get()
47+
assert val == i
48+
async_q.task_done()
49+
50+
51+
async def main():
52+
queue = janus.Queue()
53+
loop = asyncio.get_running_loop()
54+
fut = loop.run_in_executor(None, threaded, queue.sync_q)
55+
await async_coro(queue.async_q)
56+
await fut
57+
queue.close()
58+
await queue.wait_closed()
59+
60+
61+
asyncio.run(main())
62+
63+
64+
Usage example (Python 3.5 and 3.6)
65+
==================================
3166

3267
.. code:: python
3368
3469
import asyncio
3570
import janus
3671
3772
loop = asyncio.get_event_loop()
38-
queue = janus.Queue(loop=loop)
3973
4074
4175
def threaded(sync_q):
@@ -51,9 +85,18 @@ Usage example
5185
async_q.task_done()
5286
5387
54-
fut = loop.run_in_executor(None, threaded, queue.sync_q)
55-
loop.run_until_complete(async_coro(queue.async_q))
56-
loop.run_until_complete(fut)
88+
async def main():
89+
queue = janus.Queue()
90+
fut = loop.run_in_executor(None, threaded, queue.sync_q)
91+
await async_coro(queue.async_q)
92+
await fut
93+
queue.close()
94+
await queue.wait_closed()
95+
96+
try:
97+
loop.run_until_complete(main())
98+
finally:
99+
loop.close()
57100
58101
59102
Communication channels

janus/__init__.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@
1616

1717

1818
T = TypeVar('T')
19-
OptLoop = Optional[asyncio.AbstractEventLoop]
2019
OptInt = Optional[int]
2120

2221

23-
class Queue(Generic[T]):
24-
def __init__(self, maxsize: int = 0, *, loop: OptLoop = None) -> None:
25-
if loop is None:
26-
loop = asyncio.get_event_loop()
22+
current_loop = getattr(asyncio, 'get_running_loop', None)
23+
if current_loop is None:
24+
current_loop = asyncio.get_event_loop
25+
2726

28-
self._loop = loop # type: asyncio.AbstractEventLoop
27+
class Queue(Generic[T]):
28+
def __init__(self, maxsize: int = 0) -> None:
29+
self._loop = current_loop()
2930
self._maxsize = maxsize
3031

3132
self._init(maxsize)
@@ -37,12 +38,10 @@ def __init__(self, maxsize: int = 0, *, loop: OptLoop = None) -> None:
3738
self._sync_not_full = threading.Condition(self._sync_mutex)
3839
self._all_tasks_done = threading.Condition(self._sync_mutex)
3940

40-
self._async_mutex = asyncio.Lock(loop=self._loop)
41-
self._async_not_empty = asyncio.Condition(
42-
self._async_mutex, loop=self._loop)
43-
self._async_not_full = asyncio.Condition(
44-
self._async_mutex, loop=self._loop)
45-
self._finished = asyncio.Event(loop=self._loop)
41+
self._async_mutex = asyncio.Lock()
42+
self._async_not_empty = asyncio.Condition(self._async_mutex)
43+
self._async_not_full = asyncio.Condition(self._async_mutex)
44+
self._finished = asyncio.Event()
4645
self._finished.set()
4746

4847
self._closing = False
@@ -78,9 +77,14 @@ async def wait_closed(self) -> None:
7877
# so lock acquiring is not required
7978
if not self._closing:
8079
raise RuntimeError("Waiting for non-closed queue")
80+
# give execution chances for the task-done callbacks
81+
# of async tasks created inside
82+
# _notify_async_not_empty, _notify_async_not_full
83+
# methods.
84+
await asyncio.sleep(0)
8185
if not self._pending:
8286
return
83-
await asyncio.wait(self._pending, loop=self._loop)
87+
await asyncio.wait(self._pending)
8488

8589
@property
8690
def closed(self) -> bool:
@@ -143,7 +147,7 @@ async def f() -> None:
143147
self._async_not_empty.notify()
144148

145149
def task_maker() -> None:
146-
task = asyncio.ensure_future(f(), loop=self._loop)
150+
task = self._loop.create_task(f())
147151
task.add_done_callback(self._pending.discard)
148152
self._pending.add(task)
149153

@@ -158,7 +162,7 @@ async def f() -> None:
158162
self._async_not_full.notify()
159163

160164
def task_maker() -> None:
161-
task = asyncio.ensure_future(f(), loop=self._loop)
165+
task = self._loop.create_task(f())
162166
task.add_done_callback(self._pending.discard)
163167
self._pending.add(task)
164168

requirements-dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ mypy==0.770
77
pyroma==2.6
88
pytest-cov==2.8.1
99
pytest==5.4.1
10+
pytest-asyncio==0.10.0
1011
tox==3.14.6
1112
wheel==0.34.2

setup.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66

77
from setuptools.command.test import test as TestCommand
88

9-
PY_33 = sys.version_info < (3, 4)
10-
PY_35 = sys.version_info >= (3, 5)
11-
129

1310
class PyTest(TestCommand):
1411
user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")]
@@ -43,13 +40,10 @@ def read(f):
4340

4441
install_requires = []
4542

46-
if PY_33:
47-
install_requires.append('asyncio')
48-
49-
# if not PY_35:
50-
# install_requires.append('typing')
51-
52-
tests_require = install_requires + ['pytest']
43+
tests_require = install_requires + [
44+
'pytest>=5.4',
45+
'pytest-asyncio>=0.10.0',
46+
]
5347
extras_require = {}
5448

5549

@@ -66,6 +60,7 @@ def read(f):
6660
'Programming Language :: Python :: 3.5',
6761
'Programming Language :: Python :: 3.6',
6862
'Programming Language :: Python :: 3.7',
63+
'Programming Language :: Python :: 3.8',
6964
'Topic :: Software Development :: Libraries',
7065
'Framework :: AsyncIO',
7166
],

0 commit comments

Comments
 (0)