171 changes: 121 additions & 50 deletions src/sentry/replays/lib/summarize.py
@@ -8,19 +8,18 @@

from sentry import nodestore
from sentry.constants import ObjectStatus
from sentry.issues.grouptype import FeedbackGroup
from sentry.models.project import Project
from sentry.replays.query import query_trace_connected_events
from sentry.replays.usecases.ingest.event_parser import EventType
from sentry.replays.usecases.ingest.event_parser import (
get_timestamp_ms as get_replay_event_timestamp_ms,
)
from sentry.replays.usecases.ingest.event_parser import parse_network_content_lengths, which
from sentry.search.events.builder.discover import DiscoverQueryBuilder
from sentry.search.events.types import QueryBuilderConfig, SnubaParams
from sentry.search.events.types import SnubaParams
from sentry.services.eventstore.models import Event
from sentry.snuba.dataset import Dataset
from sentry.snuba.referrer import Referrer
from sentry.utils import json
from sentry.utils.snuba import bulk_snuba_queries

logger = logging.getLogger(__name__)

@@ -90,63 +89,115 @@ def fetch_trace_connected_errors(
Project.objects.filter(organization=project.organization, status=ObjectStatus.ACTIVE)
)

queries = []
for trace_id in trace_ids:
snuba_params = SnubaParams(
projects=org_projects,
start=start,
end=end,
organization=project.organization,
)

# Generate a query for each trace ID. This will be executed in bulk.
error_query = DiscoverQueryBuilder(
Dataset.Events,
params={},
snuba_params=snuba_params,
query=f"trace:{trace_id}",
selected_columns=[
"id",
"timestamp_ms",
"timestamp",
"title",
"message",
],
orderby=["id"],
limit=100,
config=QueryBuilderConfig(
auto_fields=False,
),
)
queries.append(error_query)
snuba_params = SnubaParams(
projects=org_projects,
start=start,
end=end,
organization=project.organization,
)

if not queries:
return []
trace_ids_query = " OR ".join([f"trace:{trace_id}" for trace_id in trace_ids])

# Query for errors dataset
error_query = query_trace_connected_events(
dataset_label="errors",
selected_columns=[
"id",
"timestamp_ms",
"timestamp",
"title",
"message",
],
query=trace_ids_query,
snuba_params=snuba_params,
orderby=["id"],
limit=100,
referrer=Referrer.API_REPLAY_SUMMARIZE_BREADCRUMBS.value,
)

# Execute all queries
results = bulk_snuba_queries(
[query.get_snql_query() for query in queries],
# Query for issuePlatform dataset
issue_query = query_trace_connected_events(
dataset_label="issuePlatform",
selected_columns=[
"event_id",
"title",
"subtitle",
"timestamp",
"occurrence_type_id",
],
query=trace_ids_query,
snuba_params=snuba_params,
orderby=["event_id"],
limit=100,
referrer=Referrer.API_REPLAY_SUMMARIZE_BREADCRUMBS.value,
)

if not (error_query or issue_query):
return []

# Process results and convert to EventDict objects
error_events = []
for result, query in zip(results, queries):
error_data = query.process_results(result)["data"]
seen_event_ids = set() # Track seen event IDs to avoid duplicates

# Process error query results
if error_query and "data" in error_query:
for event in error_query["data"]:
event_id = event.get("id")

# Skip if we've already seen this event
if event_id in seen_event_ids:
continue

seen_event_ids.add(event_id)

for event in error_data:
timestamp = _parse_iso_timestamp_to_ms(
event.get("timestamp_ms")
) or _parse_iso_timestamp_to_ms(event.get("timestamp"))
message = event.get("message", "")

if timestamp:
error_events.append(
EventDict(
category="error",
id=event["id"],
id=event_id,
title=event.get("title", ""),
timestamp=timestamp,
message=event.get("message", ""),
message=message,
)
)

# Process issuePlatform query results
if issue_query and "data" in issue_query:
for event in issue_query["data"]:
event_id = event.get("event_id")

# Skip if we've already seen this event
if event_id in seen_event_ids:
continue

seen_event_ids.add(event_id)

timestamp = _parse_iso_timestamp_to_ms(event.get("timestamp"))
message = event.get("subtitle", "") or event.get("message", "")

if event.get("occurrence_type_id") == FeedbackGroup.type_id:
category = "feedback"
else:
category = "error"

# NOTE: The issuePlatform dataset query can return feedback.
# We also fetch feedback from nodestore in fetch_feedback_details
# for feedback breadcrumbs.
# We avoid creating duplicate feedback logs
# by filtering for unique feedback IDs during log generation.
if timestamp:
error_events.append(
EventDict(
category=category,
id=event_id,
title=event.get("title", ""),
timestamp=timestamp,
message=message,
)
)

@@ -207,7 +258,7 @@ def get_summary_logs(
error_events: list[EventDict],
project_id: int,
) -> list[str]:
# Sort error events by timestamp
# Sort error events by timestamp. This list still includes all feedback events.
error_events.sort(key=lambda x: x["timestamp"])
return list(generate_summary_logs(segment_data, error_events, project_id))

@@ -217,8 +268,12 @@ def generate_summary_logs(
error_events: list[EventDict],
project_id,
) -> Generator[str]:
"""Generate log messages from events and errors in chronological order."""
"""
Generate log messages from events and errors in chronological order.
Avoid processing duplicate feedback events.
"""
error_idx = 0
seen_feedback_ids = set()

# Process segments
for _, segment in segment_data:
@@ -232,23 +287,39 @@
error_idx < len(error_events) and error_events[error_idx]["timestamp"] < timestamp
):
error = error_events[error_idx]
yield generate_error_log_message(error)

if error["category"] == "error":
yield generate_error_log_message(error)
elif error["category"] == "feedback":
seen_feedback_ids.add(error["id"])
yield generate_feedback_log_message(error)

error_idx += 1

# Yield the current event's log message
if event_type == EventType.FEEDBACK:
feedback_id = event["data"]["payload"].get("data", {}).get("feedbackId")
feedback = fetch_feedback_details(feedback_id, project_id)
if feedback:
yield generate_feedback_log_message(feedback)
# Filter out duplicate feedback events.
if feedback_id not in seen_feedback_ids:
seen_feedback_ids.add(feedback_id)
feedback = fetch_feedback_details(feedback_id, project_id)

if feedback:
yield generate_feedback_log_message(feedback)

elif message := as_log_message(event):
yield message

# Yield any remaining error messages
while error_idx < len(error_events):
error = error_events[error_idx]
yield generate_error_log_message(error)

if error["category"] == "error":
yield generate_error_log_message(error)
elif error["category"] == "feedback":
seen_feedback_ids.add(error["id"])
yield generate_feedback_log_message(error)

error_idx += 1


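For orientation, a minimal sketch (not part of the diff) of the deduplication step fetch_trace_connected_errors now performs when merging the errors and issuePlatform results. It assumes each result is a dict whose "data" key holds row dicts keyed by the selected columns, which is how the results are consumed above; the helper name merge_trace_connected_rows is hypothetical.

def merge_trace_connected_rows(error_query: dict, issue_query: dict) -> list[dict]:
    # The same event can be returned by both datasets, so track IDs we have
    # already emitted. The errors dataset keys rows by "id"; issuePlatform
    # keys them by "event_id".
    seen_event_ids: set[str] = set()
    merged: list[dict] = []

    for result, id_key in ((error_query, "id"), (issue_query, "event_id")):
        for row in (result or {}).get("data", []):
            event_id = row.get(id_key)
            if event_id in seen_event_ids:
                continue
            seen_event_ids.add(event_id)
            merged.append(row)

    return merged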
56 changes: 55 additions & 1 deletion src/sentry/replays/query.py
@@ -2,7 +2,7 @@

from collections.abc import Generator, Sequence
from datetime import datetime
from typing import Any
from typing import Any, Literal

from snuba_sdk import (
Column,
@@ -34,6 +34,8 @@
make_full_aggregation_query,
query_using_optimized_search,
)
from sentry.search.events.types import SnubaParams
from sentry.snuba.utils import get_dataset
from sentry.utils.snuba import raw_snql_query

MAX_PAGE_SIZE = 100
@@ -902,3 +904,55 @@ def compute_has_viewed(viewed_by_id: int | None) -> Function:
],
alias="has_viewed",
)


def query_trace_connected_events(
dataset_label: Literal["errors", "issuePlatform", "discover"],
selected_columns: list[str],
query: str | None,
snuba_params: SnubaParams,
equations: list[str] | None = None,
orderby: list[str] | None = None,
offset: int = 0,
limit: int = 10,
referrer: str = "api.replay.details-page",
) -> dict[str, Any]:
"""
Query for trace-connected events, with a reusable query configuration for replays.

Args:
dataset_label: The dataset to query against ("errors", "issuePlatform", or "discover")
selected_columns: List of columns to select
query: Optional query string
snuba_params: Snuba parameters including project IDs, time range, etc.
equations: Optional list of equations
orderby: Optional ordering specification
offset: Pagination offset
limit: Pagination limit
referrer: Referrer string for tracking

Returns:
Query result from the dataset
"""
query_details = {
"selected_columns": selected_columns,
"query": query,
"snuba_params": snuba_params,
"equations": equations,
"orderby": orderby,
"offset": offset,
"limit": limit,
"referrer": referrer,
"auto_fields": True,
"auto_aggregations": True,
"use_aggregate_conditions": True,
"allow_metric_aggregates": False,
"transform_alias_to_input_format": True,
}

dataset = get_dataset(dataset_label)

if dataset is None:
raise ValueError(f"Unknown dataset: {dataset_label}")

return dataset.query(**query_details)
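A minimal usage sketch of the new helper, mirroring the call made in summarize.py above; the project list, time range, organization, and trace ID are placeholders for whatever the caller already has in scope.

# Hypothetical usage: fetch errors connected to a single trace.
snuba_params = SnubaParams(
    projects=org_projects,      # assumed: the caller's active projects
    start=start,                # assumed: start of the time window
    end=end,                    # assumed: end of the time window
    organization=organization,  # assumed: the caller's organization
)

results = query_trace_connected_events(
    dataset_label="errors",
    selected_columns=["id", "timestamp", "title", "message"],
    query="trace:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",  # placeholder trace ID
    snuba_params=snuba_params,
    orderby=["id"],
    limit=100,
)

# Rows come back under the "data" key, keyed by the selected columns.
for row in results["data"]:
    print(row["id"], row.get("title", ""))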