Commit 144517c

Merge pull request #15 from pyper-dev/dev
Safer multiprocessing implementation
2 parents e7e8edb + ec0201f

File tree: 14 files changed (+190 -135 lines changed)

docs/src/docs/ApiReference/task.md

Lines changed: 5 additions & 0 deletions
@@ -233,6 +233,11 @@ An asynchronous task cannot set `multiprocessing` as `True`
 
 See some [considerations](../UserGuide/AdvancedConcepts#cpu-bound-work) for when to set this parameter.
 
+Note, also, that normal Python multiprocessing restrictions apply:
+
+* Only [picklable](https://docs.python.org/3/library/pickle.html#module-pickle) functions can be multiprocessed, which excludes certain types of functions like lambdas and closures.
+* Arguments and return values of multiprocessed tasks must also be picklable, which excludes objects like file handles, connections, and (on Windows) generators.
+
 {: .text-beta}
 ### `bind`
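
As an aside, a minimal sketch of the first restriction (the function names `generate_numbers` and `square` are hypothetical, not part of this commit): a module-level function can be multiprocessed, whereas a lambda doing the same work would fail to pickle.

```python
from pyper import task

def generate_numbers():
    # A module-level generator function: picklable by reference
    yield from range(5)

def square(x: int) -> int:
    # A module-level function whose argument and return value are picklable
    return x * x

if __name__ == "__main__":
    pipeline = task(generate_numbers, branch=True) | task(square, multiprocess=True)
    for output in pipeline():
        print(output)  # 0, 1, 4, 9, 16

    # By contrast, this would raise a pickling error, because lambdas cannot
    # be sent to a worker process:
    # bad = task(generate_numbers, branch=True) | task(lambda x: x * x, multiprocess=True)
```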

docs/src/docs/UserGuide/AdvancedConcepts.md

Lines changed: 93 additions & 7 deletions
@@ -39,7 +39,7 @@ IO-bound tasks benefit from both concurrent and parallel execution.
 However, to avoid the overhead costs of creating processes, it is generally preferable to use either threading or async code.
 
 {: .info}
-Threads incur a higher overhead cost compared to async coroutines, but are suitable if your application prefers or requires a synchronous implementation
+Threads incur a higher overhead cost compared to async coroutines, but are suitable if the function / application prefers or requires a synchronous implementation
 
 Note that asynchronous functions need to `await` or `yield` something in order to benefit from concurrency.
 Any long-running call in an async task which does not yield execution will prevent other tasks from making progress:
@@ -87,7 +87,7 @@ def long_computation(data: int):
     return data
 ```
 
-Note, however, that processes incur a very high overhead cost (performance in creation and memory in maintaining inter-process communication). Specific cases should be benchmarked to fine-tune the task parameters for your program / your machine.
+Note, however, that processes incur a very high overhead cost (performance cost in creation and memory cost in inter-process communication). Specific cases should be benchmarked to fine-tune the task parameters for your program / your machine.
 
 ### Summary
 
@@ -101,7 +101,7 @@ Note, however, that processes incur a very high overhead cost (performance in cr
 {: .text-green-200}
 **Key Considerations:**
 
-* If a task is doing extremely expensive CPU-bound work, define it synchronously and set `multiprocess=True`
+* If a task is doing expensive CPU-bound work, define it synchronously and set `multiprocess=True`
 * If a task is doing expensive IO-bound work, consider implementing it asynchronously, or use threads
 * Do _not_ put expensive, blocking work in an async task, as this clogs up the async event loop
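
As an illustrative aside (the functions here are hypothetical, not part of the diff), a sketch combining these considerations: an async IO-bound task alongside a synchronous CPU-bound task marked with `multiprocess=True`.

```python
import asyncio
from pyper import task

def generate_ids(limit: int):
    yield from range(limit)

async def fetch_record(record_id: int) -> dict:
    # IO-bound: implemented asynchronously so the event loop stays free
    await asyncio.sleep(0.1)  # stand-in for a real network call
    return {"id": record_id, "payload": list(range(10_000))}

def crunch(record: dict) -> int:
    # CPU-bound: synchronous, and multiprocessed to use another core
    return sum(record["payload"])

async def main():
    pipeline = (
        task(generate_ids, branch=True)
        | task(fetch_record, workers=10)
        | task(crunch, workers=2, multiprocess=True)
    )
    async for output in pipeline(5):
        print(output)

if __name__ == "__main__":
    asyncio.run(main())
```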

@@ -111,7 +111,7 @@ Note, however, that processes incur a very high overhead cost (performance in cr
 
 Writing clean code is partly about defining functions with single, clear responsibilities.
 
-In Pyper specifically, it is especially important to separate out different types of work into different tasks if we want to optimize their performance. For example, consider a task which performs an IO-bound network request along with a CPU-bound function to parse the data.
+In Pyper, it is especially important to separate out different types of work into different tasks if we want to optimize their performance. For example, consider a task which performs an IO-bound network request along with a CPU-bound function to parse the data.
 
 ```python
 # Bad -- functions not separated
@@ -165,10 +165,10 @@ When defining a pipeline, these additional arguments are plugged into tasks usin
 async def main():
     async with ClientSession("http://localhost:8000/api") as session:
         user_data_pipeline = (
-            task(list_user_ids, branch=True, bind=task.bind(session=session))
+            task(list_user_ids, branch=True)
             | task(fetch_user_data, workers=10, bind=task.bind(session=session))
         )
-        async for output in user_data_pipeline():
+        async for output in user_data_pipeline(session):
             print(output)
 ```

@@ -208,4 +208,90 @@ async def main():
         > copy_to_db
     )
     await run()
-```
+```
+
+## Generators
+
+### Usage
+
+Generators in Python are a mechanism for _lazy execution_, whereby results in an iterable are returned one by one (via underlying calls to `__next__`) instead of within a data structure, like a `list`, which requires all of its elements to be allocated in memory.
+
+Using generators is an indispensable approach for processing large volumes of data in a memory-friendly way. We can define generator functions by using the `yield` keyword within a normal `def` block:
+
+```python
+import typing
+from pyper import task
+
+# Okay
+@task(branch=True)
+def generate_values_lazily() -> typing.Iterable[dict]:
+    for i in range(10_000_000):
+        yield {"data": i}
+
+# Bad -- this creates 10 million values in memory
+# Subsequent tasks also cannot start executing until the entire list is created
+@task(branch=True)
+def create_values_in_list() -> typing.List[dict]:
+    return [{"data": i} for i in range(10_000_000)]
+```
+
+{: .info}
+Generator functions return immediately. They return `generator` objects, which are iterable
+
+Using the `branch` task parameter in Pyper allows generators to generate multiple outputs, which get picked up by subsequent tasks as soon as the data is available.
+
+Using a generator function without `branch=True` is also possible; this just means the task submits `generator` objects as output, instead of each generated value.
+
+```python
+from pyper import task
+
+def get_data():
+    yield 1
+    yield 2
+    yield 3
+
+if __name__ == "__main__":
+    branched_pipeline = task(get_data, branch=True)
+    for output in branched_pipeline():
+        print(output)
+    # Prints:
+    # 1
+    # 2
+    # 3
+
+    non_branched_pipeline = task(get_data)
+    for output in non_branched_pipeline():
+        print(output)
+    # Prints:
+    # <generator object get_data at ...>
+```
+
+### Limitations
+
+Implementing generator objects in a pipeline can also come with some caveats that are important to keep in mind.
+
+{: .text-green-200}
+**Synchronous Generators with Asynchronous Code**
+
+Synchronous generators in an `AsyncPipeline` do not benefit from threading or multiprocessing.
+
+This is because, in order to be scheduled in an async event loop, each synchronous task is run by a thread/process, and then wrapped in an `asyncio.Task`.
+
+Generator functions, which return _immediately_, do most of their work outside of the thread/process and this synchronous work will therefore not benefit from multiple workers in an async context.
+
+The alternatives are to:
+
+1. Use a synchronous generator anyway (if its performance is unlikely to be a bottleneck)
+
+2. Use a normal synchronous function, and return an iterable data structure (if memory is unlikely to be a bottleneck)
+
+3. Use an async generator (if an async implementation of the function is appropriate)
+
+{: .text-green-200}
+**Multiprocessing and Pickling**
+
+In Python, anything that goes into and comes out of a process must be picklable.
+
+On Windows, generator objects cannot be pickled, so they cannot be passed as inputs or outputs when multiprocessing.
+
+Note that, for example, using `branch=True` to pass individual outputs from a generator into a multiprocessed task is still fine, because the task input would not be a `generator` object.
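
To make that last point concrete, a small sketch (hypothetical functions, not part of this commit): with `branch=True`, the multiprocessed task receives individual picklable values rather than a `generator` object.

```python
from pyper import task

def generate_records():
    for i in range(3):
        yield {"data": i}

def transform(record: dict) -> dict:
    # Receives a plain dict, which is picklable, so multiprocessing is safe
    return {"data": record["data"] * 2}

if __name__ == "__main__":
    # Fine: each yielded dict is passed to the multiprocessed task individually
    pipeline = task(generate_records, branch=True) | task(transform, multiprocess=True)
    for output in pipeline():
        print(output)

    # Without branch=True, the downstream task would receive the generator
    # object itself, which cannot be pickled on Windows
```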

docs/src/docs/UserGuide/ComposingPipelines.md

Lines changed: 10 additions & 10 deletions
@@ -78,7 +78,7 @@ if __name__ == "__main__":
     writer(pipeline(limit=10)) # Run
 ```
 
-The `>` operator (again inspired by UNIX syntax) is used to pipe a `Pipeline` into a consumer function (any callable that takes a data stream) returning simply a function that handles the 'run' operation. This is syntactic sugar for the `Pipeline.consume` method.
+The `>` operator (again inspired by UNIX syntax) is used to pipe a `Pipeline` into a consumer function (any callable that takes an `Iterable` of inputs) returning simply a function that handles the 'run' operation. This is syntactic sugar for the `Pipeline.consume` method.
 ```python
 if __name__ == "__main__":
     run = step1 | step2 > JsonFileWriter("data.json")
@@ -105,26 +105,26 @@ For example, let's say we have a theoretical pipeline which takes `(source: str)
 
 ```python
 download_files_from_source = (
-    task(list_files, branch=True)
-    | task(download_file, workers=20)
-    | task(decrypt_file, workers=5, multiprocess=True)
+    task(list_files, branch=True) # Return a list of file info
+    | task(download_file, workers=20) # Return a filepath
+    | task(decrypt_file, workers=5, multiprocess=True) # Return a filepath
 )
 ```
 
 This is a function which generates multiple outputs per source. But we may wish to process _batches of filepaths_ downstream, after waiting for a single source to finish downloading. This means a piping approach, where we pass each _individual_ filepath along to subsequent tasks, won't work.
 
-Instead, we can define a function to create a list of filepaths as `download_files_from_source > list`. This is now a composable function which can be used in an outer pipeline.
+Instead, we can define `download_files_from_source` as a task within an outer pipeline, which is as simple as wrapping it in `task` like we would with any other function.
 
 ```python
 download_and_merge_files = (
-    task(get_sources, branch=True)
-    | task(download_files_from_source > list)
-    | task(merge_files, workers=5, multiprocess=True)
+    task(get_sources, branch=True) # Return a list of sources
+    | task(download_files_from_source) # Return a batch of filepaths (as a generator)
+    | task(sync_files, workers=5) # Do something with each batch
 )
 ```
 
-* `download_files_from source > list` takes a source as input, downloads all files, and creates a list of filepaths as output.
-* `merge_files` takes a list of filepaths as input.
+* `download_files_from_source` takes a source as input, and returns a generator of filepaths (note that we are _not_ setting `branch=True`; a batch of filepaths is being passed along per source)
+* `sync_files` takes each batch of filepaths as input, and works on them concurrently
 
 ## Asynchronous Code
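
As an aside, a sketch of a consumer for the `>` operator described above (the `JsonListWriter` class and helper functions are hypothetical): any callable accepting the pipeline's output iterable will do.

```python
import json
from pyper import task

def generate_numbers(limit: int):
    yield from range(limit)

def double(x: int) -> int:
    return x * 2

class JsonListWriter:
    # A hypothetical consumer: a callable that accepts an iterable of outputs
    def __init__(self, filepath: str):
        self.filepath = filepath

    def __call__(self, outputs):
        with open(self.filepath, "w") as f:
            json.dump(list(outputs), f)

if __name__ == "__main__":
    pipeline = task(generate_numbers, branch=True) | task(double)
    run = pipeline > JsonListWriter("data.json")  # sugar for pipeline.consume(...)
    run(5)  # writes [0, 2, 4, 6, 8]
```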

docs/src/index.md

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ It is designed with the following goals in mind:
 * **Error Handling**: Data flows fail fast, even in long-running threads, and propagate their errors cleanly
 * **Complex Data Flows**: Data pipelines support branching/joining data flows, as well as sharing contexts/resources between tasks
 
-In addition, Pyper provides an extensible way to write code that can be integrated with other frameworks like those aforementioned.
+In addition, Pyper enables developers to write code in an extensible way that can be integrated naturally with other frameworks like those aforementioned.
 
 ## Installation

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ dependencies = [
 ]
 
 [project.urls]
+Homepage = "https://pyper-dev.github.io/pyper/"
 Documentation = "https://pyper-dev.github.io/pyper/"
 Repository = "https://github.com/pyper-dev/pyper"
 Issues = "https://github.com/pyper-dev/pyper/issues"

src/pyper/_core/async_helper/queue_io.py

Lines changed: 5 additions & 4 deletions
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from collections.abc import Iterable
+from collections.abc import AsyncIterable, Iterable
 from typing import TYPE_CHECKING
 
 from ..util.sentinel import StopSentinel
@@ -61,10 +61,11 @@ async def __call__(self, *args, **kwargs):
 
 class _BranchingAsyncEnqueue(_AsyncEnqueue):
     async def __call__(self, *args, **kwargs):
-        if self.task.is_gen:
-            async for output in self.task.func(*args, **kwargs):
+        result = self.task.func(*args, **kwargs)
+        if isinstance(result, AsyncIterable):
+            async for output in result:
                 await self.q_out.put(output)
-        elif isinstance(result := await self.task.func(*args, **kwargs), Iterable):
+        elif isinstance(result := await result, Iterable):
             for output in result:
                 await self.q_out.put(output)
         else:
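
For context, a standalone sketch (plain asyncio, not pyper code) of the distinction this change relies on: calling an async generator function returns an async iterable immediately, whereas calling a coroutine function returns an awaitable that must be awaited first.

```python
import asyncio
from collections.abc import AsyncIterable

async def agen():
    # Async generator function: calling it returns an async generator at once
    yield 1
    yield 2

async def coro():
    # Coroutine function: calling it returns an awaitable, not an iterable
    return [1, 2]

async def main():
    result = agen()
    print(isinstance(result, AsyncIterable))  # True: iterate with `async for`
    async for item in result:
        print(item)

    result = coro()
    print(isinstance(result, AsyncIterable))  # False: must be awaited first
    print(await result)

asyncio.run(main())
```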

src/pyper/_core/async_helper/stage.py

Lines changed: 2 additions & 2 deletions
@@ -19,9 +19,9 @@
 class AsyncProducer:
     def __init__(self, task: Task, next_task: Task):
         if task.workers > 1:
-            raise RuntimeError(f"The first task in a pipeline ({task.func.__qualname__}) cannot have more than 1 worker")
+            raise RuntimeError(f"The first task in a pipeline ({task.func}) cannot have more than 1 worker")
         if task.join:
-            raise RuntimeError(f"The first task in a pipeline ({task.func.__qualname__}) cannot join previous results")
+            raise RuntimeError(f"The first task in a pipeline ({task.func}) cannot join previous results")
         self.task = task
         self.q_out = asyncio.Queue(maxsize=task.throttle)

src/pyper/_core/sync_helper/output.py

Lines changed: 12 additions & 19 deletions
@@ -1,30 +1,31 @@
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
-import queue
+from typing import TYPE_CHECKING, Union
 
 from .stage import Producer, ProducerConsumer
 from ..util.sentinel import StopSentinel
 from ..util.worker_pool import ProcessPool, ThreadPool
 
 if TYPE_CHECKING:
+    import multiprocessing as mp
+    import queue
     from ..pipeline import Pipeline
 
 
 class PipelineOutput:
     def __init__(self, pipeline: Pipeline):
         self.pipeline = pipeline
 
-    def _get_q_out(self, tp: ThreadPool, pp: ProcessPool, *args, **kwargs) -> queue.Queue:
+    def _get_q_out(self, tp: ThreadPool, pp: ProcessPool, *args, **kwargs) -> Union[mp.Queue, queue.Queue]:
         """Feed forward each stage to the next, returning the output queue of the final stage."""
         q_out = None
         for task, next_task in zip(self.pipeline.tasks, self.pipeline.tasks[1:] + [None]):
             pool = pp if task.multiprocess else tp
             if q_out is None:
-                stage = Producer(task=task, next_task=next_task, q_err=pool.error_queue, shutdown_event=pool.shutdown_event)
+                stage = Producer(task=task, next_task=next_task, manager=pp.manager, shutdown_event=pool.shutdown_event)
                 stage.start(pool, *args, **kwargs)
             else:
-                stage = ProducerConsumer(q_in=q_out, task=task, next_task=next_task, q_err=pool.error_queue, shutdown_event=pool.shutdown_event)
+                stage = ProducerConsumer(q_in=q_out, task=task, next_task=next_task, manager=pp.manager, shutdown_event=pool.shutdown_event)
                 stage.start(pool)
             q_out = stage.q_out
 
@@ -34,18 +35,10 @@ def __call__(self, *args, **kwargs):
         """Iterate through the pipeline, taking the inputs to the first task, and yielding each output from the last task."""
         with ThreadPool() as tp, ProcessPool() as pp:
             q_out = self._get_q_out(tp, pp, *args, **kwargs)
-            while True:
-                try:
-                    # Use the timeout strategy for unblocking main thread without busy waiting
-                    if (data := q_out.get(timeout=0.1)) is StopSentinel:
-                        tp.raise_error_if_exists()
-                        pp.raise_error_if_exists()
-                        break
+            try:
+                while (data := q_out.get()) is not StopSentinel:
                     yield data
-                except queue.Empty:
-                    tp.raise_error_if_exists()
-                    pp.raise_error_if_exists()
-                except (KeyboardInterrupt, SystemExit): # pragma: no cover
-                    tp.shutdown_event.set()
-                    pp.shutdown_event.set()
-                    raise
+            except (KeyboardInterrupt, SystemExit): # pragma: no cover
+                tp.shutdown_event.set()
+                pp.shutdown_event.set()
+                raise
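
For context, a standalone sketch (plain stdlib, not pyper code) of the sentinel-terminated consumer loop used above: a blocking `get()` paired with a sentinel object avoids polling with timeouts.

```python
import queue
import threading

STOP = object()  # sentinel marking the end of the stream

def producer(q: queue.Queue):
    for i in range(3):
        q.put(i)
    q.put(STOP)

if __name__ == "__main__":
    q = queue.Queue()
    threading.Thread(target=producer, args=(q,)).start()
    while (data := q.get()) is not STOP:  # blocks without busy waiting
        print(data)
```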

src/pyper/_core/sync_helper/queue_io.py

Lines changed: 5 additions & 5 deletions
@@ -30,13 +30,13 @@ def __call__(self):
 
 
 class _SingleDequeue(_Dequeue):
-    def __call__(self):
-        for data in self._input_stream():
+    def __call__(self):
+        for data in self._input_stream():
             yield data
 
 
 class _JoiningDequeue(_Dequeue):
-    def __call__(self):
+    def __call__(self):
         yield self._input_stream()
 
 
@@ -56,12 +56,12 @@ def __call__(self, *args, **kwargs):
 
 
 class _SingleEnqueue(_Enqueue):
-    def __call__(self, *args, **kwargs):
+    def __call__(self, *args, **kwargs):
         self.q_out.put(self.task.func(*args, **kwargs))
 
 
 class _BranchingEnqueue(_Enqueue):
-    def __call__(self, *args, **kwargs):
+    def __call__(self, *args, **kwargs):
         if isinstance(result := self.task.func(*args, **kwargs), Iterable):
             for output in result:
                 self.q_out.put(output)
