Commit cd959bb ("update readme"), parent b893e53

1 file changed: README.md (+62, -171 lines)
---

Pyper is a comprehensive framework for concurrent and parallel data-processing, based on functional programming patterns. Used for 🌐 **Data Collection**, 🔀 **ETL Systems**, and general-purpose 🛠️ **Python Scripting**

See the [Documentation](https://pyper-dev.github.io/pyper/)

Key features:

* 💡 **Intuitive API**: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded, multiprocessed, and asynchronous work.
* 🚀 **Functional Paradigm**: Python functions are the building blocks of data pipelines. Lets you write clean, reusable code naturally.
* 🛡️ **Safety**: Hides the heavy lifting of underlying task execution and resource clean-up. No more worrying about race conditions, memory leaks, or thread-level error handling.
* **Efficiency**: Designed from the ground up for lazy execution, using queues, workers, and generators.
* **Pure Python**: Lightweight, with zero sub-dependencies.
Note that `python-pyper` is the [pypi](https://pypi.org/project/python-pyper) re…

## Usage

In Pyper, the `task` decorator is used to transform functions into composable pipelines.

Let's simulate a pipeline that performs a series of transformations on some data.

```python
import asyncio
import time

from pyper import task


def get_data(limit: int):
    for i in range(limit):
        yield i


async def step1(data: int):
    await asyncio.sleep(1)
    print("Finished async wait", data)
    return data


def step2(data: int):
    time.sleep(1)
    print("Finished sync wait", data)
    return data


def step3(data: int):
    for i in range(10_000_000):
        _ = i * i
    print("Finished heavy computation", data)
    return data


async def main():
    # Define a pipeline of tasks using `pyper.task`
    pipeline = task(get_data, branch=True) \
        | task(step1, workers=20) \
        | task(step2, workers=20) \
        | task(step3, workers=20, multiprocess=True)

    # Call the pipeline
    total = 0
    async for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    asyncio.run(main())
```
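For intuition, the composed `pipeline` is roughly equivalent to piping each function's output into the next in plain sequential code. The sketch below uses simplified stand-ins for the task functions (the sleeps and the heavy loop are stubbed out so it runs instantly); it is an analogy, not Pyper's implementation:

```python
import asyncio


# Simplified stand-ins for the task functions above (sleeps and the
# heavy computation are omitted, so this sketch runs instantly).
def get_data(limit: int):
    for i in range(limit):
        yield i


async def step1(data: int):
    return data


def step2(data: int):
    return data


def step3(data: int):
    return data


# Roughly what `task(get_data, branch=True) | task(step1) | task(step2)
# | task(step3)` computes once all concurrency is stripped away: each
# output is piped into the next function, and results are yielded lazily.
async def pipeline(limit: int):
    for data in get_data(limit):
        data = await step1(data)
        data = step2(data)
        data = step3(data)
        yield data


async def main():
    total = 0
    async for output in pipeline(limit=20):
        total += output
    return total


result = asyncio.run(main())
print("Total:", result)  # sum of 0..19 = 190
```

The concurrent version produces the same total; the difference is that Pyper overlaps the waiting time of the worker tasks instead of paying it sequentially.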
Pyper provides an elegant abstraction of the execution of each function via `pyper.task`, allowing you to focus on building out the **logical** functions of your program. In the `main` function:

* `pipeline` defines a function; this takes the parameters of its first task (`get_data`) and yields each output from its last task (`step3`)
* Tasks are piped together using the `|` operator (motivated by Unix's pipe operator) as a syntactic representation of passing inputs and outputs between tasks

In the pipeline, we are executing three different types of work:

* `task(step1, workers=20)` spins up 20 `asyncio.Task`s to handle asynchronous IO-bound work
* `task(step2, workers=20)` spins up 20 threads to handle synchronous IO-bound work
* `task(step3, workers=20, multiprocess=True)` spins up 20 processes to handle synchronous CPU-bound work

`task` acts as one intuitive API for unifying the execution of each different type of function.

Each task submits its outputs to the next task within the pipeline via queue-based data structures; this is the mechanism underpinning how concurrency and parallelism are achieved. See the [docs](https://pyper-dev.github.io/pyper/docs/UserGuide/BasicConcepts) for a breakdown of what a pipeline looks like under the hood.
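The queue-based hand-off between stages can be pictured with a drastically simplified sketch: this illustrates the general queue-and-workers pattern, not Pyper's actual implementation. Workers consume from a source queue, apply the task function, push results to a sink queue, and sentinel values signal shutdown.

```python
import asyncio

# Sentinel object marking the end of a stream.
_DONE = object()


async def stage(source, sink, func, workers):
    """Sketch of one pipeline stage: `workers` concurrent consumers of
    `source`, each applying `func` and feeding `sink`."""
    async def worker():
        # Each worker stops when it pulls a sentinel off the source queue.
        while (item := await source.get()) is not _DONE:
            await sink.put(await func(item))

    await asyncio.gather(*(worker() for _ in range(workers)))
    await sink.put(_DONE)  # tell the downstream consumer this stage is drained


async def double(x):        # hypothetical task function for illustration
    await asyncio.sleep(0)  # stand-in for IO-bound work
    return 2 * x


async def main():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    for i in range(5):
        q1.put_nowait(i)
    for _ in range(3):      # one sentinel per worker
        q1.put_nowait(_DONE)

    stage_task = asyncio.create_task(stage(q1, q2, double, workers=3))

    # Lazily consume outputs as soon as they are available.
    total = 0
    while (out := await q2.get()) is not _DONE:
        total += out
    await stage_task
    return total


result = asyncio.run(main())
print("Sum of doubled values:", result)  # 2 * (0+1+2+3+4) = 20
```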

---

</details>
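Pyper's `task` unifies threaded, multiprocessed, and asynchronous work behind one interface. One way to picture how sync and async callables can be driven through a single entry point is the hypothetical helper below (an illustration of the idea, not Pyper's internals): await coroutine functions directly, and dispatch blocking functions to an executor so they never clog the event loop.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor

# Hypothetical helper illustrating the unification idea (not Pyper's
# internals): run any callable, sync or async, from async code without
# blocking the event loop.
_executor = ThreadPoolExecutor(max_workers=4)


async def run_any(func, *args):
    if inspect.iscoroutinefunction(func):
        return await func(*args)  # native coroutine: just await it
    # Blocking sync function: hand off to a worker thread.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, func, *args)


async def async_inc(x):
    await asyncio.sleep(0)
    return x + 1


def sync_square(x):
    return x * x


async def main():
    a = await run_any(async_inc, 4)    # awaited directly
    b = await run_any(sync_square, a)  # runs in the thread pool
    return b


result = asyncio.run(main())
print(result)  # 25
```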
<details markdown="1">
<summary><u>See a non-async example</u></summary>

<br>

Pyper pipelines are by default non-async, as long as their tasks are defined as synchronous functions. For example:

```python
import time

from pyper import task


def get_data(limit: int):
    for i in range(limit):
        yield i


def step1(data: int):
    time.sleep(1)
    print("Finished sync wait", data)
    return data


def step2(data: int):
    for i in range(10_000_000):
        _ = i * i
    print("Finished heavy computation", data)
    return data


def main():
    pipeline = task(get_data, branch=True) \
        | task(step1, workers=20) \
        | task(step2, workers=20, multiprocess=True)
    total = 0
    for output in pipeline(limit=20):
        total += output
    print("Total:", total)


if __name__ == "__main__":
    main()
```
A pipeline consisting of _at least one asynchronous function_ becomes an `AsyncPipeline`, which exposes the same usage API, with `async` and `await` syntax in the obvious places. This makes it effortless to combine synchronously defined and asynchronously defined functions where need be.

</details>
To explore more of Pyper's features, see some further [examples](https://pyper-d…)

## Dependencies

Pyper is implemented in pure Python, with no sub-dependencies. It is built on top of the well-established built-in Python modules:

* [threading](https://docs.python.org/3/library/threading.html) for thread-based concurrency
* [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) for parallelism
* [asyncio](https://docs.python.org/3/library/asyncio.html) for async-based concurrency
* [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) for unifying threads, processes, and async code

## License