Skip to content

Commit

Permalink
Merge pull request #155 from rosswhitfield/profile_logging
Browse files (browse the repository at this point in the history)
Add trace information to the monitor events
  • Loading branch information
rosswhitfield authored Mar 4, 2022
2 parents 635a5f3 + 1d96683 commit 8fb6efa
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 137 deletions.
6 changes: 3 additions & 3 deletions doc/examples/dask/simulation_log.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.83",
"comment": "task_name = method, elasped time = 0.50s",
"comment": "task_name = method, elapsed time = 0.50s",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.83",
"comment": "task_name = function, elasped time = 0.50s",
"comment": "task_name = function, elapsed time = 0.50s",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.85",
"state": "Running",
"comment": "task_name = binary, elasped time = 0.52s",
"comment": "task_name = binary, elapsed time = 0.52s",
}
14 changes: 8 additions & 6 deletions ipsframework/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def __init__(self, services, config):
self.config = config
self.start_time = 0.0
self.sys_exit = None
self.method_name = None
self.args = None
for i in config.keys():
try:
setattr(self, i, config[i])
Expand Down Expand Up @@ -121,19 +123,19 @@ def __run__(self):
self.services.log('Received Message ')
sender_id = msg.sender_id
call_id = msg.call_id
method_name = msg.target_method
args = msg.args
self.method_name = msg.target_method
self.args = msg.args
keywords = msg.keywords
formatted_args = ['%.3f' % (x) if isinstance(x, float)
else str(x) for x in args]
else str(x) for x in self.args]
if keywords:
formatted_args += [" %s=" % k + str(v) for (k, v) in keywords.items()]

self.services.debug('Calling method ' + method_name +
self.services.debug('Calling method ' + self.method_name +
"(" + ' ,'.join(formatted_args) + ")")
try:
method = getattr(self, method_name)
retval = method(*args, **keywords)
method = getattr(self, self.method_name)
retval = method(*self.args, **keywords)
except Exception as e:
self.services.exception('Uncaught Exception in component method.')
response_msg = MethodResultMessage(self.component_id,
Expand Down
62 changes: 50 additions & 12 deletions ipsframework/ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import logging
import os
import time
import hashlib
from ipsframework import platformspec
from ipsframework.messages import Message, ServiceRequestMessage, \
ServiceResponseMessage, MethodInvokeMessage
Expand Down Expand Up @@ -181,7 +182,7 @@ def __init__(self, config_file_list, log_file_name, platform_file_name=None,
logger.addHandler(self.ch)
self.logger = logger
self.verbose_debug = verbose_debug
self.outstanding_calls_list = []
self.outstanding_calls_list = {}
self.call_queue_map = {}

# add the handler to the root logger
Expand Down Expand Up @@ -421,7 +422,7 @@ def run(self):
for method in ['step', 'finalize']:
req_msg = ServiceRequestMessage(self.component_id, self.component_id,
comp_id, 'init_call', method, 0)
msg_list.append(req_msg)
msg_list.append((req_msg, None, str(comp_id), method, 0))

outstanding_sim_calls[str(comp_id)] = msg_list

Expand All @@ -447,7 +448,7 @@ def run(self):
self.component_id,
comp_id,
'init_call', method, 0)
msg_list.append(req_msg)
msg_list.append((req_msg, sim_name, str(comp_id), method, 0))
# SIMYAN: add the msg_list to the outstanding sim calls
if msg_list:
outstanding_sim_calls[sim_name] = msg_list
Expand All @@ -460,11 +461,15 @@ def run(self):
# send off first round of invocations...
try:
for sim_name, msg_list in outstanding_sim_calls.items():
msg = msg_list.pop(0)
msg, sim_name, comp, method, arg = msg_list.pop(0)
self.debug('Framework sending message %s ', msg.__dict__)
if sim_name is not None:
self._send_monitor_event(sim_name=sim_name,
comment=f'Target = {comp}:{method}({arg})',
eventType='IPS_CALL_BEGIN')
call_id = self.task_manager.init_call(msg, manage_return=False)
self.call_queue_map[call_id] = msg_list
self.outstanding_calls_list.append(call_id)
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()
except Exception:
self.exception('encountered exception during fwk.run() sending first round of invocations (init of inits and fwk comps)')
self.terminate_all_sims(status=Message.FAILURE)
Expand Down Expand Up @@ -503,7 +508,15 @@ def run(self):
self.task_manager.return_call(msg)
continue
# Message is a result from a framework invocation
self.outstanding_calls_list.remove(msg.call_id)
sim_name, comp, method, arg, start_time = self.outstanding_calls_list.pop(msg.call_id)
if sim_name is not None:
self._send_monitor_event(sim_name=sim_name,
comment=f'Target = {comp}:{method}({arg})',
eventType='IPS_CALL_END',
start_time=start_time,
end_time=time.time(),
target=comp,
operation=f'{method}({arg})')
sim_msg_list = self.call_queue_map[msg.call_id]
del self.call_queue_map[msg.call_id]
if msg.status == Message.FAILURE:
Expand All @@ -520,10 +533,14 @@ def run(self):
comment = 'Simulation Ended'
ok = True
try:
next_call_msg = sim_msg_list.pop(0)
next_call_msg, sim_name, comp, method, arg = sim_msg_list.pop(0)
if sim_name is not None:
self._send_monitor_event(sim_name=sim_name,
comment=f'Target = {comp}:{method}({arg})',
eventType='IPS_CALL_BEGIN')
call_id = self.task_manager.init_call(next_call_msg,
manage_return=False)
self.outstanding_calls_list.append(call_id)
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()
self.call_queue_map[call_id] = sim_msg_list
except IndexError:
sim_comps = self.config_manager.get_component_map() # Get any new dynamic simulations
Expand Down Expand Up @@ -554,16 +571,16 @@ def initiate_new_simulation(self, sim_name):
req_msg = ServiceRequestMessage(self.component_id,
self.component_id, comp_id,
'init_call', method, 0)
msg_list.append(req_msg)
msg_list.append((req_msg, sim_name, str(comp_id), method, 0))

# send off first round of invocations...
msg = msg_list.pop(0)
msg, sim_name, comp, method, arg = msg_list.pop(0)
self.debug('Framework sending message %s ', msg.__dict__)
call_id = self.task_manager.init_call(msg, manage_return=False)
self.call_queue_map[call_id] = msg_list
self.outstanding_calls_list.append(call_id)
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()

def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'):
def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True', target=None, operation=None, start_time=None, end_time=None):
"""
Publish a portal monitor event to the *_IPS_MONITOR* event topic.
Event topics that start with an underscore are reserved for use by the
Expand Down Expand Up @@ -633,6 +650,27 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok='True'):
portal_data['state'] = 'Completed'
portal_data['stopat'] = time.strftime('%Y-%m-%d|%H:%M:%S%Z',
time.localtime())
# Zipkin json format
portal_data['trace'] = {"timestamp": int(self.start_time*1e6),
"duration": int((time.time() - self.start_time)*1e6),
"localEndpoint": {
"serviceName": str(self.component_id)
},
"id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16],
'tags': {'total_cores': str(self.resource_manager.total_cores)}}
elif eventType == "IPS_CALL_END":
trace = {} # Zipkin json format
if start_time is not None and end_time is not None:
trace['timestamp'] = int(start_time*1e6) # convert to microsecond
trace['duration'] = int((end_time-start_time)*1e6) # convert to microsecond
if target is not None:
trace['localEndpoint'] = {"serviceName": target}
trace['name'] = operation
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
trace["parentId"] = hashlib.md5(str(self.component_id).encode()).hexdigest()[:16]

if trace:
portal_data['trace'] = trace

event_body = {}
event_body['sim_name'] = sim_name
Expand Down
3 changes: 3 additions & 0 deletions ipsframework/portalBridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ def process_event(self, topicName, theEvent):
portal_data['portal_runid'] = sim_data.portal_runid
portal_data['seqnum'] = sim_data.counter

if 'trace' in portal_data:
portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest()

self.send_event(sim_data, portal_data)
sim_data.counter += 1
self.counter += 1
Expand Down
32 changes: 16 additions & 16 deletions ipsframework/resourceManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def get_allocation(self, comp_id, nproc, task_id,
else:
try:
self.processes += nproc
k = 0
cores_allocated = 0
alloc_procs = 0
node_file_entries = []
if whole_nodes:
Expand All @@ -371,10 +371,10 @@ def get_allocation(self, comp_id, nproc, task_id,
self.avail_nodes.remove(n)
self.alloc_nodes.append(n)
node_file_entries.append((n, cores))
k += procs
self.alloc_cores += k
self.avail_cores -= k
self.active_tasks.update({task_id: (comp_id, nproc, k)})
cores_allocated += procs
self.alloc_cores += cores_allocated
self.avail_cores -= cores_allocated
self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)})
elif whole_socks:
# -------------------------------
# whole sock allocation
Expand All @@ -388,17 +388,17 @@ def get_allocation(self, comp_id, nproc, task_id,
whole_socks,
task_id, comp_id,
to_alloc)
k += len(cores)
cores_allocated += len(cores)
alloc_procs = min([ppn, len(cores)])
node_file_entries.append((n, cores))
if n not in self.alloc_nodes:
self.alloc_nodes.append(n)
if node.avail_cores - node.total_cores == 0:
self.avail_nodes.remove(n)

self.alloc_cores += k
self.avail_cores -= k
self.active_tasks.update({task_id: (comp_id, nproc, k)})
self.alloc_cores += cores_allocated
self.avail_cores -= cores_allocated
self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)})
else:
# -------------------------------
# single core allocation
Expand All @@ -407,22 +407,22 @@ def get_allocation(self, comp_id, nproc, task_id,
node = self.nodes[n]
if node.avail_cores > 0:
to_alloc = min([ppn, node.avail_cores,
nproc - k])
nproc - cores_allocated])
self.fwk.debug("allocate task_id %d node %s %d cores" % (task_id, n, to_alloc))
procs, cores = node.allocate(whole_nodes,
whole_socks,
task_id, comp_id,
to_alloc)
k += procs
cores_allocated += procs
node_file_entries.append((n, cores))
if n not in self.alloc_nodes:
self.alloc_nodes.append(n)
if node.avail_cores - node.total_cores == 0:
self.avail_nodes.remove(n)

self.alloc_cores += k
self.avail_cores -= k
self.active_tasks.update({task_id: (comp_id, nproc, k)})
self.alloc_cores += cores_allocated
self.avail_cores -= cores_allocated
self.active_tasks.update({task_id: (comp_id, nproc, cores_allocated)})
except Exception:
print("Available Nodes:")
for nm in self.avail_nodes:
Expand All @@ -443,10 +443,10 @@ def get_allocation(self, comp_id, nproc, task_id,

if whole_nodes:
self.report_RM_status("allocation for task %d using whole nodes" % task_id)
return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes
return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes, cores_allocated
else:
self.report_RM_status("allocation for task %d using partial nodes" % task_id)
return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes
return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes, cores_allocated

def check_whole_node_cap(self, nproc, ppn):
"""
Expand Down
Loading

0 comments on commit 8fb6efa

Please sign in to comment.