Skip to content

Commit

Permalink
Merge pull request #172 from rosswhitfield/fix_trace_duplicate_step_call
Browse files Browse the repository at this point in the history
Fix trace IDs when multiple calls to component method with same timestamp
  • Loading branch information
rosswhitfield authored Sep 15, 2022
2 parents 5d5cb8a + 646cdb7 commit f310a1e
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 15 deletions.
11 changes: 8 additions & 3 deletions ipsframework/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, services, config):
self.__start_time = 0.0
self.__sys_exit = None
self.__method_name = None
self.__call_id = 0
self.__args = None
for i in config.keys():
try:
Expand Down Expand Up @@ -122,7 +123,7 @@ def __run__(self):
msg = self.__invocation_q.get()
self.services.log('Received Message ')
sender_id = msg.sender_id
call_id = msg.call_id
self.__call_id = msg.call_id
self.__method_name = msg.target_method
self.__args = msg.args
keywords = msg.keywords
Expand All @@ -140,12 +141,12 @@ def __run__(self):
self.services.exception('Uncaught Exception in component method.')
response_msg = MethodResultMessage(self.component_id,
sender_id,
call_id,
self.call_id,
Message.FAILURE, e)
else:
response_msg = MethodResultMessage(self.component_id,
sender_id,
call_id,
self.call_id,
Message.SUCCESS, retval)
self.services.fwk_in_q.put(response_msg)

Expand All @@ -169,6 +170,10 @@ def start_time(self):
def method_name(self):
return self.__method_name

@property
def call_id(self):
return self.__call_id

@property
def args(self):
return self.__args
Expand Down
7 changes: 4 additions & 3 deletions ipsframework/ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,8 @@ def run(self):
start_time=start_time,
end_time=time.time(),
target=comp,
operation=f'{method}({arg})')
operation=f'{method}({arg})',
call_id=msg.call_id)
sim_msg_list = self.call_queue_map[msg.call_id]
del self.call_queue_map[msg.call_id]
if msg.status == Message.FAILURE:
Expand Down Expand Up @@ -581,7 +582,7 @@ def initiate_new_simulation(self, sim_name):
self.call_queue_map[call_id] = msg_list
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()

def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, target=None, operation=None, start_time=None, end_time=None):
def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, target=None, operation=None, start_time=None, end_time=None, call_id=0):
"""
Publish a portal monitor event to the *_IPS_MONITOR* event topic.
Event topics that start with an underscore are reserved for use by the
Expand Down Expand Up @@ -674,7 +675,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta
if target is not None:
trace['localEndpoint'] = {"serviceName": target}
trace['name'] = operation
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
trace['id'] = hashlib.md5(f"{target}:{operation}:{call_id}".encode()).hexdigest()[:16]
trace["parentId"] = hashlib.md5(f'{sim_name}@{self.component_id}'.encode()).hexdigest()[:16]

if trace:
Expand Down
27 changes: 20 additions & 7 deletions ipsframework/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ def _send_monitor_event(self,
target=None,
operation=None,
procs_requested=None,
cores_allocated=None):
cores_allocated=None,
call_id=0):
"""
Construct and send an event populated with the component's
information, *eventType*, *comment*, *ok*, *state*, and a wall time
Expand Down Expand Up @@ -427,9 +428,10 @@ def _send_monitor_event(self,
trace['name'] = operation
formatted_args = ['%.3f' % (x) if isinstance(x, float)
else str(x) for x in self.component_ref.args]
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)})"
.encode()).hexdigest()[:16]
trace['id'] = hashlib.md5(f"{target}:{operation}:{call_id}".encode()).hexdigest()[:16]
trace['parentId'] = hashlib.md5(
f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)}):{self.component_ref.call_id}"
.encode()).hexdigest()[:16]
trace['tags'] = {}
if procs_requested is not None:
trace['tags']['procs_requested'] = str(procs_requested)
Expand Down Expand Up @@ -549,7 +551,8 @@ def wait_call(self, call_id, block=True):
end_time=time.time(),
elapsed_time=time.time()-start_time,
target=target,
operation=f'{method_name}({formatted_args})')
operation=f'{method_name}({formatted_args})',
call_id=call_id)
except Exception as e:
self._send_monitor_event('IPS_CALL_END',
f'Error: "{e}" Target = {target_full}',
Expand All @@ -558,6 +561,7 @@ def wait_call(self, call_id, block=True):
elapsed_time=time.time()-start_time,
target=target,
operation=f'{method_name}({formatted_args})',
call_id=call_id,
ok=False)
raise

Expand Down Expand Up @@ -929,7 +933,15 @@ def wait_task(self, task_id, timeout=-1, delay=1):
process.kill()
task_retval = process.wait()
self._send_monitor_event('IPS_TASK_END', 'task_id = %s TIMEOUT elapsed time = %.2f S' %
(str(task_id), finish_time - start_time))
(str(task_id), finish_time - start_time),
start_time=start_time,
end_time=finish_time,
elapsed_time=finish_time - start_time,
procs_requested=nproc,
cores_allocated=cores,
target=binary,
operation=" ".join(args),
call_id=task_id)
else:
self._send_monitor_event('IPS_TASK_END', 'task_id = %s elapsed time = %.2f S' %
(str(task_id), finish_time - start_time),
Expand All @@ -939,7 +951,8 @@ def wait_task(self, task_id, timeout=-1, delay=1):
procs_requested=nproc,
cores_allocated=cores,
target=binary,
operation=" ".join(args))
operation=" ".join(args),
call_id=task_id)

del self.task_map[task_id]
try:
Expand Down
9 changes: 9 additions & 0 deletions tests/components/drivers/driver_double_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from ipsframework import Component


class driver(Component):
def step(self, timestamp=0.0, **keywords):
w = self.services.get_port('WORKER')
# call the same worker step twice to check that the trace is correct
self.services.call(w, 'step', 0)
self.services.call(w, 'step', 0)
14 changes: 14 additions & 0 deletions tests/components/workers/simple_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# -------------------------------------------------------------------------------
# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
# -------------------------------------------------------------------------------
from ipsframework import Component


class simple_sleep(Component):
def step(self, timestamp=0.0, **keywords):
self.services.wait_task(
self.services.launch_task(1,
"/tmp",
"/bin/sleep",
1)
)
4 changes: 2 additions & 2 deletions tests/helloworld/test_helloworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ def handle(self):
assert 'duration' in trace
assert 'timestamp' in trace
assert 'id' in trace
assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000)'.encode()).hexdigest()[:16]
assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000):7'.encode()).hexdigest()[:16]
assert 'traceId' in trace
assert trace['traceId'] == hashlib.md5(event['portal_runid'].encode()).hexdigest()
assert 'parentId' in trace
assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0)'.encode()).hexdigest()[:16]
assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0):5'.encode()).hexdigest()[:16]
assert 'localEndpoint' in trace
assert trace['localEndpoint']['serviceName'] == 'Hello_world_1@HelloWorker@2'
136 changes: 136 additions & 0 deletions tests/new/test_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import glob
import json
import hashlib
from ipsframework import Framework


def write_basic_config_and_platform_files(tmpdir, timeout='', logfile='', errfile='', nproc=1, exe='/bin/sleep', value='', shifter=False):
platform_file = tmpdir.join('platform.conf')

platform = """MPIRUN = eval
NODE_DETECTION = manual
CORES_PER_NODE = 2
SOCKETS_PER_NODE = 1
NODE_ALLOCATION_MODE = shared
HOST =
SCRATCH =
"""

with open(platform_file, 'w') as f:
f.write(platform)

config_file = tmpdir.join('ips.config')

config = f"""RUN_COMMENT = trace testing
SIM_NAME = trace
LOG_FILE = {str(tmpdir)}/sim.log
LOG_LEVEL = INFO
SIM_ROOT = {str(tmpdir)}
SIMULATION_MODE = NORMAL
[PORTS]
NAMES = DRIVER WORKER
[[DRIVER]]
IMPLEMENTATION = DRIVER
[[WORKER]]
IMPLEMENTATION = WORKER
[DRIVER]
CLASS = DRIVER
SUB_CLASS =
NAME = driver
BIN_PATH =
NPROC = 1
INPUT_FILES =
OUTPUT_FILES =
SCRIPT =
MODULE = components.drivers.driver_double_trace
[WORKER]
CLASS = WORKER
SUB_CLASS =
NAME = simple_sleep
NPROC = 1
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
SCRIPT =
MODULE = components.workers.simple_sleep
"""

with open(config_file, 'w') as f:
f.write(config)

return platform_file, config_file


def test_trace_info(tmpdir):
platform_file, config_file = write_basic_config_and_platform_files(tmpdir, value=1)

framework = Framework(config_file_list=[str(config_file)],
log_file_name=str(tmpdir.join('ips.log')),
platform_file_name=str(platform_file),
debug=None,
verbose_debug=None,
cmd_nodes=0,
cmd_ppn=0)

framework.run()

# check simulation_log, make sure it includes events from dask tasks
json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json")))
assert len(json_files) == 1
with open(json_files[0], 'r') as json_file:
lines = json_file.readlines()
lines = [json.loads(line.strip()) for line in lines]
assert len(lines) == 17

portal_runid = lines[0]['portal_runid']

traces = [e['trace'] for e in lines if "trace" in e]

assert len(traces) == 8

call_ids = [5, 1, 8, 2, 9, 7, 10, None]
service_names = ['trace@driver@1',
'/bin/sleep',
'trace@simple_sleep@2',
'/bin/sleep',
'trace@simple_sleep@2',
'trace@driver@1',
'trace@driver@1',
'trace@FRAMEWORK@Framework@0']
names = ['init(0)',
'1',
'step(0)',
'1',
'step(0)',
'step(0)',
'finalize(0)',
None]
tags = [None,
{"procs_requested": "1", "cores_allocated": "1"},
{},
{"procs_requested": "1", "cores_allocated": "1"},
{},
None,
None,
{'total_cores': '2'}]
parents = [7, 2, 5, 4, 5, 7, 7, None]

for n, trace in enumerate(traces):
assert isinstance(trace['timestamp'], int)
assert isinstance(trace['duration'], int)
assert trace['traceId'] == hashlib.md5(portal_runid.encode()).hexdigest()
assert trace['localEndpoint']['serviceName'] == service_names[n]
assert "id" in trace
assert trace.get('tags') == tags[n]

if names[n]:
assert trace['name'] == names[n]
assert trace['id'] == hashlib.md5(f"{trace['localEndpoint']['serviceName']}:{trace['name']}:{call_ids[n]}".encode()).hexdigest()[:16]
else:
assert trace['id'] == hashlib.md5(f"{trace['localEndpoint']['serviceName']}".encode()).hexdigest()[:16]

if parents[n]:
if names[parents[n]]:
assert trace['parentId'] == hashlib.md5(f"{service_names[parents[n]]}:{names[parents[n]]}:{call_ids[parents[n]]}".encode()).hexdigest()[:16]
else:
assert trace['parentId'] == hashlib.md5(f"{service_names[parents[n]]}".encode()).hexdigest()[:16]

0 comments on commit f310a1e

Please sign in to comment.