Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ele 3909 single error alerts are not sent #1764

Merged
merged 4 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime
from typing import DefaultDict, Dict, List, Optional, Union

from alive_progress import alive_it
from alive_progress import alive_bar

from elementary.config.config import Config
from elementary.monitor.alerts.alerts_groups import GroupedByTableAlerts
Expand Down Expand Up @@ -251,27 +251,29 @@ def _send_alerts(
self.execution_properties["sent_alert_count"] = self.sent_alert_count
return

alerts_with_progress_bar = alive_it(alerts, title="Sending alerts")
sent_successfully_alerts = []
for alert, sent_successfully in self.alerts_integration.send_alerts(
alerts_with_progress_bar, self.config.group_alerts_threshold
):
if sent_successfully:
if isinstance(alert, BaseAlertsGroup):
sent_successfully_alerts.extend(alert.alerts)

with alive_bar(len(alerts), title="Sending alerts") as bar:
for alert, sent_successfully in self.alerts_integration.send_alerts(
alerts, self.config.group_alerts_threshold
):
bar()
if sent_successfully:
if isinstance(alert, BaseAlertsGroup):
sent_successfully_alerts.extend(alert.alerts)
else:
sent_successfully_alerts.append(alert)
else:
sent_successfully_alerts.append(alert)
else:
if isinstance(alert, BaseAlertsGroup):
for inner_alert in alert.alerts:
if isinstance(alert, BaseAlertsGroup):
for inner_alert in alert.alerts:
logger.error(
f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}"
)
else:
logger.error(
f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}"
f"Could not send the alert - {alert.id}. Full alert: {json.dumps(alert.data)}"
)
else:
logger.error(
f"Could not send the alert - {alert.id}. Full alert: {json.dumps(alert.data)}"
)
self.success = False
self.success = False

# Now update as sent:
self.sent_alert_count = len(sent_successfully_alerts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ def send_alert(
) -> bool:
raise NotImplementedError

@staticmethod
def _group_alerts(
self,
alerts: Sequence[
Union[
TestAlertModel,
Expand All @@ -129,6 +129,9 @@ def _group_alerts(
AlertsGroup,
]
]:
if not alerts:
return []

flattened_alerts: List[
Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]
] = []
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup
from elementary.monitor.alerts.test_alert import TestAlertModel
from elementary.monitor.data_monitoring.alerts.integrations.base_integration import (
BaseIntegration,
)


def test_group_alerts():
grouped_alerts = BaseIntegration._group_alerts(alerts=[], threshold=0)
assert len(grouped_alerts) == 0
grouped_alerts = BaseIntegration._group_alerts(alerts=[], threshold=1)
assert len(grouped_alerts) == 0

alerts = [
TestAlertModel(
id="1",
test_unique_id="1",
elementary_unique_id="1",
test_name="1",
severity="WARN",
test_type="dbt_test",
test_sub_type="generic",
test_short_name="1",
alert_class_id="1",
)
]
grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=0)
assert len(grouped_alerts) == 1
assert isinstance(grouped_alerts[0], AlertsGroup)
assert grouped_alerts[0].alerts == alerts
grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=1)
assert len(grouped_alerts) == 1
assert isinstance(grouped_alerts[0], AlertsGroup)
assert grouped_alerts[0].alerts == alerts

alerts = [
TestAlertModel(
id="1",
test_unique_id="1",
elementary_unique_id="1",
test_name="1",
severity="WARN",
test_type="dbt_test",
test_sub_type="generic",
test_short_name="1",
alert_class_id="1",
),
TestAlertModel(
id="2",
test_unique_id="2",
elementary_unique_id="2",
test_name="2",
severity="WARN",
test_type="dbt_test",
test_sub_type="generic",
test_short_name="2",
alert_class_id="2",
),
]
grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=0)
assert len(grouped_alerts) == 1
assert isinstance(grouped_alerts[0], AlertsGroup)
assert grouped_alerts[0].alerts == alerts
grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=2)
assert len(grouped_alerts) == 1
assert isinstance(grouped_alerts[0], AlertsGroup)
assert grouped_alerts[0].alerts == alerts
grouped_alerts = BaseIntegration._group_alerts(alerts=alerts, threshold=10)
assert len(grouped_alerts) == 2
assert grouped_alerts == alerts
Loading