Skip to content

Replace trace serialization from proto to json #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "partial_span_processor"
version = "0.0.6"
version = "0.0.7"
authors = [
{ name = "Mladjan Gadzic", email = "[email protected]" }
]
Expand Down
29 changes: 24 additions & 5 deletions src/partial_span_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

from __future__ import annotations

import base64
import json
import threading
import time
from queue import Queue
from typing import TYPE_CHECKING

from google.protobuf import json_format
from opentelemetry._logs.severity import SeverityNumber
from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans
from opentelemetry.proto.trace.v1 import trace_pb2
Expand Down Expand Up @@ -113,15 +114,32 @@ def get_heartbeat_attributes(self) -> dict[str, str]:
return {
"partial.event": "heartbeat",
"partial.frequency": str(self.heartbeat_interval_millis) + "ms",
"partial.body.type": "json/v1",
}

def get_log_data(self, span: Span, attributes: dict[str, str]) -> LogData:
span_context = Span.get_span_context(span)
instrumentation_scope = span.instrumentation_scope if hasattr(span,
"instrumentation_scope") else None
span_context = span.get_span_context()

enc_spans = encode_spans([span]).resource_spans
traces_data = trace_pb2.TracesData()
traces_data.resource_spans.extend(enc_spans)
serialized_traces_data = traces_data.SerializeToString()
serialized_traces_data = json_format.MessageToJson(traces_data)

# FIXME/HACK replace serialized traceId and spanId values as string comparison
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not the happiest solution but built-in json serialization does not serialize trace_id and span_id from span according to partial collector expectations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still, i prefer this over manual serialization where a lot of things can be missed/go wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding a test for a hack seems silly to me since this is using built-in json serialization.

# possible issue is when there are multiple spans in the same trace.
# currently that should not be the case.
# trace_id and span_id are stored as int.
# when serializing it gets serialized to bytes.
# that is not inline with partial collector.
traces_dict = json.loads(serialized_traces_data)
for resource_span in traces_dict.get("resourceSpans", []):
for scope_span in resource_span.get("scopeSpans", []):
for span in scope_span.get("spans", []):
span["traceId"] = hex(span_context.trace_id)[2:]
span["spanId"] = hex(span_context.span_id)[2:]
serialized_traces_data = json.dumps(traces_dict, separators=(',', ':'))

log_record = LogRecord(
timestamp=time.time_ns(),
Expand All @@ -131,16 +149,17 @@ def get_log_data(self, span: Span, attributes: dict[str, str]) -> LogData:
trace_flags=TraceFlags().get_default(),
severity_text="INFO",
severity_number=SeverityNumber.INFO,
body=base64.b64encode(serialized_traces_data).decode("utf-8"),
body=serialized_traces_data,
resource=self.resource,
attributes=attributes,
)
return LogData(
log_record=log_record, instrumentation_scope=span.instrumentation_scope,
log_record=log_record, instrumentation_scope=instrumentation_scope,
)


def get_stop_attributes() -> dict[str, str]:
return {
"partial.event": "stop",
"partial.body.type": "json/v1",
}