Skip to content

Commit

Permalink
Merge pull request #171 from rosswhitfield/child_runs
Browse files Browse the repository at this point in the history
Add abillity to set parent/child relation between different runs for portal.
  • Loading branch information
rosswhitfield authored Sep 1, 2022
2 parents b328a33 + 8387541 commit 5d5cb8a
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 22 deletions.
Binary file added doc/user_guides/child_runs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/user_guides/child_runs_trace.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
42 changes: 41 additions & 1 deletion doc/user_guides/portal_guides.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ in either your :doc:`Platform Configuration File<platform>` or your
Tracing
-------

IPS (version >= 0.6.0) has the ability to capture a trace of the
.. note::

New in IPS-Framework 0.6.0

IPS has the ability to capture a trace of the
workflow to allow analysis and visualizations. The traces are captured
in the `Zipkin Span format <https://zipkin.io/zipkin-api/>`_ and
viewed within IPS portal using `Jaeger
Expand All @@ -60,3 +64,39 @@ The statistics can be further broken down by operation.
.. note::

Self time (ST) is the total time spent in a span when it was not waiting on children. For example, a 10ms span with two 4ms non-overlapping children would have self-time = 10ms - 2 * 4ms = 2ms.


Child Runs
----------

.. note::

New in IPS-Framework 0.7.0

If you have a workflow where you are running ``ips`` as a task of
another IPS simulation you can create a relation between them that
will allow it to be viewed together in the IPS-portal and get a single
trace for the entire collection.

To setup the hierarchical structure between different IPS runs, so if
one run starts other runs as a separate simulation, you can set the
``PARENT_PORTAL_RUNID`` parameter in the child simulation
configuration. This can be done dynamically from the parent simulation
like:

.. code-block:: python
child_conf['PARENT_PORTAL_RUNID'] = self.services.get_config_param("PORTAL_RUNID")
This is automatically configured when running
``ips_dakota_dynamic.py``.

The child runs will not appear on the main runs list but will appear
on a tab next to the events.

.. image:: child_runs.png

The trace of the primary simulation will contain the traces from all
the simulations:

.. image:: child_runs_trace.png
14 changes: 8 additions & 6 deletions ipsframework/configurationManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import uuid
import logging
import socket
import time
from multiprocessing import Queue, Process, set_start_method
from .configobj import ConfigObj
from . import ipsLogging
Expand Down Expand Up @@ -40,7 +41,8 @@ class SimulationData:
entry in the configurationManager class
"""

def __init__(self, sim_name):
def __init__(self, sim_name, start_time=time.time()):
self.start_time = start_time
self.sim_name = sim_name
self.portal_sim_name = None
self.sim_root = None
Expand Down Expand Up @@ -281,7 +283,7 @@ def initialize(self, data_mgr, resource_mgr, task_mgr):
sim_name_list.append(sim_name)
sim_root_list.append(sim_root)
log_file_list.append(log_file)
new_sim = self.SimulationData(sim_name)
new_sim = self.SimulationData(sim_name, self.fwk.start_time)
conf['__PORTAL_SIM_NAME'] = sim_name
new_sim.sim_conf = conf
new_sim.config_file = conf_file
Expand All @@ -301,7 +303,7 @@ def initialize(self, data_mgr, resource_mgr, task_mgr):
if not self.fwk_sim_name:
fwk_sim_conf = conf.dict()
fwk_sim_conf['SIM_NAME'] = '_'.join([conf['SIM_NAME'], 'FWK'])
fwk_sim = self.SimulationData(fwk_sim_conf['SIM_NAME'])
fwk_sim = self.SimulationData(fwk_sim_conf['SIM_NAME'], self.fwk.start_time)
fwk_sim.sim_conf = fwk_sim_conf
fwk_sim.sim_root = new_sim.sim_root
fwk_sim.log_file = self.fwk.log_file # sys.stdout
Expand Down Expand Up @@ -380,6 +382,7 @@ def _initialize_fwk_components(self):
portal_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER']
except KeyError:
portal_conf['USER'] = self.platform_conf['USER']
portal_conf['HOST'] = self.platform_conf['HOST']
if self.fwk.log_level == logging.DEBUG:
portal_conf['LOG_LEVEL'] = 'DEBUG'

Expand Down Expand Up @@ -502,7 +505,6 @@ def _create_component(self, comp_conf, sim_data):

# SIMYAN: removed else conditional, copying files in runspaceInit
# component now

svc_response_q = Queue(0)
invocation_q = Queue(0)
component_id = ComponentID(class_name, sim_name)
Expand All @@ -512,7 +514,7 @@ def _create_component(self, comp_conf, sim_data):
services_proxy = ServicesProxy(self.fwk, fwk_inq, svc_response_q,
sim_data.sim_conf, log_pipe_name)
new_component = component_class(services_proxy, comp_conf)
new_component.__initialize__(component_id, invocation_q, self.fwk.start_time)
new_component.__initialize__(component_id, invocation_q, sim_data.start_time)
services_proxy.__initialize__(new_component)
self.comp_registry.addEntry(component_id, svc_response_q,
invocation_q, new_component,
Expand Down Expand Up @@ -643,7 +645,7 @@ def create_simulation(self, sim_name, config_file, override, sub_workflow=False)
self.sim_name_list.append(sim_name)
self.sim_root_list.append(sim_root)
self.log_file_list.append(log_file)
new_sim = self.SimulationData(sim_name)
new_sim = self.SimulationData(sim_name, start_time=self.fwk.start_time if sub_workflow else time.time())
new_sim.sim_conf = conf
new_sim.config_file = config_file
new_sim.sim_root = sim_root
Expand Down
1 change: 1 addition & 0 deletions ipsframework/dakota_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def step(self, timestamp=0, **keywords): # pragma: no cover
self.old_master_conf['SIM_NAME'] = self.sim_name + '_%s' % (instance_id)
self.old_master_conf['LOG_FILE'] = self.sim_logfile + '_%s' % (instance_id)
self.old_master_conf['OUT_REDIRECT'] = 'TRUE'
self.old_master_conf['PARENT_PORTAL_RUNID'] = services.get_config_param("PORTAL_RUNID")
fname = "%s.out" % (self.old_master_conf['SIM_NAME'])
fname = os.path.join(self.sim_root, fname)
self.old_master_conf['OUT_REDIRECT_FNAME'] = fname
Expand Down
22 changes: 14 additions & 8 deletions ipsframework/ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,15 +603,15 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta
portal_data['eventtype'] = eventType
portal_data['ok'] = ok
portal_data['comment'] = comment
portal_data['walltime'] = '%.2f' % (event_time - self.start_time)
portal_data['walltime'] = '%.2f' % (event_time - self.config_manager.sim_map[sim_name].start_time)
portal_data['time'] = getTimeString(time.localtime(event_time))

topic_name = '_IPS_MONITOR'
# portal_data['phystimestamp'] = self.timeStamp
get_config = self.config_manager.get_config_parameter
if eventType == 'IPS_START':
portal_data['state'] = 'Running'
portal_data['host'] = get_config(sim_name, 'HOST')
portal_data['host'] = self.config_manager.get_platform_parameter('HOST')
try:
portal_data['outputprefix'] = get_config(sim_name, 'OUTPUT_PREFIX')
except KeyError:
Expand Down Expand Up @@ -647,18 +647,24 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta
portal_data['sim_runid'] = get_config(sim_name, 'RUN_ID')
except KeyError:
pass
portal_data['startat'] = getTimeString(time.localtime(self.start_time))
portal_data['startat'] = getTimeString(time.localtime(self.config_manager.sim_map[sim_name].start_time))
portal_data['ips_version'] = get_versions()['version']

try:
portal_data['parent_portal_runid'] = get_config(sim_name, 'PARENT_PORTAL_RUNID')
except KeyError:
pass

elif eventType == 'IPS_END':
portal_data['state'] = 'Completed'
portal_data['stopat'] = getTimeString(time.localtime(event_time))
# Zipkin json format
portal_data['trace'] = {"timestamp": int(self.start_time*1e6),
"duration": int((event_time - self.start_time)*1e6),
portal_data['trace'] = {"timestamp": int(self.config_manager.sim_map[sim_name].start_time*1e6),
"duration": int((event_time - self.config_manager.sim_map[sim_name].start_time)*1e6),
"localEndpoint": {
"serviceName": str(self.component_id)
"serviceName": f'{sim_name}@{self.component_id}'
},
"id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16],
"id": hashlib.md5(f'{sim_name}@{self.component_id}'.encode()).hexdigest()[:16],
'tags': {'total_cores': str(self.resource_manager.total_cores)}}
elif eventType == "IPS_CALL_END":
trace = {} # Zipkin json format
Expand All @@ -669,7 +675,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta
trace['localEndpoint'] = {"serviceName": target}
trace['name'] = operation
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
trace["parentId"] = hashlib.md5(str(self.component_id).encode()).hexdigest()[:16]
trace["parentId"] = hashlib.md5(f'{sim_name}@{self.component_id}'.encode()).hexdigest()[:16]

if trace:
portal_data['trace'] = trace
Expand Down
7 changes: 2 additions & 5 deletions ipsframework/portalBridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ def __init__(self, services, config):
:py:class:`component.Component` object.
"""
super().__init__(services, config)
self.host = ''
self.curTime = time.localtime()
self.startTime = self.curTime
self.sim_map = {}
Expand Down Expand Up @@ -109,7 +108,6 @@ def init(self, timestamp=0.0, **keywords):
self.portal_url = self.PORTAL_URL
except AttributeError:
pass
self.host = self.services.get_config_param('HOST')
self.services.subscribe('_IPS_MONITOR', "process_event")
try:
freq = int(self.services.get_config_param("HTML_DUMP_FREQ", silent=True))
Expand Down Expand Up @@ -427,7 +425,7 @@ def init_simulation(self, sim_name, sim_root):

d = datetime.datetime.now()
date_str = "%s.%03d" % (d.strftime("%Y-%m-%dT%H:%M:%S"), int(d.microsecond / 1000))
sim_data.portal_runid = "_".join([self.host, "USER", date_str])
sim_data.portal_runid = "_".join([sim_name, getattr(self, "HOST"), getattr(self, "USER"), date_str])
try:
self.services.set_config_param('PORTAL_RUNID', sim_data.portal_runid,
target_sim_name=sim_name)
Expand All @@ -445,8 +443,7 @@ def init_simulation(self, sim_name, sim_root):
(sim_log_dir, oserr.errno, oserr.strerror))
raise

sim_data.monitor_file_name = os.path.join(sim_log_dir,
sim_data.sim_name + '_' + sim_data.portal_runid + '.eventlog')
sim_data.monitor_file_name = os.path.join(sim_log_dir, sim_data.portal_runid + '.eventlog')
try:
sim_data.monitor_file = open(sim_data.monitor_file_name, 'wb', 0)
except IOError as oserr:
Expand Down
2 changes: 1 addition & 1 deletion tests/dakota/dakota_test_Gaussian.ips
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
RUN_ID = DAKOTA_Rosenbrock # Identifier for this simulation run
RUN_ID = DAKOTA_Gaussian # Identifier for this simulation run
TOKAMAK_ID = TEST
SHOT_NUMBER = 1 # Numerical identifier for specific case

Expand Down
39 changes: 39 additions & 0 deletions tests/dakota/test_dakota.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shutil
import glob
import sys
import json
import pytest
from ipsframework import ips_dakota_dynamic

Expand Down Expand Up @@ -48,6 +49,44 @@ def test_dakota(tmpdir):

assert float(X) == pytest.approx(0.5, rel=1e-4)

# Check PARENT CHILD relationship
# Get parent PORTAL_RUNID
json_files = glob.glob(str(tmpdir.join("DAKOTA_Gaussian_TEST_1").join("simulation_log").join("*.json")))
assert len(json_files) == 1

with open(json_files[0], 'r') as json_file:
lines = json_file.readlines()

lines = [json.loads(line.strip()) for line in lines]
assert len(lines) == 9

# get portal_runid event
portal_runid = lines[0]['portal_runid']
sim_name, host, user, _ = portal_runid.rsplit('_', maxsplit=3)
assert sim_name == "DAKOTA_Gaussian_TEST_1"
assert host == "workstation"
assert user == "user"

# Check child run
json_files = glob.glob(str(tmpdir.join("DAKOTA_Gaussian_TEST_1").join("simulation_*_0000").join("simulation_log").join("*.json")))
assert len(json_files) == 1
with open(json_files[0], 'r') as json_file:
lines = json_file.readlines()

lines = [json.loads(line.strip()) for line in lines]
assert len(lines) == 8
child_portal_runid = lines[0]['portal_runid']
assert child_portal_runid != portal_runid

sim_name, host, user, _ = child_portal_runid.rsplit('_', maxsplit=3)
assert sim_name.startswith("DAKOTA_Gaussian_TEST_1")
assert len(sim_name) > len("DAKOTA_Gaussian_TEST_1")
assert host == "workstation"
assert user == "user"

parent_portal_runid = lines[0]['parent_portal_runid']
assert parent_portal_runid == portal_runid


@mock.patch('ipsframework.ips_dakota_dynamic.DakotaDynamic')
def test_dakota_main(MockDakotaDynamic):
Expand Down
3 changes: 2 additions & 1 deletion tests/dakota/workstation.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
HOST = my_laptop
HOST = workstation
USER = user
MPIRUN = eval

#######################################
Expand Down

0 comments on commit 5d5cb8a

Please sign in to comment.