@@ -13,11 +13,16 @@ import (
1313
1414 "github.com/go-kit/kit/log"
1515 "github.com/go-kit/kit/log/level"
16+ "github.com/grpc-ecosystem/go-grpc-middleware"
1617 "github.com/pkg/errors"
1718 "github.com/prometheus/client_golang/prometheus"
1819 "github.com/prometheus/client_golang/prometheus/promhttp"
1920 "github.com/prometheus/common/version"
2021 "github.com/thanos-io/thanos/pkg/store/storepb"
22+ "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
23+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
24+ "go.opentelemetry.io/otel"
25+ "go.opentelemetry.io/otel/propagation"
2126 "golang.org/x/net/trace"
2227 "google.golang.org/grpc"
2328
@@ -31,6 +36,9 @@ import (
3136 opentsdb "github.com/G-Research/opentsdb-goclient/client"
3237
3338 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
39+
40+ jaeger_propagator "go.opentelemetry.io/contrib/propagators/jaeger"
41+ jaeger_exporter "go.opentelemetry.io/otel/exporters/trace/jaeger"
3442)
3543
3644func NewConfiguredLogger (format string , logLevel string ) (log.Logger , error ) {
@@ -98,6 +106,29 @@ func (i *multipleStringFlags) Set(value string) error {
98106 return nil
99107}
100108
109+ func initTracer () func () {
110+ flush , err := jaeger_exporter .InstallNewPipeline (
111+ jaeger_exporter .WithCollectorEndpoint ("" ),
112+ jaeger_exporter .WithProcess (jaeger_exporter.Process {
113+ ServiceName : "geras" ,
114+ }),
115+ jaeger_exporter .WithDisabled (true ),
116+ jaeger_exporter .WithDisabledFromEnv (),
117+ )
118+ if err != nil {
119+ fmt .Fprintf (os .Stderr , "Could not initialize tracer: %s" , err )
120+ os .Exit (1 )
121+ }
122+
123+ otel .SetTextMapPropagator (propagation .NewCompositeTextMapPropagator (
124+ jaeger_propagator.Jaeger {},
125+ propagation.TraceContext {},
126+ propagation.Baggage {},
127+ ))
128+
129+ return flush
130+ }
131+
101132func main () {
102133 // define and parse command line flags
103134 grpcListenAddr := flag .String ("grpc-listen" , "localhost:19000" , "Service will expose the Store API on this address" )
@@ -146,6 +177,11 @@ func main() {
146177 fmt .Fprintf (os .Stderr , "Could not initialize logger: %s" , err )
147178 os .Exit (1 )
148179 }
180+
181+ // initialize distributed tracing
182+ flush := initTracer ()
183+ defer flush ()
184+
149185 // initialize tracing
150186 var transport http.RoundTripper = opentsdb .DefaultTransport
151187 if * traceEnabled {
@@ -162,7 +198,7 @@ func main() {
162198 client , err := opentsdb .NewClientContext (
163199 config.OpenTSDBConfig {
164200 OpentsdbHost : * openTSDBAddress ,
165- Transport : transport ,
201+ Transport : otelhttp . NewTransport ( transport ) ,
166202 })
167203 if err != nil {
168204 level .Error (logger ).Log ("err" , err )
@@ -188,8 +224,8 @@ func main() {
188224 // create openTSDBStore and expose its api on a grpc server
189225 srv := store .NewOpenTSDBStore (logger , client , prometheus .DefaultRegisterer , * refreshInterval , * refreshTimeout , storeLabels , allowedMetricNames , blockedMetricNames , * enableMetricSuggestions , * enableMetricNameRewriting , * healthcheckMetric , * periodCharacter )
190226 grpcSrv := grpc .NewServer (
191- grpc .StreamInterceptor (grpc_prometheus .StreamServerInterceptor ),
192- grpc .UnaryInterceptor (grpc_prometheus .UnaryServerInterceptor ),
227+ grpc .StreamInterceptor (grpc_middleware . ChainStreamServer ( grpc_prometheus .StreamServerInterceptor , otelgrpc . StreamServerInterceptor ()) ),
228+ grpc .UnaryInterceptor (grpc_middleware . ChainUnaryServer ( grpc_prometheus .UnaryServerInterceptor , otelgrpc . UnaryServerInterceptor ()) ),
193229 )
194230
195231 storepb .RegisterStoreServer (grpcSrv , srv )
0 commit comments