Merge pull request #144 from rkdarst/dev
Integration: #143 (inc. #141, #139), #130, #86, #90
mbmilligan authored Jul 24, 2019
2 parents aec79ac + 7a28923 commit 325aab9
Showing 4 changed files with 136 additions and 53 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -22,6 +22,7 @@ before_install:
install:
- pip install --pre -r jupyterhub/dev-requirements.txt
- pip install --pre -e jupyterhub
- pip install --pre -f travis-wheels/wheelhouse -r requirements.txt

script:
- travis_retry py.test --lf --cov batchspawner batchspawner/tests -v
58 changes: 55 additions & 3 deletions README.md
@@ -19,6 +19,7 @@ This package formerly included WrapSpawner and ProfilesSpawner, which provide me
```python
c = get_config()
c.JupyterHub.spawner_class = 'batchspawner.TorqueSpawner'
import batchspawner # Even though not used, needed to register batchspawner interface
```
3. Depending on the spawner, additional configuration will likely be needed.

@@ -52,6 +53,7 @@ to run Jupyter notebooks on an academic supercomputer cluster.

```python
# Select the Torque backend and increase the timeout since batch jobs may take time to start
import batchspawner
c.JupyterHub.spawner_class = 'batchspawner.TorqueSpawner'
c.Spawner.http_timeout = 120

@@ -117,6 +119,7 @@ clusters, as well as an option to run a local notebook directly on the jupyterhu

```python
# Same initial setup as the previous example
import batchspawner
c.JupyterHub.spawner_class = 'wrapspawner.ProfilesSpawner'
c.Spawner.http_timeout = 120
#------------------------------------------------------------------------------
@@ -152,29 +155,78 @@ clusters, as well as an option to run a local notebook directly on the jupyterhu
```


## Debugging batchspawner

Sometimes it can be hard to debug batchspawner, but it's not really
that hard once you know how the pieces interact. Check the following
places for error messages:

* Check the JupyterHub logs for errors.

* Check the JupyterHub logs for the batch script that got submitted
and the command used to submit it. Are these correct? (Note that
there are submission environment variables too, which aren't
displayed.)

* At this point, it's a matter of checking the batch system. Is the
job ever scheduled? Does it run? Does it succeed? Check the batch
system status and output of the job. The most common failure
patterns are a) the job never starting due to bad scheduler options,
and b) the job waiting in the queue beyond the `start_timeout`,
causing JupyterHub to kill the job.

* At this point the job starts. Does it fail immediately, or before
Jupyter starts? Check the scheduler output files (stdout/stderr of
the job), wherever they are stored. To debug the job script, you can
add debugging commands to the batch script, such as `env` or
`set -x` (a prologue-based sketch follows this list).

* At this point Jupyter itself starts - check its error messages. Is
it starting with the right options? Can it communicate with the
hub? At this point there usually isn't anything
batchspawner-specific, with the one exception below. The error log
would be in the batch script output (same file as above). There may
also be clues in the JupyterHub logfile.
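
A quick way to get more detail at the batch-script stage is to prepend
shell debugging commands to the script. A minimal sketch, assuming the
Slurm spawner (the `req_prologue` option added in this release works
the same way for the other spawners); adapt the spawner class and
values to your site:

```python
# jupyterhub_config.py -- illustrative sketch only
import batchspawner  # registers the batchspawner API with JupyterHub

c.JupyterHub.spawner_class = 'batchspawner.SlurmSpawner'

# Prepended to the generated batch script, before jupyterhub-singleuser runs:
# `set -x` echoes every command and `env` dumps the job environment into the
# scheduler's output file, so a failed start leaves something to inspect.
c.SlurmSpawner.req_prologue = 'set -x\nenv'
```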

Common problems:

* Did you `import batchspawner` in the `jupyterhub_config.py` file?
This is needed in order to activate the batchspawner API in
JupyterHub.



## Changelog

### dev (requires minimum JupyterHub 0.7.2 and Python 3.4)
### dev (requires minimum JupyterHub 0.9 and Python 3.5)

Added (user)

* Add Jinja2 templating as an option for all scripts and commands. If `{{` or `{%` is used anywhere in the string, it is used as a Jinja2 template.
* Add new option `exec_prefix`, which defaults to `sudo -E -u {username}`. This replaces the explicit `sudo` in every batch command - changes in local commands may be needed.
* New option: `req_keepvars_extra`, which allows keeping extra variables in addition to what is defined by JupyterHub itself (addition of variables to keep instead of replacement). #99
* Add `req_prologue` and `req_epilogue` options, inserted before/after the main jupyterhub-singleuser command, which allow for generic setup/cleanup without overriding the entire script. #96 (A combined configuration sketch follows this list.)
* SlurmSpawner: add the `req_reservation` option. #
* SlurmSpawner: add the `req_reservation` option. #91
* Add basic support for JupyterHub progress updates, but this is not used much yet. #86
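
The new user-facing options can be combined in `jupyterhub_config.py` roughly as follows; this is a hedged sketch rather than repository documentation, and the option values are illustrative:

```python
import batchspawner

c.JupyterHub.spawner_class = 'batchspawner.SlurmSpawner'

# Prefix for every batch command; replaces the sudo previously hard-coded in each command
c.SlurmSpawner.exec_prefix = 'sudo -E -u {username}'

# Extra environment variables to keep, in addition to JupyterHub's own list (#99)
c.SlurmSpawner.req_keepvars_extra = 'SCRATCH,PROJECT'

# Setup/cleanup around the jupyterhub-singleuser command without overriding the whole script (#96)
c.SlurmSpawner.req_prologue = 'module load python/3.6'
c.SlurmSpawner.req_epilogue = 'rm -rf $TMPDIR/jupyter-*'

# Slurm reservation to submit into (#91)
c.SlurmSpawner.req_reservation = 'jupyter'
```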

Added (developer)

* Add many more tests.
* Add a new page `SPAWNERS.md` with information on specific spawners. Begin trying to collect a list of spawner-specific contacts. #97
* Rename `current_ip` and `current_port` commands to `ip` and `port`. No user impact. #139
* Update to Python 3.5 `async` / `await` syntax to support JupyterHub progress updates. #90

Changed

* Update minimum requirements to JupyterHub 0.8.1 and Python 3.4.
* PR #58 and #141 change the logic of port selection, so that it is selected *after* the singleuser server starts. This means that the port number has to be conveyed back to JupyterHub. This requires the following changes:
- `jupyterhub_config.py` *must* explicitly import `batchspawner`
- Add a new option `batchspawner_singleuser_cmd`, used as a wrapper in the single-user servers, which conveys the remote port back to JupyterHub. This is now an integral part of the spawn process.
- If you have installed with `pip install -e`, you will have to re-install so that the new script `batchspawner-singleuser` is added to `$PATH`.
* Update minimum requirements to JupyterHub 0.9 and Python 3.5. #143
* Update the Slurm batch script: the single-user notebook is now run in a job step, wrapped with `srun`. If you don't want environment variables limited, the wrapper may need to be removed by setting `req_srun=''` (see the sketch after this list).
* Pass the environment dictionary to the queue and cancel commands as well. This is mostly the user environment, but may be useful to these commands in some cases. #108, #111. If these environment variables were used for authentication as an admin, be aware that there are pre-existing security issues because they may be passed to the user via the batch submit command; see #82.
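
If limiting environment variables via the new `srun` job step is a problem at your site, the wrapper can be dropped; a minimal sketch, assuming SlurmSpawner:

```python
import batchspawner

c.JupyterHub.spawner_class = 'batchspawner.SlurmSpawner'

# Remove the `srun` wrapper from the default Slurm batch script so the
# single-user server inherits the full, unrestricted job environment.
c.SlurmSpawner.req_srun = ''
```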


Fixed

* Improve debugging on failed submission by raising errors including error messages from the commands. #106
129 changes: 79 additions & 50 deletions batchspawner/batchspawner.py
@@ -15,9 +15,12 @@
* remote execution via submission of templated scripts
* job names instead of PIDs
"""
import asyncio
from async_generator import async_generator, yield_, yield_from_
import pwd
import os
import re
import sys

import xml.etree.ElementTree as ET

@@ -156,6 +159,12 @@ def _req_keepvars_default(self):
"Must include {cmd} which will be replaced with the jupyterhub-singleuser command line."
).tag(config=True)

batchspawner_singleuser_cmd = Unicode('batchspawner-singleuser',
help="A wrapper which is capable of special batchspawner setup: currently sets the port on "
"the remote host. Not needed to be set under normal circumstances, unless path needs "
"specification."
).tag(config=True)

# Raw output of job submission command unless overridden
job_id = Unicode()

@@ -181,58 +190,64 @@ def parse_job_id(self, output):
return output

def cmd_formatted_for_batch(self):
return ' '.join(['batchspawner-singleuser'] + self.cmd + self.get_args())
"""The command which is substituted inside of the batch script"""
return ' '.join([self.batchspawner_singleuser_cmd] + self.cmd + self.get_args())

async def run_command(self, cmd, input=None, env=None):
proc = await asyncio.create_subprocess_shell(cmd, env=env,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
inbytes=None

@gen.coroutine
def run_command(self, cmd, input=None, env=None):
proc = Subprocess(cmd, shell=True, env=env, stdin=Subprocess.STREAM, stdout=Subprocess.STREAM,stderr=Subprocess.STREAM)
inbytes = None
if input:
inbytes = input.encode()
try:
yield proc.stdin.write(inbytes)
except StreamClosedError as exp:
# Apparently harmless
pass
proc.stdin.close()
out, eout = yield [proc.stdout.read_until_close(),
proc.stderr.read_until_close()]
proc.stdout.close()
proc.stderr.close()
eout = eout.decode().strip()
inbytes=input.encode()

try:
err = yield proc.wait_for_exit()
except CalledProcessError:
out, eout = await proc.communicate(input=inbytes)
except:
self.log.debug("Exception raised when trying to run command: %s" % command)
proc.kill()
self.log.debug("Running command failed done kill")
out, eout = await proc.communicate()
out = out.decode().strip()
eout = eout.decode().strip()
self.log.error("Subprocess returned exitcode %s" % proc.returncode)
self.log.error('Stdout:')
self.log.error(out)
self.log.error('Stderr:')
self.log.error(eout)
raise RuntimeError('{} exit status {}: {}'.format(cmd, proc.returncode, eout))
if err != 0:
return err # exit error?
else:
out = out.decode().strip()
return out
eout = eout.decode().strip()
err = proc.returncode
if err != 0:
self.log.error("Subprocess returned exitcode %s" % err)
self.log.error(eout)
raise RuntimeError(eout)

@gen.coroutine
def _get_batch_script(self, **subvars):
out = out.decode().strip()
return out

async def _get_batch_script(self, **subvars):
"""Format batch script from vars"""
# Colud be overridden by subclasses, but mainly useful for testing
# Could be overridden by subclasses, but mainly useful for testing
return format_template(self.batch_script, **subvars)

@gen.coroutine
def submit_batch_script(self):
async def submit_batch_script(self):
subvars = self.get_req_subvars()
# `cmd` is submitted to the batch system
cmd = ' '.join((format_template(self.exec_prefix, **subvars),
format_template(self.batch_submit_cmd, **subvars)))
# `subvars['cmd']` is what is run _inside_ the batch script,
# put into the template.
subvars['cmd'] = self.cmd_formatted_for_batch()
if hasattr(self, 'user_options'):
subvars.update(self.user_options)
script = yield self._get_batch_script(**subvars)
script = await self._get_batch_script(**subvars)
self.log.info('Spawner submitting job using ' + cmd)
self.log.info('Spawner submitted script:\n' + script)
out = yield self.run_command(cmd, input=script, env=self.get_env())
out = await self.run_command(cmd, input=script, env=self.get_env())
try:
self.log.info('Job submitted. cmd: ' + cmd + ' output: ' + out)
self.job_id = self.parse_job_id(out)
@@ -247,8 +262,7 @@ def submit_batch_script(self):
"and self.job_id as {job_id}."
).tag(config=True)

@gen.coroutine
def read_job_state(self):
async def read_job_state(self):
if self.job_id is None or len(self.job_id) == 0:
# job not running
self.job_status = ''
@@ -259,7 +273,7 @@ def read_job_state(self):
format_template(self.batch_query_cmd, **subvars)))
self.log.debug('Spawner querying job: ' + cmd)
try:
out = yield self.run_command(cmd, env=self.get_env())
out = await self.run_command(cmd)
self.job_status = out
except Exception as e:
self.log.error('Error querying job ' + self.job_id)
@@ -271,14 +285,13 @@ def read_job_state(self):
help="Command to stop/cancel a previously submitted job. Formatted like batch_query_cmd."
).tag(config=True)

@gen.coroutine
def cancel_batch_job(self):
async def cancel_batch_job(self):
subvars = self.get_req_subvars()
subvars['job_id'] = self.job_id
cmd = ' '.join((format_template(self.exec_prefix, **subvars),
format_template(self.batch_cancel_cmd, **subvars)))
self.log.info('Cancelling job ' + self.job_id + ': ' + cmd)
yield self.run_command(cmd, env=self.get_env())
await self.run_command(cmd)

def load_state(self, state):
"""load job_id from state"""
@@ -317,11 +330,10 @@ def state_gethost(self):
"Return string, hostname or addr of running job, likely by parsing self.job_status"
raise NotImplementedError("Subclass must provide implementation")

@gen.coroutine
def poll(self):
async def poll(self):
"""Poll the process"""
if self.job_id is not None and len(self.job_id) > 0:
yield self.read_job_state()
await self.read_job_state()
if self.state_isrunning() or self.state_ispending():
return None
else:
@@ -337,16 +349,15 @@ def poll(self):
help="Polling interval (seconds) to check job state during startup"
).tag(config=True)

@gen.coroutine
def start(self):
async def start(self):
"""Start the process"""
self.ip = self.traits()['ip'].default_value
self.port = self.traits()['port'].default_value

if jupyterhub.version_info >= (0,8) and self.server:
self.server.port = self.port

job = yield self.submit_batch_script()
job = await self.submit_batch_script()

# We are called with a timeout, and if the timeout expires this function will
# be interrupted at the next yield, and self.stop() will be called.
@@ -355,7 +366,7 @@ def start(self):
if len(self.job_id) == 0:
raise RuntimeError("Jupyter batch job submission failure (no jobid in output)")
while True:
yield self.poll()
await self.poll()
if self.state_isrunning():
break
else:
@@ -367,11 +378,11 @@ def start(self):
raise RuntimeError('The Jupyter batch job has disappeared'
' while pending in the queue or died immediately'
' after starting.')
yield gen.sleep(self.startup_poll_interval)
await gen.sleep(self.startup_poll_interval)

self.ip = self.state_gethost()
while self.port == 0:
yield gen.sleep(self.startup_poll_interval)
await gen.sleep(self.startup_poll_interval)
# Test framework: For testing, mock_port is set because we
# don't actually run the single-user server yet.
if hasattr(self, 'mock_port'):
@@ -388,27 +399,43 @@ def start(self):

return self.ip, self.port

@gen.coroutine
def stop(self, now=False):
async def stop(self, now=False):
"""Stop the singleuser server job.
Returns immediately after sending job cancellation command if now=True, otherwise
tries to confirm that job is no longer running."""

self.log.info("Stopping server job " + self.job_id)
yield self.cancel_batch_job()
await self.cancel_batch_job()
if now:
return
for i in range(10):
yield self.poll()
await self.poll()
if not self.state_isrunning():
return
yield gen.sleep(1.0)
await gen.sleep(1.0)
if self.job_id:
self.log.warn("Notebook server job {0} at {1}:{2} possibly failed to terminate".format(
self.job_id, self.ip, self.port)
)

@async_generator
async def progress(self):
while True:
if self.state_ispending():
await yield_({
"message": "Pending in queue...",
})
elif self.state_isrunning():
await yield_({
"message": "Cluster job running... waiting to connect",
})
return
else:
await yield_({
"message": "Unknown status...",
})
await gen.sleep(1)

class BatchSpawnerRegexStates(BatchSpawnerBase):
"""Subclass of BatchSpawnerBase that uses config-supplied regular expressions
@@ -612,6 +639,8 @@ class SlurmSpawner(UserEnvMixin,BatchSpawnerRegexStates):
def parse_job_id(self, output):
# make sure jobid is really a number
try:
# use only last line to circumvent slurm bug
output = output.splitlines()[-1]
id = output.split(';')[0]
int(id)
except Exception as e:
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,2 +1,3 @@
async_generator>=1.8
jinja2
jupyterhub>=0.5
