diff --git a/.travis.yml b/.travis.yml
index 3db3714a..9b490bce 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,6 +22,7 @@ before_install:
 install:
   - pip install --pre -r jupyterhub/dev-requirements.txt
   - pip install --pre -e jupyterhub
+  - pip install --pre -f travis-wheels/wheelhouse -r requirements.txt
 script:
   - travis_retry py.test --lf --cov batchspawner batchspawner/tests -v
diff --git a/README.md b/README.md
index 22ab2232..4e000149 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,7 @@ This package formerly included WrapSpawner and ProfilesSpawner, which provide me
 ```python
 c = get_config()
 c.JupyterHub.spawner_class = 'batchspawner.TorqueSpawner'
+import batchspawner # Even though not used directly, needed to register the batchspawner interface
 ```
 
 3. Depending on the spawner, additional configuration will likely be needed.
@@ -52,6 +53,7 @@ to run Jupyter notebooks on an academic supercomputer cluster.
 ```python
 # Select the Torque backend and increase the timeout since batch jobs may take time to start
+import batchspawner
 c.JupyterHub.spawner_class = 'batchspawner.TorqueSpawner'
 c.Spawner.http_timeout = 120
@@ -117,6 +119,7 @@ clusters, as well as an option to run a local notebook directly on the jupyterhu
 ```python
 # Same initial setup as the previous example
+import batchspawner
 c.JupyterHub.spawner_class = 'wrapspawner.ProfilesSpawner'
 c.Spawner.http_timeout = 120
 #------------------------------------------------------------------------------
@@ -152,9 +155,50 @@ clusters, as well as an option to run a local notebook directly on the jupyterhu
 ```
 
+## Debugging batchspawner
+
+Sometimes it can be hard to debug batchspawner, but it's not really
+hard once you know how the pieces interact. Check the following
+places for error messages:
+
+* Check the JupyterHub logs for errors.
+
+* Check the JupyterHub logs for the batch script that got submitted
+  and the command used to submit it. Are these correct? (Note that
+  there are submission environment variables too, which aren't
+  displayed.)
+
+* At this point, it's a matter of checking the batch system. Is the
+  job ever scheduled? Does it run? Does it succeed? Check the batch
+  system status and output of the job. The most common failure
+  patterns are a) the job never starting due to bad scheduler
+  options, and b) the job waiting in the queue beyond the
+  `start_timeout`, causing JupyterHub to kill the job.
+
+* At this point the job starts. Does it fail immediately, or only
+  later, before Jupyter starts? Check the scheduler output files
+  (stdout/stderr of the job), wherever they are stored. To debug the
+  job script, you can add debugging to the batch script, such as
+  `env` or `set -x`.
+
+* At this point Jupyter itself starts - check its error messages. Is
+  it starting with the right options? Can it communicate with the
+  hub? At this point there usually isn't anything
+  batchspawner-specific, with the one exception below. The error log
+  would be in the batch script output (same file as above). There may
+  also be clues in the JupyterHub logfile.
+
+Common problems:
+
+* Did you `import batchspawner` in the `jupyterhub_config.py` file?
+  This is needed in order to activate the batchspawner API in
+  JupyterHub.
+
+
 ## Changelog
 
-### dev (requires minimum JupyterHub 0.7.2 and Python 3.4)
+### dev (requires minimum JupyterHub 0.9 and Python 3.5)
 
 Added (user)
@@ -162,19 +206,27 @@ Added (user)
 
 * Add new option exec_prefix, which defaults to `sudo -E -u {username}`.
   This replaces explicit `sudo` in every batch command - changes in
   local commands may be needed.
 * New option: `req_keepvars_extra`, which allows keeping extra variables in addition to what is defined by JupyterHub itself (addition of variables to keep instead of replacement). #99
 * Add `req_prologue` and `req_epilogue` options, which are inserted into the script before/after the main jupyterhub-singleuser command and allow for generic setup/cleanup without overriding the entire script. #96
-* SlurmSpawner: add the `req_reservation` option. #
+* SlurmSpawner: add the `req_reservation` option. #91
+* Add basic support for JupyterHub progress updates, but this is not used much yet. #86
 
 Added (developer)
 
 * Add many more tests.
 * Add a new page `SPAWNERS.md` with information on specific spawners. Begin trying to collect a list of spawner-specific contacts. #97
+* Rename `current_ip` and `current_port` commands to `ip` and `port`. No user impact. #139
+* Update to Python 3.5 `async` / `await` syntax to support JupyterHub progress updates. #90
 
 Changed
 
-* Update minimum requirements to JupyterHub 0.8.1 and Python 3.4.
+* PR #58 and #141 change the logic of port selection, so that it is selected *after* the singleuser server starts. This means that the port number has to be conveyed back to JupyterHub. This requires the following changes:
+  - `jupyterhub_config.py` *must* explicitly import `batchspawner`
+  - Add a new option `batchspawner_singleuser_cmd`, a wrapper for the single-user server which conveys the remote port back to JupyterHub. This is now an integral part of the spawn process.
+  - If you have installed with `pip install -e`, you will have to re-install so that the new script `batchspawner-singleuser` is added to `$PATH`.
+* Update minimum requirements to JupyterHub 0.9 and Python 3.5. #143
 * Update Slurm batch script. Now, the single-user notebook is run in a job step, with a wrapper of `srun`. The `srun` wrapper may need to be removed using `req_srun=''` if you don't want environment variables limited.
 * Pass the environment dictionary to the queue and cancel commands as well. This is mostly the user environment, but may be useful to these commands in some cases. #108, #111 If these environment variables were used for authentication as an admin, be aware that there are pre-existing security issues because they may be passed to the user via the batch submit command, see #82.
+
 Fixed
 
 * Improve debugging on failed submission by raising errors including error messages from the commands. #106
diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py
index 5185d618..b79dd05c 100644
--- a/batchspawner/batchspawner.py
+++ b/batchspawner/batchspawner.py
@@ -15,9 +15,12 @@
 * remote execution via submission of templated scripts
 * job names instead of PIDs
 """
+import asyncio
+from async_generator import async_generator, yield_, yield_from_
 import pwd
 import os
 import re
+import sys
 
 import xml.etree.ElementTree as ET
@@ -156,6 +159,12 @@ def _req_keepvars_default(self):
         "Must include {cmd} which will be replaced with the jupyterhub-singleuser command line."
         ).tag(config=True)
 
+    batchspawner_singleuser_cmd = Unicode('batchspawner-singleuser',
+        help="A wrapper which is capable of special batchspawner setup: currently sets the port on "
+             "the remote host. Not needed to be set under normal circumstances, unless the path "
+             "needs to be specified."
+        ).tag(config=True)
+
     # Raw output of job submission command unless overridden
     job_id = Unicode()
@@ -181,58 +190,64 @@ def parse_job_id(self, output):
         return output
 
     def cmd_formatted_for_batch(self):
-        return ' '.join(['batchspawner-singleuser'] + self.cmd + self.get_args())
+        """The command which is substituted inside of the batch script"""
+        return ' '.join([self.batchspawner_singleuser_cmd] + self.cmd + self.get_args())
+
+    async def run_command(self, cmd, input=None, env=None):
+        proc = await asyncio.create_subprocess_shell(cmd, env=env,
+                                                     stdin=asyncio.subprocess.PIPE,
+                                                     stdout=asyncio.subprocess.PIPE,
+                                                     stderr=asyncio.subprocess.PIPE)
+        inbytes = None
-    @gen.coroutine
-    def run_command(self, cmd, input=None, env=None):
-        proc = Subprocess(cmd, shell=True, env=env, stdin=Subprocess.STREAM, stdout=Subprocess.STREAM,stderr=Subprocess.STREAM)
-        inbytes = None
         if input:
-            inbytes = input.encode()
-        try:
-            yield proc.stdin.write(inbytes)
-        except StreamClosedError as exp:
-            # Apparently harmless
-            pass
-        proc.stdin.close()
-        out, eout = yield [proc.stdout.read_until_close(),
-                           proc.stderr.read_until_close()]
-        proc.stdout.close()
-        proc.stderr.close()
-        eout = eout.decode().strip()
+            inbytes = input.encode()
+
         try:
-            err = yield proc.wait_for_exit()
-        except CalledProcessError:
+            out, eout = await proc.communicate(input=inbytes)
+        except:
+            self.log.debug("Exception raised when trying to run command: %s" % cmd)
+            proc.kill()
+            self.log.debug("Running command failed; process killed.")
+            out, eout = await proc.communicate()
+            out = out.decode().strip()
+            eout = eout.decode().strip()
             self.log.error("Subprocess returned exitcode %s" % proc.returncode)
             self.log.error('Stdout:')
             self.log.error(out)
             self.log.error('Stderr:')
             self.log.error(eout)
             raise RuntimeError('{} exit status {}: {}'.format(cmd, proc.returncode, eout))
-        if err != 0:
-            return err # exit error?
         else:
-            out = out.decode().strip()
-            return out
+            eout = eout.decode().strip()
+            err = proc.returncode
+            if err != 0:
+                self.log.error("Subprocess returned exitcode %s" % err)
+                self.log.error(eout)
+                raise RuntimeError(eout)
 
-    @gen.coroutine
-    def _get_batch_script(self, **subvars):
+        out = out.decode().strip()
+        return out
+
+    async def _get_batch_script(self, **subvars):
         """Format batch script from vars"""
-        # Colud be overridden by subclasses, but mainly useful for testing
+        # Could be overridden by subclasses, but mainly useful for testing
         return format_template(self.batch_script, **subvars)
 
-    @gen.coroutine
-    def submit_batch_script(self):
+    async def submit_batch_script(self):
         subvars = self.get_req_subvars()
+        # `cmd` is submitted to the batch system
         cmd = ' '.join((format_template(self.exec_prefix, **subvars),
                         format_template(self.batch_submit_cmd, **subvars)))
+        # `subvars['cmd']` is what is run _inside_ the batch script,
+        # put into the template.
         subvars['cmd'] = self.cmd_formatted_for_batch()
         if hasattr(self, 'user_options'):
             subvars.update(self.user_options)
-        script = yield self._get_batch_script(**subvars)
+        script = await self._get_batch_script(**subvars)
         self.log.info('Spawner submitting job using ' + cmd)
         self.log.info('Spawner submitted script:\n' + script)
-        out = yield self.run_command(cmd, input=script, env=self.get_env())
+        out = await self.run_command(cmd, input=script, env=self.get_env())
         try:
             self.log.info('Job submitted. cmd: ' + cmd + ' output: ' + out)
             self.job_id = self.parse_job_id(out)
@@ -247,8 +262,7 @@ def submit_batch_script(self):
         "and self.job_id as {job_id}."
         ).tag(config=True)
 
-    @gen.coroutine
-    def read_job_state(self):
+    async def read_job_state(self):
         if self.job_id is None or len(self.job_id) == 0:
             # job not running
             self.job_status = ''
@@ -259,7 +273,7 @@
                         format_template(self.batch_query_cmd, **subvars)))
         self.log.debug('Spawner querying job: ' + cmd)
         try:
-            out = yield self.run_command(cmd, env=self.get_env())
+            out = await self.run_command(cmd)
             self.job_status = out
         except Exception as e:
             self.log.error('Error querying job ' + self.job_id)
@@ -271,14 +285,13 @@
         help="Command to stop/cancel a previously submitted job. Formatted like batch_query_cmd."
         ).tag(config=True)
 
-    @gen.coroutine
-    def cancel_batch_job(self):
+    async def cancel_batch_job(self):
         subvars = self.get_req_subvars()
         subvars['job_id'] = self.job_id
         cmd = ' '.join((format_template(self.exec_prefix, **subvars),
                         format_template(self.batch_cancel_cmd, **subvars)))
         self.log.info('Cancelling job ' + self.job_id + ': ' + cmd)
-        yield self.run_command(cmd, env=self.get_env())
+        await self.run_command(cmd)
 
     def load_state(self, state):
         """load job_id from state"""
@@ -317,11 +330,10 @@ def state_gethost(self):
         "Return string, hostname or addr of running job, likely by parsing self.job_status"
         raise NotImplementedError("Subclass must provide implementation")
 
-    @gen.coroutine
-    def poll(self):
+    async def poll(self):
         """Poll the process"""
         if self.job_id is not None and len(self.job_id) > 0:
-            yield self.read_job_state()
+            await self.read_job_state()
             if self.state_isrunning() or self.state_ispending():
                 return None
             else:
@@ -337,8 +349,7 @@ def poll(self):
         help="Polling interval (seconds) to check job state during startup"
         ).tag(config=True)
 
-    @gen.coroutine
-    def start(self):
+    async def start(self):
         """Start the process"""
         self.ip = self.traits()['ip'].default_value
         self.port = self.traits()['port'].default_value
@@ -346,7 +357,7 @@
         if jupyterhub.version_info >= (0,8) and self.server:
             self.server.port = self.port
 
-        job = yield self.submit_batch_script()
+        job = await self.submit_batch_script()
 
         # We are called with a timeout, and if the timeout expires this function will
         # be interrupted at the next yield, and self.stop() will be called.
@@ -355,7 +366,7 @@
         if len(self.job_id) == 0:
             raise RuntimeError("Jupyter batch job submission failure (no jobid in output)")
         while True:
-            yield self.poll()
+            await self.poll()
             if self.state_isrunning():
                 break
             else:
@@ -367,11 +378,11 @@
                 raise RuntimeError('The Jupyter batch job has disappeared'
                                    ' while pending in the queue or died immediately'
                                    ' after starting.')
-            yield gen.sleep(self.startup_poll_interval)
+            await gen.sleep(self.startup_poll_interval)
 
         self.ip = self.state_gethost()
         while self.port == 0:
-            yield gen.sleep(self.startup_poll_interval)
+            await gen.sleep(self.startup_poll_interval)
             # Test framework: For testing, mock_port is set because we
             # don't actually run the single-user server yet.
             if hasattr(self, 'mock_port'):
@@ -388,27 +399,43 @@
         return self.ip, self.port
 
-    @gen.coroutine
-    def stop(self, now=False):
+    async def stop(self, now=False):
         """Stop the singleuser server job.
 
         Returns immediately after sending job cancellation command if now=True, otherwise
         tries to confirm that job is no longer running."""
 
         self.log.info("Stopping server job " + self.job_id)
-        yield self.cancel_batch_job()
+        await self.cancel_batch_job()
         if now:
             return
         for i in range(10):
-            yield self.poll()
+            await self.poll()
             if not self.state_isrunning():
                 return
-            yield gen.sleep(1.0)
+            await gen.sleep(1.0)
         if self.job_id:
             self.log.warn("Notebook server job {0} at {1}:{2} possibly failed to terminate".format(
                 self.job_id, self.ip, self.port)
                 )
 
+    @async_generator
+    async def progress(self):
+        while True:
+            if self.state_ispending():
+                await yield_({
+                    "message": "Pending in queue...",
+                })
+            elif self.state_isrunning():
+                await yield_({
+                    "message": "Cluster job running... waiting to connect",
+                })
+                return
+            else:
+                await yield_({
+                    "message": "Unknown status...",
+                })
+            await gen.sleep(1)
 
 class BatchSpawnerRegexStates(BatchSpawnerBase):
     """Subclass of BatchSpawnerBase that uses config-supplied regular expressions
@@ -612,6 +639,8 @@ class SlurmSpawner(UserEnvMixin,BatchSpawnerRegexStates):
     def parse_job_id(self, output):
         # make sure jobid is really a number
        try:
+            # use only the last line to circumvent a Slurm bug
+            output = output.splitlines()[-1]
             id = output.split(';')[0]
             int(id)
         except Exception as e:
diff --git a/requirements.txt b/requirements.txt
index ee0a79e1..55ac7a89 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
+async_generator>=1.8
 jinja2
 jupyterhub>=0.5
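
The rewritten `run_command` in this patch replaces tornado's `Subprocess` streams with the standard `asyncio` subprocess idiom. For readers unfamiliar with it, here is a minimal, self-contained sketch of the same idiom outside the spawner class (the `run` helper and the `echo` command are illustrative only):

```python
import asyncio

async def run(cmd, input=None):
    # Spawn cmd through the shell with pipes on all three streams,
    # as the new BatchSpawnerBase.run_command does.
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # communicate() feeds stdin (if given), closes it, and gathers all
    # output, replacing the manual stream handling of the old version.
    out, err = await proc.communicate(input=input.encode() if input else None)
    if proc.returncode != 0:
        raise RuntimeError('{} exit status {}: {}'.format(
            cmd, proc.returncode, err.decode().strip()))
    return out.decode().strip()

print(asyncio.get_event_loop().run_until_complete(run('echo hello')))
```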
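
The new `progress` method uses the `async_generator` package because Python 3.5 (the new minimum) supports `async`/`await` but not native async generators. A small stand-alone example of the same `@async_generator`/`yield_` pattern (the `countdown` function is made up for illustration):

```python
import asyncio
from async_generator import async_generator, yield_

@async_generator
async def countdown(n):
    # Yield one event dict per tick, the same shape progress() yields
    # while a job is pending in the queue.
    for i in range(n, 0, -1):
        await yield_({"message": "%d..." % i})
        await asyncio.sleep(1)

async def main():
    # The consumer iterates with async for, as JupyterHub does for progress().
    async for event in countdown(3):
        print(event["message"])

asyncio.get_event_loop().run_until_complete(main())
```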
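
Finally, putting the user-facing changes together: a minimal `jupyterhub_config.py` matching this patch might look like the sketch below. The spawner class and timeout are illustrative; the commented-out `batchspawner_singleuser_cmd` override is only relevant if `batchspawner-singleuser` is not on `$PATH` on the execute host, and the path shown is hypothetical.

```python
# jupyterhub_config.py -- a sketch, not a prescriptive configuration
import batchspawner  # not used directly, but registers the batchspawner API with JupyterHub

c = get_config()
c.JupyterHub.spawner_class = 'batchspawner.TorqueSpawner'
c.Spawner.http_timeout = 120  # batch jobs may take a while to start

# Hypothetical override, only if batchspawner-singleuser is not on $PATH:
# c.BatchSpawnerBase.batchspawner_singleuser_cmd = '/opt/jupyterhub/bin/batchspawner-singleuser'
```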