Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ymtdzzz committed Jul 7, 2024
1 parent b98cea8 commit 9964aa4
Show file tree
Hide file tree
Showing 6 changed files with 513 additions and 5 deletions.
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ service:
receivers: [otlp]
processors: []
exporters: [tui]
metrics:
receivers: [otlp]
processors: []
exporters: [tui]
`

configProviderSettings := otelcol.ConfigProviderSettings{
Expand Down
7 changes: 7 additions & 0 deletions tuiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ymtdzzz/otel-tui/tuiexporter/internal/tui"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand All @@ -27,6 +28,12 @@ func (e *tuiExporter) pushTraces(_ context.Context, traces ptrace.Traces) error
return nil
}

func (e *tuiExporter) pushMetrics(_ context.Context, metrics pmetric.Metrics) error {
e.app.Store().AddMetric(&metrics)

return nil
}

func (e *tuiExporter) pushLogs(_ context.Context, logs plog.Logs) error {
e.app.Store().AddLog(&logs)

Expand Down
23 changes: 22 additions & 1 deletion tuiexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewFactory() exporter.Factory {
component.MustNewType("tui"),
createDefaultConfig,
exporter.WithTraces(createTraces, stability),
//exporter.WithMetrics(createMetrics, stability),
exporter.WithMetrics(createMetrics, stability),
exporter.WithLogs(createLogs, stability),
)
}
Expand Down Expand Up @@ -49,6 +49,27 @@ func createTraces(ctx context.Context, set exporter.Settings, cfg component.Conf
)
}

func createMetrics(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Metrics, error) {
oCfg := cfg.(*Config)

e, err := exporters.LoadOrStore(
oCfg,
func() (*tuiExporter, error) {
return newTuiExporter(oCfg), nil
},
&set.TelemetrySettings,
)
if err != nil {
return nil, err
}

return exporterhelper.NewMetricsExporter(ctx, set, oCfg,
e.Unwrap().pushMetrics,
exporterhelper.WithStart(e.Start),
exporterhelper.WithShutdown(e.Shutdown),
)
}

func createLogs(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Logs, error) {
oCfg := cfg.(*Config)

Expand Down
71 changes: 69 additions & 2 deletions tuiexporter/internal/telemetry/store.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package telemetry

import (
"log"
"strings"
"sync"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
MAX_SERVICE_SPAN_COUNT = 1000
MAX_METRIC_COUNT = 1000
MAX_LOG_COUNT = 1000
)

Expand All @@ -32,6 +35,14 @@ func (sd *SpanData) IsRoot() bool {
// This is a slice of one span of a single service
type SvcSpans []*SpanData

// MetricData is a struct to represent a metric
type MetricData struct {
Metric *pmetric.Metric
ResourceMetric *pmetric.ResourceMetrics
ScopeMetric *pmetric.ScopeMetrics
ReceivedAt time.Time
}

// LogData is a struct to represent a log
type LogData struct {
Log *plog.LogRecord
Expand All @@ -58,11 +69,13 @@ type Store struct {
svcspans SvcSpans
svcspansFiltered SvcSpans
tracecache *TraceCache
metrics []*MetricData
logs []*LogData
logsFiltered []*LogData
logcache *LogCache
updatedAt time.Time
maxServiceSpanCount int
maxMetricCount int
maxLogCount int
}

Expand All @@ -77,6 +90,7 @@ func NewStore() *Store {
logsFiltered: []*LogData{},
logcache: NewLogCache(),
maxServiceSpanCount: MAX_SERVICE_SPAN_COUNT, // TODO: make this configurable
maxMetricCount: MAX_METRIC_COUNT, // TODO: make this configurable
maxLogCount: MAX_LOG_COUNT, // TODO: make this configurable
}
}
Expand All @@ -101,6 +115,11 @@ func (s *Store) GetFilteredSvcSpans() *SvcSpans {
return &s.svcspansFiltered
}

// GetMetrics returns the metrics in the store
func (s *Store) GetMetrics() *[]*MetricData {
return &s.metrics
}

// GetFilteredLogs returns the filtered logs in the store
func (s *Store) GetFilteredLogs() *[]*LogData {
return &s.logsFiltered
Expand Down Expand Up @@ -177,6 +196,14 @@ func (s *Store) GetFilteredServiceSpansByIdx(idx int) []*SpanData {
return spans
}

// GetMetricByIdx returns the metric at the given index
func (s *Store) GetMetricByIdx(idx int) *MetricData {
if idx < 0 || idx >= len(s.metrics) {
return nil
}
return s.metrics[idx]
}

// GetFilteredLogByIdx returns the log at the given index
func (s *Store) GetFilteredLogByIdx(idx int) *LogData {
if idx < 0 || idx >= len(s.logsFiltered) {
Expand All @@ -185,7 +212,7 @@ func (s *Store) GetFilteredLogByIdx(idx int) *LogData {
return s.logsFiltered[idx]
}

// AddSpan adds a span to the store
// AddSpan adds spans to the store
func (s *Store) AddSpan(traces *ptrace.Traces) {
s.mut.Lock()
defer func() {
Expand Down Expand Up @@ -230,7 +257,47 @@ func (s *Store) AddSpan(traces *ptrace.Traces) {
s.updateFilterService()
}

// AddLog adds a log to the store
// AddMetric adds metrics to the store
func (s *Store) AddMetric(metrics *pmetric.Metrics) {
s.mut.Lock()
defer func() {
s.updatedAt = time.Now()
s.mut.Unlock()
}()
log.Println("AddMetric called")

for rmi := 0; rmi < metrics.ResourceMetrics().Len(); rmi++ {
rm := metrics.ResourceMetrics().At(rmi)

for smi := 0; smi < rm.ScopeMetrics().Len(); smi++ {
sm := rm.ScopeMetrics().At(smi)

for si := 0; si < sm.Metrics().Len(); si++ {
metric := sm.Metrics().At(si)
sd := &MetricData{
Metric: &metric,
ResourceMetric: &rm,
ScopeMetric: &sm,
ReceivedAt: time.Now(),
}
s.metrics = append(s.metrics, sd)
}
}
}

// data rotation
if len(s.svcspans) > s.maxServiceSpanCount {
deleteSpans := s.svcspans[:len(s.svcspans)-s.maxServiceSpanCount]

s.tracecache.DeleteCache(deleteSpans)

s.svcspans = s.svcspans[len(s.svcspans)-s.maxServiceSpanCount:]
}

// TODO: update filtered
}

// AddLog adds logs to the store
func (s *Store) AddLog(logs *plog.Logs) {
s.mut.Lock()
defer func() {
Expand Down
Loading

0 comments on commit 9964aa4

Please sign in to comment.