
Model Monitoring Inference Aggregator block #818

Open
wants to merge 13 commits into main

Conversation

robiscoding (Contributor) commented Nov 15, 2024

Description

  • Currently, Model Monitoring does not support inferences made by InferencePipeline. To add support, this Model Monitoring Inference Aggregator block lets users send an aggregated and consolidated set of inference results to Model Monitoring at a defined frequency (in seconds). A usage sketch follows below.
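For illustration, a minimal sketch of how such a step might be wired into a workflow definition, assuming the input names that appear later in the diff (predictions, frequency, fire_and_forget); the exact manifest schema may differ:

```python
# Hypothetical workflow step entry (names and values illustrative only).
monitoring_step = {
    "type": "roboflow_core/roboflow_model_monitoring_exporter@v1",
    "name": "model_monitoring",
    # selector taken from the manifest example in the diff
    "predictions": "$steps.my_step.predictions",
    # report to Model Monitoring at most once every 10 seconds
    "frequency": 10,
    # do not block the pipeline while the report is sent
    "fire_and_forget": True,
}
```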

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

How has this change been tested? Please provide a test case or example of how you tested the change.

  • Tested using a local workflow run with the frequency both inside and outside the reporting range

Any specific deployment considerations

Docs

  • Docs updated? What were the changes:

description="Reference data to extract property from",
examples=["$steps.my_step.predictions"],
)
frequency: Union[
Contributor Author

I considered using the RateLimiter block, but this export block would always require that as a dependency. I'm not sure if there's a pattern for making another block required to run, so I just added this to the params for now, open to doing it another way

Collaborator

The name is misleading, as it suggests compaction of predictions to be sent in batches, which does not take place.

Contributor Author

Yeah, maybe something like SamplePredictions, ReportPredictionsSample, or ModelMonitoringPredictionsSampler.

"model_type": detections.data.get("prediction_type", [""])[i],
}
results.append(formatted_det)
elif isinstance(detections, dict):
Contributor Author

In what case would predictions be returned as a dict? I saw in other blocks that predictions will either be of type sv.Detections or dict.

@PawelPeczek-Roboflow (Collaborator) left a comment

I have a general concern with this PR - what is the quality of data acquired this way? Is that meaningful monitoring at the end of the day? What do we want to display as the result of running monitoring on a week-long stream? How would people understand that something is wrong with the model?

I mean, we have a stream running and we sub-sample predictions based on time, suggesting an update once every few seconds. We do it because it is basically unfeasible to push all predictions through the wire - but maybe, in this scenario, it makes sense to compute aggregates of predictions on the client end and submit compacted results once per interval?
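For what it's worth, a minimal sketch of the client-side aggregation idea, assuming each prediction is a plain dict with class and confidence keys (not necessarily the block's actual payload format):

```python
from collections import Counter, defaultdict
from typing import Dict, List


def aggregate_interval(predictions: List[dict]) -> dict:
    # Compact one reporting interval's predictions into per-class counts
    # and mean confidences, so only one small payload goes over the wire.
    counts: Counter = Counter()
    confidence_sums: Dict[str, float] = defaultdict(float)
    for prediction in predictions:
        class_name = prediction.get("class", "unknown")
        counts[class_name] += 1
        confidence_sums[class_name] += float(prediction.get("confidence", 0.0))
    return {
        "total_predictions": sum(counts.values()),
        "class_counts": dict(counts),
        "mean_confidence_per_class": {
            class_name: confidence_sums[class_name] / counts[class_name]
            for class_name in counts
        },
    }
```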

def get_manifest(cls) -> Type[WorkflowBlockManifest]:
return BlockManifest

def is_in_reporting_range(self, frequency: int) -> bool:
Collaborator

please order the methods usage-first, then declaration (the run(...) method first)

def is_in_reporting_range(self, frequency: int) -> bool:
now = datetime.now()
last_report_time_str = self._cache.get(LAST_REPORT_TIME_CACHE_KEY)
print(
Collaborator

please remove print statements
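If some of that output is still useful for debugging, a small sketch of swapping the prints for standard logging (the variable name is borrowed from the snippet above):

```python
import logging

logger = logging.getLogger(__name__)


def log_last_report_time(last_report_time_str: str) -> None:
    # instead of print(...), emit at debug level so it can be silenced in production
    logger.debug("last report time read from cache: %s", last_report_time_str)
```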

self,
fire_and_forget: bool,
predictions: Union[sv.Detections, dict],
frequency: int = 3,
Collaborator

defaults should not be set here, but in the manifest - see the sketch below
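A sketch of what that might look like, with the default declared on the manifest field and the run(...) signature left without one; the plain Union[int, str] stands in for whatever selector type the real manifest uses:

```python
from typing import Union

from pydantic import BaseModel, Field


class BlockManifestSketch(BaseModel):
    # Illustrative only: the real manifest derives from WorkflowBlockManifest
    # and uses a workflow-parameter selector type in the Union.
    frequency: Union[int, str] = Field(
        default=3,
        description="How often to report predictions to Model Monitoring, in seconds",
        examples=[3, "$inputs.frequency"],
    )
```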

BLOCK_NAME = "Roboflow Model Monitoring Exporter"


class BlockManifest(WorkflowBlockManifest):
)
type: Literal[
"roboflow_core/roboflow_model_monitoring_exporter@v1",
BLOCK_NAME,
Collaborator

please remove the name aliasing; it is not needed for new blocks (see the sketch below)
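In other words, something along these lines (shown on a plain pydantic model for brevity; the real base class is WorkflowBlockManifest):

```python
from typing import Literal

from pydantic import BaseModel


class BlockManifestSketch(BaseModel):
    # single canonical identifier, no legacy alias for a brand-new block
    type: Literal["roboflow_core/roboflow_model_monitoring_exporter@v1"]
```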

"error_status": False,
"message": "Not in reporting range, skipping report. (Ok)",
}
if self._api_key is None:
Collaborator

the API key should be validated first - a fail-fast approach gives clearer error handling (sketched below)
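A sketch of that ordering, reusing the names visible in the diff (run, _api_key, is_in_reporting_range, and the error_status/message output keys); the real signature and return contract may differ:

```python
def run(self, predictions, frequency: int, fire_and_forget: bool) -> dict:
    # Fail fast: a missing API key is a configuration error and should surface
    # immediately, not be hidden behind "not in reporting range" responses.
    if self._api_key is None:
        raise ValueError(
            "The Model Monitoring block requires a Roboflow API key to be configured"
        )
    if not self.is_in_reporting_range(frequency):
        return {
            "error_status": False,
            "message": "Not in reporting range, skipping report. (Ok)",
        }
    ...  # format predictions and send the report
```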



# TODO: maybe make this a helper or decorator, it's used in multiple places
def get_workspace_name(
Collaborator

I will keep this in mind, and if I see a recurring usage pattern, I will move it to common inference utils.

description="Reference data to extract property from",
examples=["$steps.my_step.predictions"],
)
frequency: Union[
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name is misleading, as it suggests compaction of predictions to be sent in batches, which does not take place


def is_in_reporting_range(self, frequency: int) -> bool:
now = datetime.now()
last_report_time_str = self._cache.get(LAST_REPORT_TIME_CACHE_KEY)
Collaborator

A global key? How would that work on the hosted platform?

@robiscoding (Contributor Author)

> I have a general concern with this PR - what is the quality of data acquired this way? Is that meaningful monitoring at the end of the day? What do we want to display as the result of running monitoring on a week-long stream? How would people understand that something is wrong with the model?

This is particularly meant for inferences made by InferencePipeline in edge deployments. Model Monitoring currently does not get any data from InferencePipeline, because the large volume of requests made by IP would overload our system. So the goal here is not to have comprehensive inference results data, or an aggregation of it (that's coming later), but more of a health/status check at a regular interval confirming that IP is running and making inferences. Is there a recommended way of doing this?

> I mean, we have a stream running and we sub-sample predictions based on time, suggesting an update once every few seconds. We do it because it is basically unfeasible to push all predictions through the wire - but maybe, in this scenario, it makes sense to compute aggregates of predictions on the client end and submit compacted results once per interval?

Aggregating is fine, but we were trying to defer aggregations until we had a better idea of what type of aggregation makes sense for this use case. I thought the Aggregator block could be used in conjunction with this, but maybe not?

@robiscoding robiscoding marked this pull request as draft November 18, 2024 15:42
@PawelPeczek-Roboflow (Collaborator)

Is model monitoring in its current form even acceptable to run on video?
I would say we should disable it by default, given the performance penalty.

@PawelPeczek-Roboflow (Collaborator)

We need to close this topic today, otherwise the release 0.28.0 label will be removed.

@robiscoding robiscoding marked this pull request as ready for review November 21, 2024 13:35
@robiscoding robiscoding changed the title Model monitoring export block Model Monitoring Inference Aggregator block Nov 21, 2024
self._cache = cache
self._background_tasks = background_tasks
self._thread_pool_executor = thread_pool_executor
self._last_report_time_cache_key = "roboflow_model_monitoring_last_report_time"
Collaborator

that would have the side effect of all blocks of this type sharing the last-report-time state - is that the desired end?

Contributor Author

Oh no, the intent was for each instance of the block to have its own last_report_time. Should I add a random string to the key name to make that happen?

Collaborator

  • if the expectation is that it works on the hosted platform, I would use Redis with the following key structure: workflows:steps_cache:roboflow_core/model_monitoring_inference_aggregator@v1:{uuid4()}:last_report_time (a sketch follows below)
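A sketch of that key layout, with the per-instance suffix generated at construction time so separate block instances stop sharing last-report-time state (the class and attribute names mirror the diff but are illustrative):

```python
from uuid import uuid4


class ModelMonitoringInferenceAggregatorBlockSketch:
    def __init__(self, cache, api_key):
        self._cache = cache
        self._api_key = api_key
        # one key per block instance, namespaced as suggested above
        self._last_report_time_cache_key = (
            "workflows:steps_cache:"
            "roboflow_core/model_monitoring_inference_aggregator@v1:"
            f"{uuid4()}:last_report_time"
        )
```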

last_report_time = now
else:
last_report_time = datetime.fromisoformat(last_report_time_str)
time_elapsed = int((now - last_report_time).total_seconds())
Collaborator

type conversion to int seems not to be needed

)


def format_sv_detections_for_model_monitoring(
Collaborator

please change the name to reflect that both sv.Detections and classification results are handled

) -> List[Prediction]:
results = []
if isinstance(detections, sv.Detections):
num_detections = len(detections.data.get("detection_id", []))
Collaborator

there is an __iter__ on sv.Detections

for detection in detections: would be easier and less error-prone than pulling size-1 lists out of detections.data, which seems to fail with a strange error if other blocks break the contract (a sketch follows below)
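A sketch of the iteration-based version; in recent supervision releases, iterating sv.Detections yields one tuple per detection (xyxy, mask, confidence, class_id, tracker_id, data), but the exact tuple shape and data keys should be checked against the installed version:

```python
from typing import List

import supervision as sv


def format_detections_for_model_monitoring(detections: sv.Detections) -> List[dict]:
    results = []
    for _xyxy, _mask, confidence, class_id, _tracker_id, data in detections:
        results.append(
            {
                "class_id": int(class_id) if class_id is not None else None,
                "confidence": float(confidence) if confidence is not None else None,
                # per-detection values such as class name / prediction type live
                # in the `data` mapping for that detection (keys are assumptions)
                "class": data.get("class_name"),
                "model_type": data.get("prediction_type"),
            }
        )
    return results
```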

if system_info:
for key, value in system_info.items():
inference_data[key] = value
inference_data["inference_results"] = predictions
Collaborator

I have a strange feeling it may fail on serialisation - predictions is a list of dataclasses, which does not get automatically converted into JSON as far as I know (one possible fix is sketched below)
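One stdlib way to cover that, sketched below: convert dataclass instances (such as the Prediction entries) into plain dicts via a default= hook at serialisation time:

```python
import json
from dataclasses import asdict, is_dataclass


def jsonable(obj):
    # json.dumps calls this only for objects it cannot serialise natively
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


# usage: payload = json.dumps(inference_data, default=jsonable)
```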

LONG_DESCRIPTION = """
This block periodically reports an aggregated sample of inference results to Roboflow Model Monitoring.

It aggregates predictions in memory between reports and then sends a representative sample of predictions at a regular interval specified by the `frequency` parameter.
Collaborator

I would elaborate on what we understand by "representative" - it seems to mean "the most confident prediction for a given class" (sketched below)
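If "representative" does mean "most confident for a given class", a minimal sketch of that selection, assuming plain prediction dicts with class and confidence keys:

```python
from typing import Dict, List


def most_confident_per_class(predictions: List[dict]) -> List[dict]:
    # Keep only the highest-confidence prediction observed for each class
    # within the current reporting interval.
    best: Dict[str, dict] = {}
    for prediction in predictions:
        class_name = prediction.get("class", "unknown")
        confidence = float(prediction.get("confidence", 0.0))
        current_best = best.get(class_name)
        if current_best is None or confidence > float(current_best.get("confidence", 0.0)):
            best[class_name] = prediction
    return list(best.values())
```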
