14
14
15
15
from __future__ import annotations
16
16
17
- import base64
17
+ import json
18
18
import threading
19
19
import time
20
20
from queue import Queue
21
21
from typing import TYPE_CHECKING
22
22
23
+ from google .protobuf import json_format
23
24
from opentelemetry ._logs .severity import SeverityNumber
24
25
from opentelemetry .exporter .otlp .proto .common .trace_encoder import encode_spans
25
26
from opentelemetry .proto .trace .v1 import trace_pb2
@@ -113,15 +114,32 @@ def get_heartbeat_attributes(self) -> dict[str, str]:
113
114
return {
114
115
"partial.event" : "heartbeat" ,
115
116
"partial.frequency" : str (self .heartbeat_interval_millis ) + "ms" ,
117
+ "partial.body.type" : "json/v1" ,
116
118
}
117
119
118
120
def get_log_data (self , span : Span , attributes : dict [str , str ]) -> LogData :
119
- span_context = Span .get_span_context (span )
121
+ instrumentation_scope = span .instrumentation_scope if hasattr (span ,
122
+ "instrumentation_scope" ) else None
123
+ span_context = span .get_span_context ()
120
124
121
125
enc_spans = encode_spans ([span ]).resource_spans
122
126
traces_data = trace_pb2 .TracesData ()
123
127
traces_data .resource_spans .extend (enc_spans )
124
- serialized_traces_data = traces_data .SerializeToString ()
128
+ serialized_traces_data = json_format .MessageToJson (traces_data )
129
+
130
+ # FIXME/HACK replace serialized traceId and spanId values as string comparison
131
+ # possible issue is when there are multiple spans in the same trace.
132
+ # currently that should not be the case.
133
+ # trace_id and span_id are stored as int.
134
+ # when serializing it gets serialized to bytes.
135
+ # that is not inline with partial collector.
136
+ traces_dict = json .loads (serialized_traces_data )
137
+ for resource_span in traces_dict .get ("resourceSpans" , []):
138
+ for scope_span in resource_span .get ("scopeSpans" , []):
139
+ for span in scope_span .get ("spans" , []):
140
+ span ["traceId" ] = hex (span_context .trace_id )[2 :]
141
+ span ["spanId" ] = hex (span_context .span_id )[2 :]
142
+ serialized_traces_data = json .dumps (traces_dict , separators = (',' , ':' ))
125
143
126
144
log_record = LogRecord (
127
145
timestamp = time .time_ns (),
@@ -131,16 +149,17 @@ def get_log_data(self, span: Span, attributes: dict[str, str]) -> LogData:
131
149
trace_flags = TraceFlags ().get_default (),
132
150
severity_text = "INFO" ,
133
151
severity_number = SeverityNumber .INFO ,
134
- body = base64 . b64encode ( serialized_traces_data ). decode ( "utf-8" ) ,
152
+ body = serialized_traces_data ,
135
153
resource = self .resource ,
136
154
attributes = attributes ,
137
155
)
138
156
return LogData (
139
- log_record = log_record , instrumentation_scope = span . instrumentation_scope ,
157
+ log_record = log_record , instrumentation_scope = instrumentation_scope ,
140
158
)
141
159
142
160
143
161
def get_stop_attributes () -> dict [str , str ]:
144
162
return {
145
163
"partial.event" : "stop" ,
164
+ "partial.body.type" : "json/v1" ,
146
165
}
0 commit comments