Add prototypes for benchmark testing. #94

Merged
merged 38 commits into from
Nov 1, 2023
Conversation

@YooSunYoung (Member) commented Sep 22, 2023

There are two minimal prototype daemons with dummy workflows for benchmark testing.
This PR does not contain benchmarking code; it only has unit tests for each prototype.

Review points

  • prototype_mini.StopWatch is used to benchmark the process.
  • The three main applications of the prototypes: DataStreamListener, DataReduction, and Visualization.
  • The dummy workflows in workflows.py.

Minimum prototype

These are the minimal components for testing real-time data reduction.

  • tests/prototypes/prototype_mini.py
  • tests/prototypes/parameters.py
  • tests/prototypes/random_data_providers.py
  • tests/prototypes/workflows.py

prototype_mini can also be run as a script: python -m tests.prototypes.prototype_mini.
It simulates the data stream between async daemons.
The dummy workflow consists of binning, coordinate transformation, and histogramming, roughly as sketched below.
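As a rough illustration only (this is not the prototype code; it assumes scipp, with made-up names and numbers), such a dummy workflow has the following shape:

import numpy as np
import scipp as sc

rng = np.random.default_rng(42)
num_events, num_pixels = 1_000, 16

# Fake event list with a pixel id and a time-of-flight-like coordinate.
events = sc.DataArray(
    data=sc.ones(dims=['event'], shape=[num_events], unit='counts'),
    coords={
        'pixel_id': sc.array(dims=['event'], values=rng.integers(0, num_pixels, num_events)),
        'tof': sc.array(dims=['event'], values=rng.uniform(0, 10_000, num_events), unit='us'),
    },
)

# 1. Binning: group the events by pixel.
binned = events.group(sc.arange('pixel_id', 0, num_pixels))
# 2. Coordinate transformation: derive a (made-up) wavelength from tof.
graph = {'wavelength': lambda tof: sc.scalar(3e-3, unit='angstrom/us') * tof}
transformed = binned.transform_coords(['wavelength'], graph=graph)
# 3. Histogramming over the new coordinate, summed over pixels.
result = transformed.hist(wavelength=50).sum('pixel_id')
print(result)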

Kafka streaming tests

tests/prototypes/prototype_kafka.py contains the same daemons as prototype_mini.py, but with the real Kafka API and protobuf serialization/deserialization.
A Kafka broker must be running on the same machine in order to run this test.

Kafka-related tests filter.

Kafka-related tests are not included in the CI run (for now), since they require a Kafka broker running on the same machine.
Those tests can be filtered by requesting the kafka_test fixture.
The kafka_test fixture then checks whether the --kafka-test flag was passed to the pytest command and skips the test if it was not.
You can include those tests with pytest --kafka-test.
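For illustration, a minimal sketch of how such a fixture could be wired up in conftest.py (the actual implementation in this PR may differ in details):

import pytest


def pytest_addoption(parser):
    parser.addoption(
        "--kafka-test",
        action="store_true",
        default=False,
        help="Run tests that need a local Kafka broker.",
    )


@pytest.fixture
def kafka_test(request) -> bool:
    # Skip the requesting test unless ``--kafka-test`` was passed on the command line.
    if not request.config.getoption("--kafka-test"):
        pytest.skip("Requires a running Kafka broker. Use ``--kafka-test`` to include it.")
    return True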

The reason for using a fixture and not a mark.

Adding an extra mark is also an option for including/excluding marked tests.
It would have to be configured in pytest.ini.
A fixture was used for now, since a mark is more limited in how it can decide whether to skip a test.

The ``Traceback`` shown by ``pytest`` did not reveal where exactly the error came from in those cases.
It was resolved by using ``get_event_loop``,
or by manually closing the event loop at the end of the test.
Member Author

Test case to reproduce similar error:

import asyncio


async def some_coroutine() -> bool:
    # Repeatedly raise and swallow an exception while yielding to the event loop.
    for _ in range(100):
        try:
            raise RuntimeError
        except RuntimeError:
            await asyncio.sleep(0.01)
    return True


def wrong_way_of_running_async_calls():
    # Creates a brand-new event loop on every call and never closes it.
    new_loop = asyncio.new_event_loop()
    return new_loop.run_until_complete(some_coroutine())


def test_foo():
    assert wrong_way_of_running_async_calls()


def test_foo1():
    assert wrong_way_of_running_async_calls()


def test_foo2():
    assert wrong_way_of_running_async_calls()
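For comparison, a minimal sketch of the two fixes mentioned above (reusing the current loop via get_event_loop, or closing the newly created loop at the end), with some_coroutine as defined in the snippet:

import asyncio


def run_with_get_event_loop():
    # Reuse the current event loop instead of creating a new one per call.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(some_coroutine())


def run_and_close_loop():
    # Create a new loop, but make sure it is closed at the end.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(some_coroutine())
    finally:
        loop.close()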

@YooSunYoung (Member Author)

Script for deleting all randomly generated topics with the prefix BEAMLIMETEST:

import time

from confluent_kafka.admin import AdminClient, ClusterMetadata


def filter_beamlime_test_topics(cluster_meta: ClusterMetadata) -> list[str]:
    test_topic_prefix = "BEAMLIMETEST"
    return [topic for topic in cluster_meta.topics if topic.startswith(test_topic_prefix)]


if __name__ == "__main__":
    admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
    topic_list = filter_beamlime_test_topics(admin.list_topics())

    if topic_list:
        print("Deleting all existing topics: ", topic_list)
        admin.delete_topics(topic_list)

        # Poll until the broker no longer reports any test topics.
        while (topic_list := filter_beamlime_test_topics(admin.list_topics())):
            time.sleep(1)

        print('Test topics all deleted.')
    else:
        print("No test topics to delete.")

@nvaytet (Member) left a comment

I have to admit I don't understand how all the different parts come together here; I would have to spend much longer looking at this to understand it all.

However, I had a superficial look and tried to spot things/patterns that seem odd.

About:

Kafka-related tests are not included in the CI run (for now), since they require a Kafka broker running on the same machine.

It would be nice to get this up and running soon, because there is a lot of Kafka-related code that is currently not being automatically tested (we rely on contributors to run those tests locally, which is quite dangerous).



def provide_kafka_producer(broker_address: KafkaBootstrapServer) -> Producer:
    return Producer({'bootstrap.servers': broker_address})
Member

I think it's ok, but just making sure there is no typo here: 'bootstrap.servers' is the same key for both Producer and AdminClient.

Member Author

Yes...! This key is the minimum requirement, so it raises an error if it's not correct.
I'm not sure how to handle these configurations at the moment...
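One illustrative option (not part of this PR; the helper name is made up) would be to keep the shared minimum configuration in a single place so the Producer and AdminClient cannot drift apart:

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient


def kafka_config(broker_address: str) -> dict:
    # 'bootstrap.servers' is the minimum key shared by both clients.
    return {'bootstrap.servers': broker_address}


producer = Producer(kafka_config('localhost:9092'))
admin = AdminClient(kafka_config('localhost:9092'))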

import time

admin.create_topics([NewTopic(topic)])
time.sleep(0.1)
Member

Add a comment as to why the sleep is needed?
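(For illustration only, one possible alternative to the fixed sleep: AdminClient.create_topics returns a dict of futures, one per topic, which can be waited on until the broker has actually created the topic.)

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
futures = admin.create_topics([NewTopic(topic)])  # ``topic`` as in the snippet above
futures[topic].result()  # blocks until the topic is created (or raises on failure)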

"""
Collect all coroutines of daemons and schedule them into the event loop.

Notes
Member

👍



def provide_time_coords(rng: RNG, ef_rate: EventFrameRate) -> TimeCoords:
    dummy_zeros = [zr for zr in range(int(13620492e11), int(13620492e11) + ef_rate)]
Member

where does the number 13620492e11 come from?

Member Author

It's just a random datetime I picked... 2013-02-28T12:00:00 :D... Actually, event_time_zero was not used in this dummy workflow. Maybe it should be...
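(For reference, a quick check of where that number lands when read as nanoseconds since the epoch:)

from datetime import datetime, timezone

ns = int(13620492e11)
print(datetime.fromtimestamp(ns / 1e9, tz=timezone.utc))
# -> 2013-02-28 11:00:00+00:00, i.e. 2013-02-28T12:00:00 CET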

return ReducedData(
    binned.transform_coords(
        ['L', 'wavelength'],
        graph={
Member

Is that the same graph as on L49? Can we avoid the duplication?
Maybe this requires c_a, c_b, and c_c to also be part of the graph, but maybe that is a good thing?

Member Author

Yes, probably...?
The idea was to keep workflow_script (from L36 to L58) as a complete workflow script from the input to the output,
while the rest are the granular steps of that workflow, probably to be run by sciline later.
So they are meant to be duplicated.
But I agree the graph could be separated out and re-used.

Comment on lines 75 to 77
    c_a: ConstantA = default_c_a,
    c_b: ConstantB = default_c_b,
    c_c: ConstantC = default_c_c,
Member

Not sure about having default values here?

@YooSunYoung (Member Author)

It would be nice to get this up and running soon, because there is a lot of Kafka-related code that is currently not being automatically tested (we rely on contributors to run those tests locally, which is quite dangerous).

Yes, I agree.
I briefly looked into how to run an integration test against a Kafka stream, and it might be as simple as setting a secret in the GitHub repository to access a running Kafka cluster for testing.
For now it is always being used for the benchmarks, so I will not spend too much time on it. I opened issue #97 though...!

graph = provide_coord_transform_graph()

transformed = binned.transform_coords(['L', 'wavelength'], graph=graph)
return transformed.hist(wavelength=histogram_bin_size).sum('L')
Member

For instruments like DREAM we need thousands of bins. This will run out of memory; instead, we will have to use something like

return transformed.bins.concat('L').hist(wavelength=histogram_bin_size)

'frame_offset': lambda event_time_zero: event_time_zero - first_pulse_time,
'time_offset_pivot': time_offset_pivot,
'tof': tof_from_time_offset,
'wavelength': wavelength_from_tof,
Member Author

@SimonHeybrock I added the conversion of event_time_offset and event_time_zero to tof in this graph. Or should I just use scippneutron...?

I haven't run the whole set of benchmarks yet, though.

Member

Yes, we absolutely should use scippneutron. If we start replicating logic here we will run into big problems, since things will never be in sync.

def process_first_data(self, data: Events) -> None:
    sample_event = data[0]
    first_pulse_time = sample_event.coords['event_time_zero'][0]
    self.workflow.providers[FirstPulseTime] = lambda: first_pulse_time
@YooSunYoung (Member Author) commented Oct 12, 2023

DataReductionApp now has a separate process_first_data method so that it can retrieve the FirstPulseTime (which, in this case, is the event_time_zero of the first incoming data).

It then updates the workflow providers to return the retrieved value.

@SimonHeybrock (Member) left a comment

I mainly looked at the places where data is touched/transformed; I believe @nvaytet took care of reviewing the other code?

from streaming_data_types.eventdata_ev44 import deserialise_ev44

data = deserialise_ev44(self.merge_bytes(data_list))
event_zeros = np.full(len(data.pixel_id), data.reference_time[0])
Member

Initializing a full numpy array and copying it into a variable is a bit inefficient; we should directly initialize a scipp variable. Use scipp.full?
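For reference, a sketch of that suggestion (names follow the snippet above; unit/dtype handling is illustrative and may need adjusting):

import scipp as sc

# Build the scipp variable directly instead of going through np.full first.
event_zeros = sc.full(
    dims=['event'],
    shape=[len(data.pixel_id)],
    value=data.reference_time[0],
)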


    frame_rate: FrameRate,
) -> Histogrammed:
    merged = sc.concat(da_list, dim='event')
    pixel_ids = sc.arange(dim='pixel_id', start=0, stop=num_pixels)
Member

This will be used for every processed chunk, can we avoid recreating it all the time?

Member Author

This script is for processing all the data at once, not per chunk.
But since it seems confusing, I just deleted it...!

pixel_ids = sc.arange(dim='pixel_id', start=0, stop=num_pixels)
binned = merged.group(pixel_ids)

graph = provide_coord_transform_graph(frame_rate)
Member

Why is this recreated every time?
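One illustrative way to avoid rebuilding such per-chunk constants (not the PR's code; names mirror the snippets above, and provide_coord_transform_graph is assumed to be the prototype's provider) would be to cache them, e.g.:

from functools import lru_cache

import scipp as sc


@lru_cache(maxsize=None)
def cached_pixel_ids(num_pixels: int) -> sc.Variable:
    # Built once per distinct num_pixels, then reused for every chunk.
    return sc.arange(dim='pixel_id', start=0, stop=num_pixels)


@lru_cache(maxsize=None)
def cached_coord_transform_graph(frame_rate: int) -> dict:
    # Built once per frame rate; assumes the graph does not change between chunks.
    return provide_coord_transform_graph(frame_rate)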


def provide_workflow(
    num_pixels: NumPixels, histogram_binsize: HistogramBinSize, frame_rate: FrameRate
) -> Workflow:
Member

All the code here seems to be duplicating what is in workflow_script above. Are both used?

Member Author

No, it was more for showing what it is doing.
I thought it was useful since this workflow doesn't have nice documentation with graphs... but maybe it is more confusing than useful. I'll remove it.

Base automatically changed from provider-arguments-hashable to main October 31, 2023 09:11
@YooSunYoung (Member Author) left a comment

I'm not sure if this is what you meant, @SimonHeybrock, but I replaced the existing graph with the frame-unwrapping helper from the scippneutron package.


