Skip to content

Commit

Permalink
simplify jupyter notebook API
Browse files Browse the repository at this point in the history
Signed-off-by: Lance-Drane <[email protected]>
  • Loading branch information
Lance-Drane committed Aug 6, 2024
1 parent 62f6d8d commit 106f6dd
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 43 deletions.
33 changes: 21 additions & 12 deletions examples-proposed/004-time-loop/mymodule/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

from ipsframework import Component

NOTEBOOK_1_TEMPLATE = 'base-notebook-iterative.ipynb'
NOTEBOOK_1_NAME = 'full_state_iterative.ipynb'
NOTEBOOK_2_TEMPLATE = 'base-notebook-one-pass.ipynb'
NOTEBOOK_2_NAME = 'full_state_one_pass.ipynb'


class Init(Component):
"""Empty init component."""
Expand All @@ -16,14 +21,19 @@ class Driver(Component):
"""In this example, the driver iterates through the time loop and calls both the worker and the monitor component on each timestep."""

def step(self, timestamp=0.0):
NOTEBOOK_TEMPLATE = 'base-notebook.ipynb'

worker = self.services.get_port('WORKER')
monitor = self.services.get_port('MONITOR')

self.services.call(worker, 'init', 0)
# Needed for notebook template
self.services.stage_input_files(NOTEBOOK_TEMPLATE)
self.services.stage_input_files([NOTEBOOK_1_TEMPLATE, NOTEBOOK_2_TEMPLATE])

# Example of a notebook we want to initialize and then periodically append to during the run
self.services.initialize_jupyter_notebook(
dest_notebook_name=NOTEBOOK_1_NAME, # path is relative to JupyterHub directory
source_notebook_path=NOTEBOOK_1_TEMPLATE, # path is relative to input directory
)
# Initialize second notebook

# The time loop is configured in its own section of sim.conf
# It is shared across all components
Expand All @@ -33,15 +43,12 @@ def step(self, timestamp=0.0):
# TODO - perhaps monitor timestep does not need to be called every step, but only every 20 steps?
self.services.call(monitor, 'step', t)

# create notebook here
NOTEBOOK_NAME = 'full_state.ipynb'
jupyter_state_files = self.services.get_staged_jupyterhub_files()
self.services.stage_jupyter_notebook(
dest_notebook_name=NOTEBOOK_NAME, # path is relative to JupyterHub directory
source_notebook_path='base-notebook.ipynb', # path is relative to input directory
tags=jupyter_state_files,
# With this second "example" notebook, we only create it once and only write to it once.
self.services.initialize_jupyter_notebook(
dest_notebook_name=NOTEBOOK_2_NAME, # path is relative to JupyterHub directory
source_notebook_path=NOTEBOOK_2_TEMPLATE, # path is relative to input directory
initial_data_files=self.services.get_staged_jupyterhub_files(),
)
self.services.portal_register_jupyter_notebook(NOTEBOOK_NAME)

self.services.call(worker, 'finalize', 0)

Expand Down Expand Up @@ -95,7 +102,9 @@ def step(self, timestamp=0.0, **keywords):
data = f.read()

# stage the state file in the JupyterHub directory
self.services.jupyterhub_make_state(state_file, timestamp)
data_file = self.services.jupyterhub_make_state(state_file, timestamp)
print('ADD DATA FILE', data_file)
self.services.add_data_file_to_notebook(NOTEBOOK_1_NAME, data_file)

print('SEND PORTAL DATA', timestamp, data, file=stderr)
self.services.send_portal_data(timestamp, data)
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"# Notebook template, the IPS Framework will add a cell before this one\n",
"# defining FILES as a list of state file paths.\n",
"\n",
"# In this example, this notebook is generated during the time loop.\n",
"\n",
"mapping = {}\n",
"for file in FILES:\n",
" with open(file, 'rb') as f:\n",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "5d75faa3",
"metadata": {},
"outputs": [],
"source": [
"# Notebook template, the IPS Framework will add a cell before this one\n",
"# defining FILES as a list of state file paths.\n",
"\n",
"# In this example, this notebook is only generated at the end of the run.\n",
"\n",
"mapping = {}\n",
"for file in FILES:\n",
" with open(file, 'rb') as f:\n",
" mapping[file] = f.read()\n",
"print(mapping)\n"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
70 changes: 61 additions & 9 deletions ipsframework/jupyter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,88 @@
...in a shell on Jupyter NERSC.
"""

from os.path import sep
from typing import List
from typing import List, Optional

import nbformat as nbf

HOOK = '### This cell autogenerated by IPS Framework. DO NOT EDIT UNTIL IPS RUN IS FINALIZED. ###'
"""This hook is used to determine which "cell" the IPS framework should work with.
It is written to a notebook cell on initializing it, and is searched for when adding a data file to it.
"""


def replace_last(source_string: str, old: str, new: str) -> str:
    """Replace the last occurrence of 'old' with 'new' in 'source_string', searching from the right.

    If 'old' does not occur in 'source_string', the string is returned unchanged.

    Params:
      - source_string: text to search
      - old: substring to look for; must be non-empty (str.rpartition raises ValueError on an empty separator)
      - new: text substituted for the right-most occurrence of 'old'
    Returns:
      - a new string in which only the right-most occurrence of 'old' has been replaced
    """
    head, found, tail = source_string.rpartition(old)
    if not found:
        # On a miss rpartition yields ('', '', source_string); without this guard
        # 'new' would be prepended to the otherwise untouched string.
        return source_string
    return f'{head}{new}{tail}'

def _get_state_file_notebook_code_cell(variable: str, tags: List[str]):
itemsep = ',\n'
return f"""import os

def _initial_jupyter_file_notebook_cell(variable: str, initial_data_files: Optional[List[str]] = None) -> str:
if not initial_data_files:
initial = ''
else:
itemsep = '\n'
initial = '\n' + itemsep.join([f"'{file}'," for file in initial_data_files])
return f"""{HOOK}
import os
# NOTE: directory should be sim_name plus the run id from the Portal
# NOTE: add absolute path as a comment to the notebook cell
# Uncomment below line to use any state files saved
#{variable} = os.listdir('data')
# files created during the run
{variable} = [{itemsep.join([f"'data{sep}{file}'" for file in tags])}]
{variable} = [{initial}
]
"""


def stage_jupyter_notebook(dest: str, src: str, tags: List[str], variable_name: str, index: int):
""""""
def initialize_jupyter_notebook(dest: str, src: str, variable_name: str, index: int, initial_data_files: Optional[List[str]] = None):
"""Create a new notebook from an old notebook, copying the result from 'src' to 'dest'.
Params:
- dest - location of notebook to create on filesystem
- src - location of source notebook on filesystem (is not overwritten unless src == dest)
- variable_name: what to call the variable
- index: insert new cells at position before this value (will not remove preexisting cells)
- initial_data_files: optional list of files to initialize the notebook with
"""
# to avoid conversion, use as_version=nbf.NO_CONVERT
#
nb: nbf.NotebookNode = nbf.read(src, as_version=4)

header = '# Next cell generated by IPS Framework'
nb['cells'] = (
nb['cells'][:index]
+ [nbf.v4.new_markdown_cell(header), nbf.v4.new_code_cell(_get_state_file_notebook_code_cell(variable_name, tags))]
+ [nbf.v4.new_markdown_cell(header), nbf.v4.new_code_cell(_initial_jupyter_file_notebook_cell(variable_name, initial_data_files))]
+ nb['cells'][index:]
)

nbf.validate(nb)
with open(dest, 'w') as f:
nbf.write(nb, f)


def add_data_file_to_notebook(dest: str, data_file: str, index: Optional[int] = None):
    """Add data file to the list defined in a notebook's IPS-generated cell.

    The notebook is read from 'dest', the source of the IPS cell is edited, and the
    notebook is written back to 'dest'.

    Params:
      - dest: path to notebook which will be modified
      - data_file: data file we add to the notebook
      - index: optional index of the IPS notebook cell. If not provided, the first cell
        whose source contains HOOK is used.
    Raises:
      - Exception, if the resolved index is negative (including when no cell contains HOOK),
        or if the selected cell contains no ']' to insert the new entry before
    """
    nb: nbf.NotebookNode = nbf.read(dest, as_version=4)
    if index is None:
        index = next((i for i, e in enumerate(nb['cells']) if HOOK in e['source']), -1)
    if index < 0:
        raise Exception('Cannot find IPS notebook node')
    ips_cell = nb['cells'][index]['source']

    # Fail loudly rather than writing back a cell in which the file was not added to any list.
    if ']' not in ips_cell:
        raise Exception('Cannot find end of file list in IPS notebook node')

    # search from right of string for the ']' character, should work assuming user does not modify the cell past the variable definition
    # repr() quotes and escapes the path, so backslashes or quote characters in it still produce a valid Python string literal
    result = replace_last(ips_cell, ']', f'{str(data_file)!r},\n]')
    nb['cells'][index]['source'] = result

    # nbformat reads notebooks as UTF-8, so write with the same encoding instead of the locale default
    with open(dest, 'w', encoding='utf-8') as f:
        nbf.write(nb, f)
6 changes: 3 additions & 3 deletions ipsframework/portalBridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def send_post_data(conn: Connection, stop: EventType, url: str):
break


def send_put_jupyter_url(conn: Connection, stop: EventType, url: str):
def send_post_jupyter_url(conn: Connection, stop: EventType, url: str):
fail_count = 0

http = urllib3.PoolManager(retries=urllib3.util.Retry(3, backoff_factor=0.25))
Expand All @@ -110,7 +110,7 @@ def send_put_jupyter_url(conn: Connection, stop: EventType, url: str):
# TODO - consider using multipart/form-data instead
try:
resp = http.request(
'PUT',
'POST',
url,
body=json.dumps({'url': next_val['url'], 'portal_runid': next_val['portal_runid']}).encode(),
headers={
Expand Down Expand Up @@ -430,7 +430,7 @@ def send_notebook_url(self, sim_data, event_data):
self.dataurl_parent_conn, child_conn = Pipe()
self.dataurl_childProcessStop = Event()
self.dataurl_childProcess = Process(
target=send_put_jupyter_url, args=(child_conn, self.dataurl_childProcessStop, self.portal_url + '/api/data/add_url')
target=send_post_jupyter_url, args=(child_conn, self.dataurl_childProcessStop, self.portal_url + '/api/data/add_url')
)
self.dataurl_childProcess.start()
self.dataurl_first_event = False
Expand Down
47 changes: 28 additions & 19 deletions ipsframework/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
import logging.handlers
import os
import pathlib
import queue
import shutil
import signal
Expand All @@ -28,7 +29,7 @@
from . import ipsutil, messages
from .cca_es_spec import initialize_event_service
from .ips_es_spec import eventManager
from .jupyter import stage_jupyter_notebook
from .jupyter import add_data_file_to_notebook, initialize_jupyter_notebook
from .taskManager import TaskInit

RunningTask = namedtuple('RunningTask', ['process', 'start_time', 'timeout', 'nproc', 'cores_allocated', 'command', 'binary', 'args'])
Expand Down Expand Up @@ -1842,14 +1843,15 @@ def get_staged_jupyterhub_files(self) -> List[str]:
# TODO generic exception
raise Exception('Unable to initialize base JupyterHub dir')

return os.listdir(os.path.join(self._jupyterhub_dir, 'data'))
data_dir = pathlib.Path(pathlib.Path(self._jupyterhub_dir) / 'data')
return [str(p.resolve()) for p in data_dir.glob('*')]

def jupyterhub_make_state(self, state_file_path: str, timestamp: float) -> str:
"""
Move a state file into the JupyterHub directory.
Returns:
- the path to the state file in the JupyterHub directory
- the path to the state file in the JupyterHub directory. This will be an absolute path.
Raises:
- Exception, if unable to move file to the provided JUPYTERHUB_DIR
Expand Down Expand Up @@ -1885,22 +1887,21 @@ def _get_jupyterhub_url(self) -> Optional[str]:
url += f'ipsframework/runs/{runid}/'
return url

def stage_jupyter_notebook(
def initialize_jupyter_notebook(
self,
dest_notebook_name: str,
source_notebook_path: str,
tags: List[str],
initial_data_files: Optional[List[str]] = None,
variable_name: str = 'FILES',
cell_to_modify: int = 0,
) -> None:
"""Loads a notebook from source_notebook_path, adds a cell to load the data, and then saves it to source_notebook_path.
"""Loads a notebook from source_notebook_path, adds a cell to load the data, and then saves it to source_notebook_path. Will also try to register the notebook with the IPS Portal, if available.
Does not modify the source notebook.
Params:
- dest_notebook_name: name of the JupyterNotebook you want to write (do not include file paths).
- source_notebook_path: location you want to load the source notebook from
- tags: list of state files you want to load in the notebook.
- variable_name: name of the variable you want to load files from (default: "FILES")
- cell_to_modify: which cell in the JupyterNotebook you want to add the data call to (0-indexed).
(This will not overwrite any cells, just appends.)
Expand All @@ -1910,22 +1911,14 @@ def stage_jupyter_notebook(
if not self._init_jupyter():
raise Exception('Unable to initialize base JupyterHub dir')

stage_jupyter_notebook(f'{self._jupyterhub_dir}{dest_notebook_name}', source_notebook_path, tags, variable_name, cell_to_modify)
# adds notebook to JupyterHub
initialize_jupyter_notebook(f'{self._jupyterhub_dir}{dest_notebook_name}', source_notebook_path, variable_name, cell_to_modify, initial_data_files)

def portal_register_jupyter_notebook(self, notebook_name: str) -> None:
"""Associate a JupyterNotebook with tags on the IPS Portal
NOTE: It's best to ONLY run this if you're wanting to associate multiple data files with a single notebook.
If you just want to save a single file, set the appropriate parameter on send_portal_data instead.
Params
- notebook_name: name of the notebook (do not provide any directories, use the config file for this)
- tags: list of tags to associate the notebook with
"""
# register notebook with IPS Portal
url = self._get_jupyterhub_url()
if not url:
return
url += notebook_name
url += dest_notebook_name

event_data = {}
event_data['sim_name'] = self.sim_conf['__PORTAL_SIM_NAME']
Expand All @@ -1938,6 +1931,22 @@ def portal_register_jupyter_notebook(self, notebook_name: str) -> None:
self.publish('_IPS_MONITOR', 'PORTAL_REGISTER_NOTEBOOK', event_data)
self._send_monitor_event('IPS_PORTAL_REGISTER_NOTEBOOK', f'URL = {url}')

def add_data_file_to_notebook(self, notebook_name: str, state_file: str, index: Optional[int] = None):
    """Append one data file to the file list of a notebook the framework already created.

    The notebook is expected to have been created beforehand with initialize_jupyter_notebook.
    Calling this method does not contact the IPS Portal.

    Params:
      - notebook_name: name of the notebook to modify. This path is relative to the JupyterHub directory.
      - state_file: data file to add to the notebook (simple string). This should almost always be the
        return value of "self.services.jupyterhub_make_state".
      - index: optional index of the IPS notebook cell. If omitted, the IPS Framework attempts to find the
        cell it created automatically, which should work whenever the notebook is not modified until after
        the run is complete.
    Raises:
      - Exception, if the base JupyterHub directory cannot be initialized
    """
    # Only attempt initialization when no JupyterHub directory has been set up yet.
    if not self._jupyterhub_dir and not self._init_jupyter():
        raise Exception('Unable to initialize base JupyterHub dir')

    notebook_path = f'{self._jupyterhub_dir}{notebook_name}'
    add_data_file_to_notebook(notebook_path, state_file, index)

def publish(self, topicName, eventName, eventBody):
"""
Publish event consisting of *eventName* and *eventBody* to topic *topicName* to the IPS event service.
Expand Down

0 comments on commit 106f6dd

Please sign in to comment.