Merged
11 changes: 11 additions & 0 deletions src/sentry/options/defaults.py
@@ -511,6 +511,17 @@
default=False,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
# Trace sampling rates for replay summary endpoint.
register(
"replay.endpoints.project_replay_summary.trace_sample_rate_post",
default=0.0,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"replay.endpoints.project_replay_summary.trace_sample_rate_get",
default=0.0,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# User Feedback Options
register(
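Both options default to 0.0, so no transactions are emitted until the rates are raised. A hedged sketch of exercising the new rates in a test; override_options is the standard Sentry test helper, while the client fixture and URL name are hypothetical placeholders:

```python
from sentry.testutils.helpers import override_options


def test_summary_endpoints_emit_traces(client, summary_url):
    # Force both endpoints to trace every request for the duration of the test.
    with override_options(
        {
            "replay.endpoints.project_replay_summary.trace_sample_rate_post": 1.0,
            "replay.endpoints.project_replay_summary.trace_sample_rate_get": 1.0,
        }
    ):
        client.post(summary_url, data={"num_segments": 1})
        client.get(summary_url)
```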
204 changes: 124 additions & 80 deletions src/sentry/replays/endpoints/project_replay_summary.py
@@ -1,12 +1,13 @@
import logging
from typing import Any

import sentry_sdk
from django.conf import settings
from drf_spectacular.utils import extend_schema
from rest_framework.request import Request
from rest_framework.response import Response

from sentry import features, options
from sentry.api.api_owners import ApiOwner
from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import region_silo_endpoint
@@ -24,7 +25,7 @@
)
from sentry.seer.seer_setup import has_seer_access
from sentry.seer.signed_seer_api import make_signed_seer_api_request
from sentry.utils import json, metrics

logger = logging.getLogger(__name__)

@@ -62,9 +63,15 @@ class ProjectReplaySummaryEndpoint(ProjectEndpoint):
}
permission_classes = (ReplaySummaryPermission,)

    def __init__(self, **kw) -> None:
        storage.initialize_client()
        self.sample_rate_post = options.get(
            "replay.endpoints.project_replay_summary.trace_sample_rate_post"
        )
        self.sample_rate_get = options.get(
            "replay.endpoints.project_replay_summary.trace_sample_rate_get"
        )
        super().__init__(**kw)

def make_seer_request(self, path: str, post_body: dict[str, Any]) -> Response:
"""Make a POST request to a Seer endpoint. Raises HTTPError and logs non-200 status codes."""
@@ -133,91 +140,128 @@ def has_replay_summary_access(self, project: Project, request: Request) -> bool:

def get(self, request: Request, project: Project, replay_id: str) -> Response:
"""Poll for the status of a replay summary task in Seer."""
        with sentry_sdk.start_transaction(
            name="replays.endpoints.project_replay_summary.get",
            op="replays.endpoints.project_replay_summary.get",
            custom_sampling_context={"sample_rate": self.sample_rate_get},
        ):

            if not self.has_replay_summary_access(project, request):
                return self.respond(
                    {"detail": "Replay summaries are not available for this organization."},
                    status=403,
                )

            # We skip checking Seer permissions here for performance, and because summaries can't be created without them anyway.

            # Request Seer for the state of the summary task.
            return self.make_seer_request(
                SEER_POLL_STATE_ENDPOINT_PATH,
                {
                    "replay_id": replay_id,
                },
            )

def post(self, request: Request, project: Project, replay_id: str) -> Response:
"""Download replay segment data and parse it into logs. Then post to Seer to start a summary task."""
        with sentry_sdk.start_transaction(
            name="replays.endpoints.project_replay_summary.post",
            op="replays.endpoints.project_replay_summary.post",
            custom_sampling_context={"sample_rate": self.sample_rate_post},
        ):

            if not self.has_replay_summary_access(project, request):
                return self.respond(
                    {"detail": "Replay summaries are not available for this organization."},
                    status=403,
                )

            filter_params = self.get_filter_params(request, project)
            num_segments = request.data.get("num_segments", 0)
            temperature = request.data.get("temperature", None)

            # Limit data with the frontend's segment count, to keep summaries consistent with the video displayed in the UI.
            # While the replay is live, the FE and BE may have different counts.
            if num_segments > MAX_SEGMENTS_TO_SUMMARIZE:
                logger.warning(
                    "Replay Summary: hit max segment limit.",
                    extra={
                        "replay_id": replay_id,
                        "project_id": project.id,
                        "organization_id": project.organization.id,
                        "segment_limit": MAX_SEGMENTS_TO_SUMMARIZE,
                    },
                )
                num_segments = MAX_SEGMENTS_TO_SUMMARIZE

            # Fetch the replay's error and trace IDs from the replay_id.
            snuba_response = query_replay_instance(
                project_id=project.id,
                replay_id=replay_id,
                start=filter_params["start"],
                end=filter_params["end"],
                organization=project.organization,
                request_user_id=request.user.id,
            )
            processed_response = process_raw_response(
                snuba_response,
                fields=request.query_params.getlist("field"),
            )
            error_ids = processed_response[0].get("error_ids", []) if processed_response else []
            trace_ids = processed_response[0].get("trace_ids", []) if processed_response else []

            # Fetch same-trace errors.
            trace_connected_errors = fetch_trace_connected_errors(
                project=project,
                trace_ids=trace_ids,
                start=filter_params["start"],
                end=filter_params["end"],
                limit=100,
            )
            trace_connected_error_ids = {x["id"] for x in trace_connected_errors}

            # Fetch directly linked errors, if they weren't returned by the trace query.
            replay_errors = fetch_error_details(
                project_id=project.id,
                error_ids=[x for x in error_ids if x not in trace_connected_error_ids],
            )

            error_events = replay_errors + trace_connected_errors

            metrics.distribution(
                "replays.endpoints.project_replay_summary.direct_errors",
                value=len(replay_errors),
            )
            metrics.distribution(
                "replays.endpoints.project_replay_summary.trace_connected_errors",
                value=len(trace_connected_errors),
            )
            metrics.distribution(
                "replays.endpoints.project_replay_summary.num_trace_ids",
                value=len(trace_ids),
            )

            # Download segment data.
            # XXX: For now this is capped to 100 and blocking. DD shows no replays with >25 segments, but we should still stress test and figure out how to deal with large replays.
            segment_md = fetch_segments_metadata(project.id, replay_id, 0, num_segments)
            segment_data = iter_segment_data(segment_md)

            # Combine replay and error data and parse into logs.
            logs = get_summary_logs(segment_data, error_events, project.id)

            # Post to Seer to start a summary task.
            # XXX: Request isn't streaming. Limitation of Seer authentication. Would be much faster if we
            # could stream the request data since the GCS download will (likely) dominate latency.
            return self.make_seer_request(
                SEER_START_TASK_ENDPOINT_PATH,
                {
                    "logs": logs,
                    "num_segments": num_segments,
                    "replay_id": replay_id,
                    "organization_id": project.organization.id,
                    "project_id": project.id,
                    "temperature": temperature,
                },
            )
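The sample rate forwarded via custom_sampling_context only takes effect if the server's traces_sampler consults it. The sampler itself is not part of this PR; a minimal sketch of that contract, with the wiring here an assumption:

```python
import sentry_sdk


def traces_sampler(sampling_context: dict) -> float:
    # Keys supplied via custom_sampling_context are merged into sampling_context,
    # so the per-endpoint option value surfaces here as "sample_rate".
    rate = sampling_context.get("sample_rate")
    if rate is not None:
        return float(rate)
    return 0.0  # default: don't sample transactions without an explicit rate


sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sampler=traces_sampler,
)
```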
56 changes: 55 additions & 1 deletion src/sentry/replays/query.py
@@ -2,7 +2,7 @@

from collections.abc import Generator, Sequence
from datetime import datetime
from typing import Any, Literal

from snuba_sdk import (
Column,
Expand Down Expand Up @@ -34,6 +34,8 @@
make_full_aggregation_query,
query_using_optimized_search,
)
from sentry.search.events.types import SnubaParams
from sentry.snuba.utils import get_dataset
from sentry.utils.snuba import raw_snql_query

MAX_PAGE_SIZE = 100
@@ -902,3 +904,55 @@ def compute_has_viewed(viewed_by_id: int | None) -> Function:
],
alias="has_viewed",
)


def query_trace_connected_events(
dataset_label: Literal["errors", "issuePlatform", "discover"],
selected_columns: list[str],
query: str | None,
snuba_params: SnubaParams,
equations: list[str] | None = None,
orderby: list[str] | None = None,
offset: int = 0,
limit: int = 10,
referrer: str = "api.replay.details-page",
) -> dict[str, Any]:
"""
Query for trace-connected events, with a reusable query configuration for replays.

Args:
        dataset_label: Label of the Snuba dataset to query against ("errors", "issuePlatform", or "discover")
selected_columns: List of columns to select
query: Optional query string
snuba_params: Snuba parameters including project IDs, time range, etc.
equations: Optional list of equations
orderby: Optional ordering specification
offset: Pagination offset
limit: Pagination limit
referrer: Referrer string for tracking

Returns:
Query result from the dataset
"""
query_details = {
"selected_columns": selected_columns,
"query": query,
"snuba_params": snuba_params,
"equations": equations,
"orderby": orderby,
"offset": offset,
"limit": limit,
"referrer": referrer,
"auto_fields": True,
"auto_aggregations": True,
"use_aggregate_conditions": True,
"allow_metric_aggregates": False,
"transform_alias_to_input_format": True,
}

dataset = get_dataset(dataset_label)

if dataset is None:
raise ValueError(f"Unknown dataset: {dataset_label}")

return dataset.query(**query_details)
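For reference, a hedged sketch of calling the new helper to pull trace-connected error events for a replay. The column list, query string, and SnubaParams construction below are illustrative and not lifted from fetch_trace_connected_errors:

```python
from datetime import datetime, timedelta, timezone

from sentry.replays.query import query_trace_connected_events
from sentry.search.events.types import SnubaParams


def fetch_errors_for_trace_ids(project, trace_ids: list[str]) -> list[dict]:
    # Hypothetical helper: `project` is a Project instance and `trace_ids`
    # come from the replay's trace_ids field.
    end = datetime.now(timezone.utc)
    snuba_params = SnubaParams(
        start=end - timedelta(days=1),
        end=end,
        projects=[project],
        organization=project.organization,
    )
    result = query_trace_connected_events(
        dataset_label="errors",
        selected_columns=["id", "title", "timestamp", "trace"],
        query=f"trace:[{', '.join(trace_ids)}]",
        snuba_params=snuba_params,
        orderby=["timestamp"],
        limit=100,
    )
    return result["data"]
```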