Skip to content

Commit

Permalink
DSET-4559: Group By fields are part of SessionInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-majlis-s1 committed Nov 15, 2023
1 parent 5f915d0 commit b473f82
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 86 deletions.
6 changes: 3 additions & 3 deletions examples/client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ go 1.19

require (
github.com/scalyr/dataset-go v0.0.0
go.uber.org/zap v1.24.0
go.uber.org/zap v1.26.0
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.4.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
)

Expand Down
3 changes: 3 additions & 0 deletions examples/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -26,8 +27,10 @@ go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
Expand Down
6 changes: 3 additions & 3 deletions examples/readme/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ go 1.19

require (
github.com/scalyr/dataset-go v0.0.0
go.uber.org/zap v1.24.0
go.uber.org/zap v1.26.0
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.4.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
)

Expand Down
3 changes: 3 additions & 0 deletions examples/readme/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -26,8 +27,10 @@ go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
Expand Down
11 changes: 4 additions & 7 deletions pkg/api/add_events/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ const (
AttrOrigServerHost = "__origServerHost"
)

type EventAttrs = map[string]interface{}
type (
EventAttrs = map[string]interface{}
SessionInfo = map[string]interface{}
)

// Event represents DataSet REST API event structure (see https://app.scalyr.com/help/api#addEvents)
type Event struct {
Expand Down Expand Up @@ -68,12 +71,6 @@ type Log struct {
Attrs map[string]interface{} `json:"attrs"`
}

type SessionInfo struct {
ServerType string `json:"serverType,omitempty"`
ServerId string `json:"serverId,omitempty"`
Region string `json:"region,omitempty"`
}

// AddEventsRequestParams represents a represents a AddEvent DataSet REST API request parameters, see https://app.scalyr.com/help/api#addEvents.
type AddEventsRequestParams struct {
Session string `json:"session,omitempty"`
Expand Down
12 changes: 6 additions & 6 deletions pkg/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func createTestBundle() add_events.EventBundle {

func createEmptyBuffer() *Buffer {
sessionInfo := &add_events.SessionInfo{
ServerId: "serverId",
ServerType: "serverType",
Region: "region",
"serverId": "serverId",
"serverType": "serverType",
"region": "region",
}
session := "session"
token := "token"
Expand Down Expand Up @@ -136,9 +136,9 @@ func TestPayloadFull(t *testing.T) {

func TestPayloadInjection(t *testing.T) {
sessionInfo := &add_events.SessionInfo{
ServerId: "serverId\",\"sI\":\"I",
ServerType: "serverType\",\"sT\":\"T",
Region: "region\",\"r\":\"R",
"serverId": "serverId\",\"sI\":\"I",
"serverType": "serverType\",\"sT\":\"T",
"region": "region\",\"r\":\"R",
}
session := "session\",\"s\":\"S"
token := "token\",\"events\":[{}],\"foo\":\"bar"
Expand Down
56 changes: 39 additions & 17 deletions pkg/client/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ import (
Wrapper around: https://app.scalyr.com/help/api#addEvents
*/

type EventWithMeta struct {
EventBundle *add_events.EventBundle
Key string
SessionInfo add_events.SessionInfo
}

func NewEventWithMeta(bundle *add_events.EventBundle, groupBy []string) EventWithMeta {
info := make(add_events.SessionInfo)

return EventWithMeta{
EventBundle: bundle,
Key: bundle.Key(groupBy),
SessionInfo: info,
}
}

// AddEvents enqueues given events for processing (sending to Dataset).
// It returns an error if the batch was not accepted (e.g. exporter in error state and retrying handle previous batches or client is being shutdown).
func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error {
Expand All @@ -53,10 +69,15 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error

// then, figure out which keys are part of the batch
// store there information about the host
seenKeys := make(map[string]bool)
bundlesWithMeta := make(map[string][]EventWithMeta)
for _, bundle := range bundles {
key := bundle.Key(client.Config.BufferSettings.GroupBy)
seenKeys[key] = true
bWM := NewEventWithMeta(bundle, client.Config.BufferSettings.GroupBy)
list, found := bundlesWithMeta[bWM.Key]
if !found {
bundlesWithMeta[bWM.Key] = []EventWithMeta{bWM}
} else {
_ = append(list, bWM)
}
}

// update time when the first batch was received
Expand All @@ -69,21 +90,22 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
// add subscriber for buffer by key
client.addEventsMutex.Lock()
defer client.addEventsMutex.Unlock()
for key := range seenKeys {
for key, list := range bundlesWithMeta {
_, found := client.eventBundleSubscriptionChannels[key]
if !found {
// add information about the host to the sessionInfo
client.newBufferForEvents(key)
client.newBufferForEvents(key, &list[0].SessionInfo)

client.newEventBundleSubscriberRoutine(key)
}
}

// and as last step - publish them
for _, bundle := range bundles {
key := bundle.Key(client.Config.BufferSettings.GroupBy)
client.eventBundlePerKeyTopic.Pub(bundle, key)
client.eventsEnqueued.Add(1)
for key, list := range bundlesWithMeta {
for _, bundle := range list {
client.eventBundlePerKeyTopic.Pub(bundle, key)
client.eventsEnqueued.Add(1)
}
}

return nil
Expand Down Expand Up @@ -113,11 +135,11 @@ func (client *DataSetClient) newEventBundleSubscriberRoutine(key string) {
})(key, ch)
}

func (client *DataSetClient) newBufferForEvents(key string) {
func (client *DataSetClient) newBufferForEvents(key string, info *add_events.SessionInfo) {
session := fmt.Sprintf("%s-%s", client.Id, key)
buf := buffer.NewEmptyBuffer(session, client.Config.Tokens.WriteLog)

client.initBuffer(buf, client.SessionInfo)
client.initBuffer(buf, info)

client.buffersAllMutex.Lock()
client.buffers[session] = buf
Expand Down Expand Up @@ -150,7 +172,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
msg, channelReceiveSuccess := <-ch
if !channelReceiveSuccess {
client.Logger.Error(
"Cannot receive EventBundle from channel",
"Cannot receive EventWithMeta from channel",
zap.String("key", key),
zap.Any("msg", msg),
)
Expand All @@ -159,12 +181,12 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
continue
}

bundle, ok := msg.(*add_events.EventBundle)
bundle, ok := msg.(EventWithMeta)
if ok {
buf := getBuffer(key)
client.fixServerHostsInBundle(bundle)
client.fixServerHostsInBundle(bundle.EventBundle)

added, err := buf.AddBundle(bundle)
added, err := buf.AddBundle(bundle.EventBundle)
if err != nil {
if errors.Is(err, &buffer.NotAcceptingError{}) {
buf = getBuffer(key)
Expand All @@ -184,7 +206,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
}

if added == buffer.TooMuch {
added, err = buf.AddBundle(bundle)
added, err = buf.AddBundle(bundle.EventBundle)
if err != nil {
if errors.Is(err, &buffer.NotAcceptingError{}) {
buf = getBuffer(key)
Expand Down Expand Up @@ -214,7 +236,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
}
} else {
client.Logger.Error(
"Cannot convert message to EventBundle",
"Cannot convert message to EventWithMeta",
zap.String("key", key),
zap.Any("msg", msg),
)
Expand Down
3 changes: 0 additions & 3 deletions pkg/client/add_events_long_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil)
require.Nil(t, err)

sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"}
sc.SessionInfo = sessionInfo

for bI := 0; bI < MaxBatchCount; bI++ {
batch := make([]*add_events.EventBundle, 0)
for lI := 0; lI < LogsPerBatch; lI++ {
Expand Down
Loading

0 comments on commit b473f82

Please sign in to comment.