Skip to content

Commit 64dd2c0

Browse files
committed
aio.core: Add interactive utils
Signed-off-by: Ryan Northey <[email protected]>
1 parent 816be6a commit 64dd2c0

File tree

11 files changed

+334
-3
lines changed

11 files changed

+334
-3
lines changed

aio.core/aio/core/BUILD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ pytooling_library(
2020
"functional/output.py",
2121
"functional/process.py",
2222
"functional/utils.py",
23+
"interactive/abstract/__init__.py",
24+
"interactive/abstract/interactive.py",
25+
"interactive/exceptions.py",
26+
"interactive/interactive.py",
2327
"output/abstract/__init__.py",
2428
"output/abstract/output.py",
2529
"output/exceptions.py",

aio.core/aio/core/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
from . import (
33
directory,
44
functional,
5+
interactive,
56
output,
7+
pipe,
68
stream,
79
subprocess,
810
tasks)
@@ -11,7 +13,9 @@
1113
__all__ = (
1214
"directory",
1315
"functional",
16+
"interactive",
1417
"output",
18+
"pipe",
1519
"stream",
1620
"subprocess",
1721
"tasks")
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
2+
3+
from .abstract import AInteractive, APrompt
4+
from .interactive import Interactive, interactive, Prompt
5+
from . import exceptions
6+
7+
8+
__all__ = (
9+
"AInteractive",
10+
"APrompt",
11+
"exceptions",
12+
"interactive",
13+
"Interactive",
14+
"Prompt")
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2+
from .interactive import AInteractive, APrompt
3+
4+
5+
__all__ = (
6+
"AInteractive",
7+
"APrompt")
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
2+
import asyncio
3+
import re
4+
import sys
5+
import time
6+
from functools import cached_property, partial
7+
from typing import Union
8+
9+
import psutil
10+
11+
import abstracts
12+
13+
from aio.core import functional, output, subprocess
14+
from aio.core.functional import async_property, AwaitableGenerator
15+
16+
17+
class APrompt(metaclass=abstracts.Abstraction):
18+
19+
def __init__(self, match, match_type="any"):
20+
self._match = match
21+
self.match_type = match
22+
23+
@cached_property
24+
def re_match(self):
25+
return re.compile(self._match)
26+
27+
def matches(self, counter, output):
28+
# print(counter)
29+
if isinstance(self._match, int):
30+
if counter.get("stdout", 0) >= self._match:
31+
return True
32+
return bool(self.re_match.match(str(output)))
33+
34+
35+
class InteractiveProcess:
36+
37+
def __init__(self, cmd, prompt, flush_delay=0, wait_for_prompt=True, start_prompt=None, parallel=False, start_byte=None):
38+
self.cmd = cmd
39+
self._prompt = prompt
40+
self._start_prompt = start_prompt if start_prompt is not None else prompt
41+
self.flush_delay = flush_delay
42+
self.wait_for_prompt = wait_for_prompt
43+
self.start_byte = start_byte
44+
45+
@cached_property
46+
def prompt(self):
47+
return (
48+
self.prompt_class(self._prompt)
49+
if not isinstance(self._prompt, self.prompt_class)
50+
else self._prompt)
51+
52+
@cached_property
53+
def start_prompt(self):
54+
return (
55+
self.prompt_class(self._start_prompt)
56+
if not isinstance(self._start_prompt, self.prompt_class)
57+
else self._start_prompt)
58+
59+
@property
60+
def prompt_class(self):
61+
return APrompt
62+
63+
@cached_property
64+
def buffer(self):
65+
return asyncio.Queue()
66+
67+
@async_property(cache=True)
68+
async def proc(self):
69+
return await asyncio.create_subprocess_shell(
70+
self.cmd,
71+
# shell=True,
72+
# universal_newlines=True,
73+
stdin=asyncio.subprocess.PIPE,
74+
stderr=asyncio.subprocess.PIPE,
75+
stdout=asyncio.subprocess.PIPE)
76+
77+
@cached_property
78+
def q(self):
79+
return asyncio.Queue()
80+
81+
@async_property(cache=True)
82+
async def stdin(self):
83+
return (await self.proc).stdin
84+
85+
@async_property(cache=True)
86+
async def stdout(self):
87+
return (await self.proc).stdout
88+
89+
@async_property(cache=True)
90+
async def stderr(self):
91+
return (await self.proc).stderr
92+
93+
@cached_property
94+
def write_lock(self):
95+
return asyncio.Lock()
96+
97+
async def send_stdin(self, message):
98+
# print(f"SEND STDIN {message}")
99+
async with self.write_lock:
100+
proc = await self.proc
101+
if message is not None:
102+
proc.stdin.write(message)
103+
await proc.stdin.drain()
104+
105+
async def start(self):
106+
proc = await self.proc
107+
asyncio.create_task(self.connect_outputs())
108+
if self._start_prompt != 0:
109+
header = "\n".join(str(h) for h in await self.header)
110+
print(header)
111+
print(f"Process ({self.cmd}) started on cpu {psutil.Process(proc.pid).cpu_num()}")
112+
self._started = True
113+
114+
async def connect_outputs(self):
115+
await self.stdout_listener
116+
await self.stderr_listener
117+
118+
@async_property(cache=True)
119+
async def stderr_listener(self):
120+
return asyncio.create_task(
121+
self.listen_to_pipe(
122+
"stderr",
123+
(await self.proc).stderr))
124+
125+
@async_property(cache=True)
126+
async def stdout_listener(self):
127+
return asyncio.create_task(
128+
self.listen_to_pipe(
129+
"stdout",
130+
(await self.proc).stdout))
131+
132+
async def listen_to_pipe(self, type, pipe):
133+
while True:
134+
result = await pipe.readline()
135+
await self.buffer.put(None)
136+
# If we havent completed writing, wait
137+
# print(f"GOT RESULT: {type} {result}")
138+
async with self.write_lock:
139+
await self.q.put(output.CapturedOutput(type, result))
140+
141+
async def interact(self, message):
142+
await self.send_stdin(message)
143+
counter = dict()
144+
returns = False
145+
while True:
146+
result = await self.q.get()
147+
yield result
148+
counter[result.type] = counter.get(result.type, 0) + 1
149+
await self.buffer.get()
150+
self.buffer.task_done()
151+
self.q.task_done()
152+
if self.interaction_returns(counter, result):
153+
returns = True
154+
if returns and await self.finished_reading:
155+
break
156+
157+
def __call__(self, message=None):
158+
return AwaitableGenerator(self.interact(message))
159+
160+
_started = False
161+
162+
@cached_property
163+
def header(self):
164+
return (
165+
(self(self.start_byte)
166+
if self.start_byte is not None
167+
else self())
168+
if (self.wait_for_prompt
169+
and self.start_prompt != 0)
170+
else [])
171+
172+
def interaction_returns(self, counter, result):
173+
return self.prompt.matches(counter, result)
174+
175+
@async_property
176+
async def finished_reading(self):
177+
if self.buffer.qsize():
178+
return False
179+
if not self.flush_delay:
180+
return True
181+
await asyncio.sleep(self.flush_delay)
182+
return not self.buffer.qsize()
183+
184+
185+
class AInteractive(metaclass=abstracts.Abstraction):
186+
187+
def __init__(self, cmd, prompt, flush_delay=0, wait_for_prompt=True, start_prompt=None, parallel=False, start_byte=None):
188+
self.cmd = cmd
189+
self._prompt = prompt
190+
self._start_prompt = start_prompt if start_prompt is not None else prompt
191+
self.flush_delay = flush_delay
192+
self.wait_for_prompt = wait_for_prompt
193+
self.parallel = parallel
194+
self.start_byte = start_byte
195+
196+
@async_property(cache=True)
197+
async def procs(self):
198+
return [
199+
InteractiveProcess(self.cmd, self._prompt, start_prompt=self._start_prompt, start_byte=self.start_byte)
200+
for x in range(0, self.number_of_procs)]
201+
202+
@property
203+
def number_of_procs(self):
204+
return 4
205+
206+
@cached_property
207+
def free_processor(self):
208+
return asyncio.Queue(maxsize=self.number_of_procs)
209+
210+
async def interact(self, message=None):
211+
# print(f"INTERACT REQUEST {self} ({self.cmd}): {message}")
212+
proc = await self.free_processor.get()
213+
self.free_processor.task_done()
214+
# print(f"INTERACT {self} ({self.cmd}): {message}")
215+
async for result in proc(message):
216+
# print(f"INTERACT RESPONSE {self} ({self.cmd}): {result}")
217+
yield result
218+
await self.free_processor.put(proc)
219+
220+
async def start(self):
221+
for proc in await self.procs:
222+
await proc.start()
223+
await self.free_processor.put(proc)
224+
225+
def __call__(self, message=None):
226+
return AwaitableGenerator(self.interact(message))
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
2+
import contextlib
3+
4+
import abstracts
5+
6+
from aio.core import interactive
7+
8+
9+
@abstracts.implementer(interactive.APrompt)
10+
class Prompt:
11+
pass
12+
13+
14+
@abstracts.implementer(interactive.AInteractive)
15+
class Interactive:
16+
17+
@property
18+
def prompt_class(self):
19+
return Prompt
20+
21+
22+
@contextlib.asynccontextmanager
23+
async def interactive(*args, **kwargs):
24+
interaction = Interactive(*args, **kwargs)
25+
await interaction.start()
26+
yield interaction

aio.core/aio/core/output/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11

2-
from .abstract import ACapturedOutput, ABufferedOutputs, AQueueIO
3-
from .output import BufferedOutputs, CapturedOutput, QueueIO
2+
from .abstract import ACapturedOutput, ACapturedOutputs, ABufferedOutputs, AQueueIO
3+
from .output import BufferedOutputs, CapturedOutput, CapturedOutputs, QueueIO
44
from . import exceptions
55

66

77
__all__ = (
88
"ACapturedOutput",
9+
"ACapturedOutputs",
910
"ABufferedOutputs",
1011
"AQueueIO",
1112
"BufferedOutputs",
1213
"CapturedOutput",
14+
"CapturedOutputs",
1315
"exceptions",
1416
"output",
1517
"QueueIO")
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11

22

3-
from .output import ACapturedOutput, AQueueIO, ABufferedOutputs
3+
from .output import ACapturedOutput, ACapturedOutputs, AQueueIO, ABufferedOutputs
44

55

66
__all__ = (
77
"ACapturedOutput",
8+
"ACapturedOutputs",
89
"AQueueIO",
910
"ABufferedOutputs")

aio.core/aio/core/output/abstract/output.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,52 @@
1111
import abstracts
1212

1313
from aio import core
14+
from aio.core import functional
1415

1516

1617
DEATH_SENTINEL = object()
1718

1819

20+
class ACapturedOutputs(metaclass=abstracts.Abstraction):
21+
"""Wraps a list of captured outputs and allows you to
22+
print them, or filter them base on type."""
23+
24+
def __init__(self, outputs, output_types=None, out_file=None):
25+
self._outputs = outputs
26+
self._output_types = output_types
27+
self.out_file = functional.maybe_coro(out_file or print)
28+
29+
def __getitem__(self, type):
30+
return list(self.output_for(type))
31+
32+
@property
33+
def output(self):
34+
return "\n".join(
35+
f"{result.type}: {str(result)}"
36+
for result in self._outputs)
37+
38+
@cached_property
39+
def output_types(self):
40+
if self._output_types:
41+
return self._output_types
42+
return dict(
43+
stdout=sys.stdout,
44+
stderr=sys.stderr)
45+
46+
async def drain(self, type=None):
47+
types = [type] if type else self.output_types.keys()
48+
for output_type in types:
49+
for output in self[output_type]:
50+
await self.out_file(
51+
output,
52+
file=self.output_types[output_type])
53+
54+
def output_for(self, type):
55+
for result in self._outputs:
56+
if result.type == type:
57+
yield result
58+
59+
1960
class ACapturedOutput(metaclass=abstracts.Abstraction):
2061
"""Captured output of a given type, eg `stdout`, `stderr`"""
2162

0 commit comments

Comments
 (0)