Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: basic coroutine and subprocess support #393

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions pynvim/api/nvim.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,56 @@ def handler():
raise
self._session.threadsafe_call(handler)

if IS_PYTHON3:

def run_coroutine(self, coroutine):
""" Run a coroutine inside a response handler (or setup_cb)"""

return self._session.run_coroutine(coroutine)

def start_subprocess(self, cmd, on_data, on_exit, **args):
coro = self.loop.subprocess_exec(partial(NvimAsyncioProcess, self, on_data, on_exit),
*cmd, **args)
(transport, protocol) = self.run_coroutine(coro)
return transport

if IS_PYTHON3:

import asyncio


class NvimAsyncioProcess(asyncio.SubprocessProtocol):

def __init__(self, session, on_data, on_exit):
self.session = session
self.on_data = on_data
self.on_exit = on_exit

self.call_point = ''.join(format_stack(None, 6)[:-2])

def _callback(self, cb, *args):

def handler():
try:
cb(*args)
except Exception as err:
msg = ("error caught while executing subprocess callback:\n"
"{!r}\n{}\n \nthe process was created at\n{}"
.format(err, format_exc_skip(1), self.call_point))
self.session._err_cb(msg)
raise

self.session._session.threadsafe_call(handler)


def connection_made(self, transport):
pass

def pipe_data_received(self, fd, data):
self._callback(self.on_data, fd, data)

def process_exited(self):
self._callback(self.on_exit)

class Buffers(object):

Expand Down
6 changes: 3 additions & 3 deletions pynvim/msgpack_rpc/event_loop/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def _init(self):
self._queued_data = deque()
self._fact = lambda: self
self._raw_transport = None
if os.name != 'nt':
self._child_watcher = asyncio.get_child_watcher()
self._child_watcher.attach_loop(self._loop)

def _connect_tcp(self, address, port):
coroutine = self._loop.create_connection(self._fact, address, port)
Expand Down Expand Up @@ -118,9 +121,6 @@ def _connect_stdio(self):
debug("native stdout connection successful")

def _connect_child(self, argv):
if os.name != 'nt':
self._child_watcher = asyncio.get_child_watcher()
self._child_watcher.attach_loop(self._loop)
coroutine = self._loop.subprocess_exec(self._fact, *argv)
self._loop.run_until_complete(coroutine)

Expand Down
19 changes: 19 additions & 0 deletions pynvim/msgpack_rpc/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,25 @@ def request(self, method, *args, **kwargs):
raise self.error_wrapper(err)
return rv

def run_coroutine(self, coroutine):
if not self._is_running:
# TODO: can has return value?
return self.loop._loop.run_until_complete(coroutine)
gr = greenlet.getcurrent()
parent = gr.parent

def result_cb(future):
debug('coroutine result is available for greenlet %s, switching back', gr)
gr.switch(future)

task = self.loop._loop.create_task(coroutine)
task.add_done_callback(result_cb)

debug('yielding from greenlet %s to wait for coroutine', gr)
future = parent.switch()
return future.result() # should re-raise any exception


def run(self, request_cb, notification_cb, setup_cb=None):
"""Run the event loop to receive requests and notifications from Nvim.
Expand Down