12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
+ from __future__ import annotations
16
+
15
17
import base64
16
18
import threading
17
19
import time
18
20
from queue import Queue
19
- from typing import Optional
21
+ from typing import TYPE_CHECKING
20
22
21
- from opentelemetry import context as context_api
22
23
from opentelemetry ._logs .severity import SeverityNumber
23
24
from opentelemetry .exporter .otlp .proto .common .trace_encoder import encode_spans
24
25
from opentelemetry .proto .trace .v1 import trace_pb2
25
26
from opentelemetry .sdk ._logs import LogData , LogRecord
26
- from opentelemetry .sdk ._logs .export import LogExporter
27
- from opentelemetry .sdk .trace import (
28
- SpanProcessor ,
29
- Span ,
30
- ReadableSpan
31
- )
27
+ from opentelemetry .sdk .trace import ReadableSpan , Span , SpanProcessor
32
28
from opentelemetry .trace import TraceFlags
33
29
30
+ if TYPE_CHECKING :
31
+ from opentelemetry import context as context_api
32
+ from opentelemetry .sdk ._logs .export import LogExporter
33
+ from opentelemetry .sdk .resources import Resource
34
+
34
35
WORKER_THREAD_NAME = "OtelPartialSpanProcessor"
35
36
36
37
@@ -39,12 +40,15 @@ class PartialSpanProcessor(SpanProcessor):
39
40
def __init__ (
40
41
self ,
41
42
log_exporter : LogExporter ,
42
- heartbeat_interval_millis : int
43
- ):
43
+ heartbeat_interval_millis : int ,
44
+ resource : Resource | None = None ,
45
+ ) -> None :
44
46
if heartbeat_interval_millis <= 0 :
45
- raise ValueError ("heartbeat_interval_ms must be greater than 0" )
47
+ msg = "heartbeat_interval_ms must be greater than 0"
48
+ raise ValueError (msg )
46
49
self .log_exporter = log_exporter
47
50
self .heartbeat_interval_millis = heartbeat_interval_millis
51
+ self .resource = resource
48
52
49
53
self .active_spans = {}
50
54
self .ended_spans = Queue ()
@@ -53,11 +57,11 @@ def __init__(
53
57
self .done = False
54
58
self .condition = threading .Condition (threading .Lock ())
55
59
self .worker_thread = threading .Thread (
56
- name = WORKER_THREAD_NAME , target = self .worker , daemon = True
60
+ name = WORKER_THREAD_NAME , target = self .worker , daemon = True ,
57
61
)
58
62
self .worker_thread .start ()
59
63
60
- def worker (self ):
64
+ def worker (self ) -> None :
61
65
while not self .done :
62
66
with self .condition :
63
67
self .condition .wait (self .heartbeat_interval_millis / 1000 )
@@ -73,17 +77,17 @@ def worker(self):
73
77
74
78
self .heartbeat ()
75
79
76
- def heartbeat (self ):
80
+ def heartbeat (self ) -> None :
77
81
with self .lock :
78
- for span_key , span in list (self .active_spans .items ()):
82
+ for span in list (self .active_spans .values ()):
79
83
attributes = self .get_heartbeat_attributes ()
80
- log_data = get_log_data (span , attributes )
84
+ log_data = self . get_log_data (span , attributes )
81
85
self .log_exporter .export ([log_data ])
82
86
83
- def on_start (self , span : " Span" ,
84
- parent_context : Optional [ context_api .Context ] = None ) -> None :
87
+ def on_start (self , span : Span ,
88
+ parent_context : context_api .Context | None = None ) -> None :
85
89
attributes = self .get_heartbeat_attributes ()
86
- log_data = get_log_data (span , attributes )
90
+ log_data = self . get_log_data (span , attributes )
87
91
self .log_exporter .export ([log_data ])
88
92
89
93
span_key = (span .context .trace_id , span .context .span_id )
@@ -92,7 +96,7 @@ def on_start(self, span: "Span",
92
96
93
97
def on_end (self , span : ReadableSpan ) -> None :
94
98
attributes = get_stop_attributes ()
95
- log_data = get_log_data (span , attributes )
99
+ log_data = self . get_log_data (span , attributes )
96
100
self .log_exporter .export ([log_data ])
97
101
98
102
span_key = (span .context .trace_id , span .context .span_id )
@@ -105,39 +109,38 @@ def shutdown(self) -> None:
105
109
self .condition .notify_all ()
106
110
self .worker_thread .join ()
107
111
108
- def get_heartbeat_attributes (self ):
112
+ def get_heartbeat_attributes (self ) -> dict [ str , str ] :
109
113
return {
110
114
"partial.event" : "heartbeat" ,
111
115
"partial.frequency" : str (self .heartbeat_interval_millis ) + "ms" ,
112
116
}
113
117
118
+ def get_log_data (self , span : Span , attributes : dict [str , str ]) -> LogData :
119
+ span_context = Span .get_span_context (span )
120
+
121
+ enc_spans = encode_spans ([span ]).resource_spans
122
+ traces_data = trace_pb2 .TracesData ()
123
+ traces_data .resource_spans .extend (enc_spans )
124
+ serialized_traces_data = traces_data .SerializeToString ()
125
+
126
+ log_record = LogRecord (
127
+ timestamp = time .time_ns (),
128
+ observed_timestamp = time .time_ns (),
129
+ trace_id = span_context .trace_id ,
130
+ span_id = span_context .span_id ,
131
+ trace_flags = TraceFlags ().get_default (),
132
+ severity_text = "INFO" ,
133
+ severity_number = SeverityNumber .INFO ,
134
+ body = base64 .b64encode (serialized_traces_data ).decode ("utf-8" ),
135
+ resource = self .resource ,
136
+ attributes = attributes ,
137
+ )
138
+ return LogData (
139
+ log_record = log_record , instrumentation_scope = span .instrumentation_scope ,
140
+ )
114
141
115
- def get_stop_attributes ():
142
+
143
+ def get_stop_attributes () -> dict [str , str ]:
116
144
return {
117
145
"partial.event" : "stop" ,
118
146
}
119
-
120
-
121
- def get_log_data (span , attributes ):
122
- span_context = Span .get_span_context (span )
123
-
124
- enc_spans = encode_spans ([span ]).resource_spans
125
- traces_data = trace_pb2 .TracesData ()
126
- traces_data .resource_spans .extend (enc_spans )
127
- serialized_traces_data = traces_data .SerializeToString ()
128
-
129
- log_record = LogRecord (
130
- timestamp = time .time_ns (),
131
- observed_timestamp = time .time_ns (),
132
- trace_id = span_context .trace_id ,
133
- span_id = span_context .span_id ,
134
- trace_flags = TraceFlags ().get_default (),
135
- severity_text = "INFO" ,
136
- severity_number = SeverityNumber .INFO ,
137
- body = base64 .b64encode (serialized_traces_data ).decode ('utf-8' ),
138
- attributes = attributes ,
139
- )
140
- log_data = LogData (
141
- log_record = log_record , instrumentation_scope = span .instrumentation_scope
142
- )
143
- return log_data
0 commit comments