Add prototypes for benchmark testing. #94
Conversation
``Traceback`` of ``pytest`` did not show where exactly the error came from in those cases. It was resolved by using ``get_event_loop``, or by manually closing the event loop at the end of the test.
Test case to reproduce a similar error:

```python
async def some_coroutine() -> bool:
    import asyncio

    for _ in range(100):
        try:
            raise RuntimeError
        except RuntimeError:
            await asyncio.sleep(0.01)
    return True


def wrong_way_of_running_async_calls():
    import asyncio

    new_loop = asyncio.new_event_loop()
    return new_loop.run_until_complete(some_coroutine())


def test_foo():
    assert wrong_way_of_running_async_calls()


def test_foo1():
    assert wrong_way_of_running_async_calls()


def test_foo2():
    assert wrong_way_of_running_async_calls()
```
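A sketch of the workaround described above: make sure each freshly created event loop is closed when the call finishes (the function name here is illustrative, not from the repository):

```python
import asyncio


async def some_coroutine() -> bool:
    # Same shape as the reproduction case above, with a shorter sleep.
    for _ in range(100):
        try:
            raise RuntimeError
        except RuntimeError:
            await asyncio.sleep(0.001)
    return True


def run_async_call_and_close_loop():
    # Manually closing the loop avoids the dangling-loop warnings
    # that obscure pytest tracebacks.
    new_loop = asyncio.new_event_loop()
    try:
        return new_loop.run_until_complete(some_coroutine())
    finally:
        new_loop.close()


def test_foo():
    assert run_async_call_and_close_loop()
```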
Script for deleting all randomly generated topics with the prefix ``BEAMLIMETEST``:

```python
from confluent_kafka.admin import ClusterMetadata


def filter_beamlime_test_topics(cluster_meta: ClusterMetadata) -> list[str]:
    test_topic_prefix = "BEAMLIMETEST"
    return [
        topic for topic in cluster_meta.topics if topic.startswith(test_topic_prefix)
    ]


if __name__ == "__main__":
    import time

    from confluent_kafka.admin import AdminClient

    admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
    topic_list = filter_beamlime_test_topics(admin.list_topics())
    if topic_list:
        print("Deleting all existing topics: ", topic_list)
        admin.delete_topics(topic_list)
        # Deletion is asynchronous; poll until the topics are gone.
        while (topic_list := filter_beamlime_test_topics(admin.list_topics())):
            time.sleep(1)
        print('Test topics all deleted.')
    else:
        print("No test topics to delete.")
```
I have to admit I don't understand how all the different parts come together here; I would have to spend much longer looking at this to understand it all.
However, I had a superficial look and tried to spot things/patterns that seem odd.
About:
kafka related tests will not be included in the CI tests (for now) since they require a kafka broker running on the same machine.
It would be nice to get this up and running soon, because there is a lot of code that pertains to Kafka that is currently not being automatically tested (we rely on the contributors to run those tests locally, which is quite dangerous).
```python
def provide_kafka_producer(broker_address: KafkaBootstrapServer) -> Producer:
    return Producer({'bootstrap.servers': broker_address})
```
I think it's ok, but just making sure there is no typo here: ``'bootstrap.servers'`` is the same key for both ``Producer`` and ``AdminClient``.
Yes...! This key is a minimum requirement, so it raises an error if it's not correct.
I'm not sure how to handle these configurations at the moment...
tests/prototypes/prototype_kafka.py
```python
import time

admin.create_topics([NewTopic(topic)])
time.sleep(0.1)
```
Add a comment as to why the sleep is needed?
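One way to make the reason explicit, and to avoid a fixed delay entirely, is to poll the broker metadata until the topic shows up. A sketch of that pattern, assuming the ``confluent_kafka`` ``AdminClient`` API (the helper name is illustrative):

```python
import time


def wait_for_topic(admin, topic: str, timeout: float = 5.0) -> None:
    # Topic creation is asynchronous: the broker may not report the new
    # topic in its metadata immediately, hence the polling loop.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if topic in admin.list_topics().topics:
            return
        time.sleep(0.1)
    raise TimeoutError(f"Topic {topic!r} did not appear within {timeout}s.")
```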
""" | ||
Collect all coroutines of daemons and schedule them into the event loop. | ||
|
||
Notes |
👍
```python
def provide_time_coords(rng: RNG, ef_rate: EventFrameRate) -> TimeCoords:
    dummy_zeros = [zr for zr in range(int(13620492e11), int(13620492e11) + ef_rate)]
```
Where does the number ``13620492e11`` come from?
It's just a random datetime I picked... 2013-02-28T12:00:00 :D
Actually, ``event_time_zero`` was not used in this dummy workflow. Maybe it should be...
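For the record, ``13620492e11`` reads as nanoseconds since the Unix epoch, and a quick stdlib check confirms it lands on that date (11:00 UTC, i.e. 12:00 in UTC+1):

```python
from datetime import datetime, timezone

# 13620492e11 ns since the epoch == 1362049200 s.
ns = int(13620492e11)
dt = datetime.fromtimestamp(ns / 1e9, tz=timezone.utc)
print(dt.isoformat())  # 2013-02-28T11:00:00+00:00
```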
tests/prototypes/workflows.py
```python
return ReducedData(
    binned.transform_coords(
        ['L', 'wavelength'],
        graph={
```
Is that the same graph as on L49? Can we avoid the duplication?
Maybe this requires ``c_a``, ``c_b``, and ``c_c`` to also be part of the graph, but maybe that is a good thing?
Yes, probably...?
The idea was to keep ``workflow_script`` (L36 to L58) as a complete workflow script from the input to the output, and the rest are the granular steps of that workflow, probably to be run by ``sciline`` later.
So they are meant to be duplicated.
But I agree the graph could be separated and re-used.
tests/prototypes/workflows.py
```python
c_a: ConstantA = default_c_a,
c_b: ConstantB = default_c_b,
c_c: ConstantC = default_c_c,
```
Not sure about having default values here?
Yes, I agree.
tests/prototypes/workflows.py
```python
graph = provide_coord_transform_graph()

transformed = binned.transform_coords(['L', 'wavelength'], graph=graph)
return transformed.hist(wavelength=histogram_bin_size).sum('L')
```
For an instrument like DREAM we need thousands of bins. This will run out of memory; instead we will have to use something like:

```python
return transformed.bins.concat('L').hist(wavelength=histogram_bin_size)
```
tests/prototypes/workflows.py
```python
'frame_offset': lambda event_time_zero: event_time_zero - first_pulse_time,
'time_offset_pivot': time_offset_pivot,
'tof': tof_from_time_offset,
'wavelength': wavelength_from_tof,
```
@SimonHeybrock I added converting ``event_time_offset`` and ``event_time_zero`` to ``tof`` in this graph.
Or should I just use ``scippneutron``...?
Haven't run the whole set of benchmarks yet, though.
Yes, we absolutely should use ``scippneutron``. If we start replicating logic here we will run into big problems, since things will never be in sync.
```python
def process_first_data(self, data: Events) -> None:
    sample_event = data[0]
    first_pulse_time = sample_event.coords['event_time_zero'][0]
    self.workflow.providers[FirstPulseTime] = lambda: first_pulse_time
```
``DataReductionApp`` now has a separate ``process_first_data`` method so that it can retrieve the ``FirstPulseTime`` (which is the ``event_time_zero`` of the first incoming data in this case).
It then updates the ``workflow`` providers to return the retrieved value.
I mainly looked at the places where data is touched/transformed; I believe @nvaytet took care of reviewing the other code?
```python
from streaming_data_types.eventdata_ev44 import deserialise_ev44

data = deserialise_ev44(self.merge_bytes(data_list))
event_zeros = np.full(len(data.pixel_id), data.reference_time[0])
```
Initializing a full numpy array and copying it into a variable is a bit inefficient; we should directly initialize a scipp variable. Use ``scipp.full``?
tests/prototypes/workflows.py
```python
    frame_rate: FrameRate,
) -> Histogrammed:
    merged = sc.concat(da_list, dim='event')
    pixel_ids = sc.arange(dim='pixel_id', start=0, stop=num_pixels)
```
This will be used for every processed chunk; can we avoid recreating it all the time?
This script is for processing all data at once, not per chunk.
But as it seems confusing, I just deleted it...!
tests/prototypes/workflows.py
```python
pixel_ids = sc.arange(dim='pixel_id', start=0, stop=num_pixels)
binned = merged.group(pixel_ids)

graph = provide_coord_transform_graph(frame_rate)
```
Why is this recreated every time?
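A common way to avoid recreating a static object on every call is to memoize the provider. A sketch using ``functools`` (the graph body here is an illustrative stand-in, not the real transform graph):

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def provide_coord_transform_graph(frame_rate: int) -> dict:
    # The graph only depends on frame_rate, so it is safe to build it once
    # per distinct frame_rate and reuse the cached result afterwards.
    return {'frame_rate': frame_rate, 'frame_offset': lambda t0: t0}


g1 = provide_coord_transform_graph(14)
g2 = provide_coord_transform_graph(14)
assert g1 is g2  # the same cached object, not rebuilt
```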
```python
def provide_workflow(
    num_pixels: NumPixels, histogram_binsize: HistogramBinSize, frame_rate: FrameRate
) -> Workflow:
```
All the code here seems to duplicate what is in ``workflow_script`` above. Are both used?
No, it was more for showing what it is doing.
I thought it was useful since this workflow doesn't have nice documentation with graphs... but maybe it is more confusing than useful. I'll remove it.
Co-authored-by: Neil Vaytet <[email protected]>
Force-pushed (…with VisualizationDaemon) from 36d2040 to f9593b8.
I'm not sure if this is what you meant, @SimonHeybrock... but I replaced the existing graph with the frame unwrapping helper from the ``scippneutron`` package.
There are 2 minimum prototype daemons with dummy workflows for benchmark testing.
This PR does not have benchmarking code. It only has unit tests of each prototype.

Review points
- ``prototype_mini.StopWatch`` is used to benchmark the process.
- ``DataStreamListener``/``DataReduction``/``Visualization`` of ``prototypes.workflows``.

Minimum prototype
These are the minimum components of the real-time data reduction testing:
- tests/prototypes/prototype_mini.py
- tests/prototypes/parameters.py
- tests/prototypes/random_data_providers.py
- tests/prototypes/workflows.py

``prototype_mini`` can also be run as a script: ``python -m tests.prototypes.prototype_mini``.
It simulates the data stream between async daemons.
The dummy workflow contains binning, coordinate transformation, and histogramming.

Kafka streaming tests
``tests/prototypes/prototype_kafka.py`` contains the same daemons as ``prototype_mini.py`` but with the real kafka API and ``protobuf`` serialization/deserialization.
A ``kafka`` broker should be running on the same machine in order to run this test.

Kafka related tests filter
``kafka`` related tests will not be included in the CI tests (for now) since they require a ``kafka`` broker running on the same machine.
Those tests can be filtered by requesting the ``kafka_test`` fixture.
The ``kafka_test`` fixture then checks whether the ``--kafka-test`` flag was included as an argument of the ``pytest`` command, and skips the test if it was not.
You can include those tests with ``pytest --kafka-test``.
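The mechanism described above could look roughly like this in a ``conftest.py`` (a sketch of the general pattern, not the repository's actual implementation):

```python
import pytest


def pytest_addoption(parser):
    # Register the --kafka-test command line flag.
    parser.addoption(
        "--kafka-test",
        action="store_true",
        default=False,
        help="Run tests that need a local kafka broker.",
    )


@pytest.fixture
def kafka_test(request):
    # Requesting this fixture skips the test unless --kafka-test was given.
    if not request.config.getoption("--kafka-test"):
        pytest.skip("Need --kafka-test flag to run kafka tests.")
```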
The reason for using ``fixture`` and not ``mark``
Adding extra marks is also an option for including/excluding marked tests, but it would have to be configured in ``pytest.ini``.
A ``fixture`` was used for now since ``mark`` has limitations in how to decide whether to skip a test or not.