Commit 0a127bb

update some docs, comments, docstrings
1 parent 8585e5c commit 0a127bb

7 files changed: +119 -22 lines changed


docs/src/docs/ApiReference/task.md

Lines changed: 5 additions & 0 deletions

@@ -233,6 +233,11 @@ An asynchronous task cannot set `multiprocessing` as `True`

See some [considerations](../UserGuide/AdvancedConcepts#cpu-bound-work) for when to set this parameter.

+Note, also, that normal Python multiprocessing restrictions apply:
+
+* Only [picklable](https://docs.python.org/3/library/pickle.html#module-pickle) functions can be multiprocessed, which excludes certain types of functions like lambdas and closures.
+* Arguments and return values of multiprocessed tasks must also be picklable, which excludes objects like file handles, connections, and (on Windows) generators.
+
{: .text-beta}
### `bind`
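
To illustrate the pickling restriction documented above, here is a minimal standalone sketch (not part of this commit) using Python's own `multiprocessing` module; `square` is an invented example function:

```python
import multiprocessing

def square(x: int) -> int:
    # A module-level function is picklable, so it can be sent to worker processes
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]
        # A lambda, by contrast, cannot be pickled, so the following would raise
        # a PicklingError:
        # pool.map(lambda x: x * x, [1, 2, 3])
```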

docs/src/docs/UserGuide/AdvancedConcepts.md

Lines changed: 93 additions & 7 deletions

@@ -39,7 +39,7 @@ IO-bound tasks benefit from both concurrent and parallel execution.
However, to avoid the overhead costs of creating processes, it is generally preferable to use either threading or async code.

{: .info}
-Threads incur a higher overhead cost compared to async coroutines, but are suitable if your application prefers or requires a synchronous implementation
+Threads incur a higher overhead cost compared to async coroutines, but are suitable if the function / application prefers or requires a synchronous implementation

Note that asynchronous functions need to `await` or `yield` something in order to benefit from concurrency.
Any long-running call in an async task which does not yield execution will prevent other tasks from making progress:
@@ -87,7 +87,7 @@ def long_computation(data: int):
    return data
```

-Note, however, that processes incur a very high overhead cost (performance in creation and memory in maintaining inter-process communication). Specific cases should be benchmarked to fine-tune the task parameters for your program / your machine.
+Note, however, that processes incur a very high overhead cost (performance cost in creation and memory cost in inter-process communication). Specific cases should be benchmarked to fine-tune the task parameters for your program / your machine.

### Summary

@@ -101,7 +101,7 @@ Note, however, that processes incur a very high overhead cost (performance in cr
{: .text-green-200}
**Key Considerations:**

-* If a task is doing extremely expensive CPU-bound work, define it synchronously and set `multiprocess=True`
+* If a task is doing expensive CPU-bound work, define it synchronously and set `multiprocess=True`
* If a task is doing expensive IO-bound work, consider implementing it asynchronously, or use threads
* Do _not_ put expensive, blocking work in an async task, as this clogs up the async event loop
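
As a rough illustration of these considerations (a sketch only, not taken from the repository; the function names and workload are invented), a pipeline might keep IO-bound work in async tasks and push CPU-bound work into a multiprocessed task:

```python
import asyncio
from pyper import task

def list_urls():
    # Invented source of work
    return ["https://example.com/a", "https://example.com/b"]

async def fetch_page(url: str) -> str:
    # IO-bound: an async implementation lets other tasks make progress while waiting
    await asyncio.sleep(0.1)  # stand-in for a network request
    return f"<html>{url}</html>"

def parse_page(html: str) -> int:
    # CPU-bound: defined synchronously so it can be multiprocessed
    return sum(ord(c) for c in html)

async def main():
    pipeline = (
        task(list_urls, branch=True)
        | task(fetch_page, workers=10)
        | task(parse_page, multiprocess=True)
    )
    async for result in pipeline():
        print(result)

if __name__ == "__main__":
    asyncio.run(main())
```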

@@ -111,7 +111,7 @@ Note, however, that processes incur a very high overhead cost (performance in cr

Writing clean code is partly about defining functions with single, clear responsibilities.

-In Pyper specifically, it is especially important to separate out different types of work into different tasks if we want to optimize their performance. For example, consider a task which performs an IO-bound network request along with a CPU-bound function to parse the data.
+In Pyper, it is especially important to separate out different types of work into different tasks if we want to optimize their performance. For example, consider a task which performs an IO-bound network request along with a CPU-bound function to parse the data.

```python
# Bad -- functions not separated
@@ -165,10 +165,10 @@ When defining a pipeline, these additional arguments are plugged into tasks usin
async def main():
    async with ClientSession("http://localhost:8000/api") as session:
        user_data_pipeline = (
-            task(list_user_ids, branch=True, bind=task.bind(session=session))
+            task(list_user_ids, branch=True)
            | task(fetch_user_data, workers=10, bind=task.bind(session=session))
        )
-        async for output in user_data_pipeline():
+        async for output in user_data_pipeline(session):
            print(output)
```

@@ -208,4 +208,90 @@ async def main():
        > copy_to_db
    )
    await run()
-```
+```
+
+## Generators
+
+### Usage
+
+Generators in Python are a mechanism for _lazy execution_, whereby results in an iterable are returned one by one (via underlying calls to `__next__`) instead of within a data structure, like a `list`, which requires all of its elements to be allocated in memory.
+
+Using generators is an indispensible approach for processing large volumes of data in a memory-friendly way. We can define generator functions by using the `yield` keyword within a normal `def` block:
+
+```python
+import typing
+from pyper import task
+
+# Okay
+@task(branch=True)
+def generate_values_lazily() -> typing.Iterable[dict]:
+    for i in range(10_000_000):
+        yield {"data": i}
+
+# Bad -- this creates 10 million values in memory
+# Subsequent tasks also cannot start executing until the entire list is created
+@task(branch=True)
+def create_values_in_list() -> typing.List[dict]:
+    return [{"data": i} for i in range(10_000_000)]
+```
+
+{: .info}
+Generator `functions` return immediately. They return `generator` objects, which are iterable
+
+Using the `branch` task parameter in Pyper allows generators to generate multiple outputs, which get picked up by subsequent tasks as soon as the data is available.
+
+Using a generator function without `branch=True` is also possible; this just means the task submits `generator` objects as output, instead of each generated value.
+
+```python
+from pyper import task
+
+def get_data():
+    yield 1
+    yield 2
+    yield 3
+
+if __name__ == "__main__":
+    branched_pipeline = task(get_data, branch=True)
+    for output in branched_pipeline():
+        print(output)
+    # Prints:
+    # 1
+    # 2
+    # 3
+
+    non_branched_pipeline = task(get_data)
+    for output in non_branched_pipeline():
+        print(output)
+    # Prints:
+    # <generator object get_data at ...>
+```
+
+### Limitations
+
+Implementing generator objects in a pipeline can also come with some caveats that are important to keep in mind.
+
+{: .text-green-200}
+**Synchronous Generators with Asynchronous Code**
+
+Synchronous generators in an `AsyncPipeline` do not benefit from threading or multiprocessing.
+
+This is because, in order to be scheduled in an async event loop, each synchronous task is run by a thread/process, and then wrapped in an `asyncio.Task`.
+
+Generator functions, which return _immediately_, do most of their work outside of the thread/process and this synchronous work will therefore not benefit from multiple workers in an async context.
+
+The alternatives are to:
+
+1. Use a synchronous generator anyway (if its performance is unlikely to be a bottleneck)
+
+2. Use a normal synchronous function, and return an iterable data structure (if memory is unlikely to be a bottleneck)
+
+3. Use an async generator (if an async implementation of the function is appropriate)
+
+{: .text-green-200}
+**Multiprocessing and Pickling**
+
+In Python, anything that goes into and comes out of a process must be picklable.
+
+On Windows, generator objects cannot be pickled, so cannot be passed as inputs and outputs when multiprocessing.
+
+Note that, for example, using `branch=True` to pass individual outputs from a generator into a multiprocessed task is still fine, because the task input would not be a `generator` object.
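
A short sketch of that last point (the function names are invented; this example is not part of the commit): the generator object stays within the branching task, and only its individual, picklable outputs are passed into the multiprocessed task:

```python
from pyper import task

def generate_records():
    # The generator object never crosses a process boundary
    for i in range(5):
        yield {"data": i}

def heavy_transform(record: dict) -> int:
    # Each input here is a plain dict, which pickles without issue
    return record["data"] ** 2

if __name__ == "__main__":
    pipeline = (
        task(generate_records, branch=True)  # emits individual dicts
        | task(heavy_transform, workers=2, multiprocess=True)
    )
    for output in pipeline():
        print(output)
```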

docs/src/docs/UserGuide/ComposingPipelines.md

Lines changed: 10 additions & 10 deletions

@@ -78,7 +78,7 @@ if __name__ == "__main__":
    writer(pipeline(limit=10)) # Run
```

-The `>` operator (again inspired by UNIX syntax) is used to pipe a `Pipeline` into a consumer function (any callable that takes a data stream) returning simply a function that handles the 'run' operation. This is syntactic sugar for the `Pipeline.consume` method.
+The `>` operator (again inspired by UNIX syntax) is used to pipe a `Pipeline` into a consumer function (any callable that takes an `Iterable` of inputs) returning simply a function that handles the 'run' operation. This is syntactic sugar for the `Pipeline.consume` method.
```python
if __name__ == "__main__":
    run = step1 | step2 > JsonFileWriter("data.json")
@@ -105,26 +105,26 @@ For example, let's say we have a theoretical pipeline which takes `(source: str)

```python
download_files_from_source = (
-    task(list_files, branch=True)
-    | task(download_file, workers=20)
-    | task(decrypt_file, workers=5, multiprocess=True)
+    task(list_files, branch=True) # Return a list of file info
+    | task(download_file, workers=20) # Return a filepath
+    | task(decrypt_file, workers=5, multiprocess=True) # Return a filepath
)
```

This is a function which generates multiple outputs per source. But we may wish to process _batches of filepaths_ downstream, after waiting for a single source to finish downloading. This means a piping approach, where we pass each _individual_ filepath along to subsequent tasks, won't work.

-Instead, we can define a function to create a list of filepaths as `download_files_from_source > list`. This is now a composable function which can be used in an outer pipeline.
+Instead, we can define `download_files_from_source` as a task within an outer pipeline, which is as simple as wrapping it in `task` like we would with any other function.

```python
download_and_merge_files = (
-    task(get_sources, branch=True)
-    | task(download_files_from_source > list)
-    | task(merge_files, workers=5, multiprocess=True)
+    task(get_sources, branch=True) # Return a list of sources
+    | task(download_files_from_source) # Return a batch of filepaths (as a generator)
+    | task(sync_files, workers=5) # Do something with each batch
)
```

-* `download_files_from source > list` takes a source as input, downloads all files, and creates a list of filepaths as output.
-* `merge_files` takes a list of filepaths as input.
+* `download_files_from_source` takes a source as input, and returns a generator of filepaths (note that we are _not_ setting `branch=True`; a batch of filepaths is being passed along per source)
+* `sync_files` takes each batch of filepaths as input, and works on them concurrently

## Asynchronous Code
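
For reference, the consumer side of the `>` operator changed above can be any callable that accepts the pipeline's output iterable; a minimal sketch follows (the `JsonFileWriter` implementation here is assumed for illustration, not taken from the repository):

```python
import json
from pyper import task

def get_numbers(limit: int):
    return range(limit)

def double(x: int) -> int:
    return x * 2

class JsonFileWriter:
    def __init__(self, filepath: str):
        self.filepath = filepath

    def __call__(self, data):
        # A consumer is any callable that takes the pipeline's output iterable
        with open(self.filepath, "w", encoding="utf-8") as f:
            json.dump(list(data), f, indent=4)

if __name__ == "__main__":
    # Syntactic sugar for JsonFileWriter("data.json")(pipeline(limit=10))
    run = task(get_numbers, branch=True) | task(double) > JsonFileWriter("data.json")
    run(limit=10)
```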

docs/src/index.md

Lines changed: 1 addition & 1 deletion

@@ -52,7 +52,7 @@ It is designed with the following goals in mind:
* **Error Handling**: Data flows fail fast, even in long-running threads, and propagate their errors cleanly
* **Complex Data Flows**: Data pipelines support branching/joining data flows, as well as sharing contexts/resources between tasks

-In addition, Pyper provides an extensible way to write code that can be integrated with other frameworks like those aforementioned.
+In addition, Pyper enables developers to write code in an extensible way that can be integrated naturally with other frameworks like those aforementioned.

## Installation

pyproject.toml

Lines changed: 1 addition & 0 deletions

@@ -36,6 +36,7 @@ dependencies = [
]

[project.urls]
+Homepage = "https://pyper-dev.github.io/pyper/"
Documentation = "https://pyper-dev.github.io/pyper/"
Repository = "https://github.com/pyper-dev/pyper"
Issues = "https://github.com/pyper-dev/pyper/issues"

src/pyper/_core/async_helper/queue_io.py

Lines changed: 5 additions & 4 deletions

@@ -1,6 +1,6 @@
from __future__ import annotations

-from collections.abc import Iterable
+from collections.abc import AsyncIterable, Iterable
from typing import TYPE_CHECKING

from ..util.sentinel import StopSentinel

@@ -61,10 +61,11 @@ async def __call__(self, *args, **kwargs):

class _BranchingAsyncEnqueue(_AsyncEnqueue):
    async def __call__(self, *args, **kwargs):
-        if self.task.is_gen:
-            async for output in self.task.func(*args, **kwargs):
+        result = self.task.func(*args, **kwargs)
+        if isinstance(result, AsyncIterable):
+            async for output in result:
                await self.q_out.put(output)
-        elif isinstance(result := await self.task.func(*args, **kwargs), Iterable):
+        elif isinstance(result := await result, Iterable):
            for output in result:
                await self.q_out.put(output)
        else:
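
The distinction this change relies on is that calling an async generator function returns an async generator object immediately (no `await` needed), whereas a coroutine function returns a coroutine that must be awaited first. A standalone illustration of the check (not pyper code):

```python
import asyncio
from collections.abc import AsyncIterable, Iterable

async def agen():
    yield 1
    yield 2

async def coro():
    return [1, 2]

async def main():
    result = agen()  # async generator object, returned immediately
    print(isinstance(result, AsyncIterable))  # True
    print([x async for x in result])          # [1, 2]

    result = coro()  # coroutine object: not AsyncIterable, must be awaited
    print(isinstance(result, AsyncIterable))   # False
    print(isinstance(await result, Iterable))  # True

asyncio.run(main())
```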

src/pyper/_core/util/asynchronize.py

Lines changed: 4 additions & 0 deletions

@@ -18,10 +18,14 @@ def ascynchronize(task: Task, tp: ThreadPoolExecutor, pp: ProcessPoolExecutor) -
        return task

    if task.is_gen and task.branch:
+        # Small optimization to convert sync generators to async generators
+        # This saves from having to use a thread/process just to get the generator object
+        # We also add asyncio.sleep(0) to unblock long synchronous generators
        @functools.wraps(task.func)
        async def wrapper(*args, **kwargs):
            for output in task.func(*args, **kwargs):
                yield output
+                await asyncio.sleep(0)
    else:
        executor = pp if task.multiprocess else tp
        @functools.wraps(task.func)
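
The pattern added here, wrapping a synchronous generator as an async generator and yielding control back to the event loop after each item, can be sketched standalone as follows (illustrative only; this is not the pyper wrapper itself):

```python
import asyncio
import functools

def sync_gen():
    for i in range(3):
        yield i

def asynchronize_gen(func):
    # Wrap a synchronous generator function as an async generator,
    # sleeping for 0 seconds after each item so other scheduled tasks can run
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        for output in func(*args, **kwargs):
            yield output
            await asyncio.sleep(0)
    return wrapper

async def main():
    async for value in asynchronize_gen(sync_gen)():
        print(value)

asyncio.run(main())
```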
