Skip to content

Commit

Permalink
Merge pull request #185 from naveen-rao-philips/main
Browse files Browse the repository at this point in the history
Support for SynchronousFlush and RetryOnError
  • Loading branch information
loafoe authored Sep 12, 2023
2 parents ba332b2 + bee471b commit 366a391
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 8 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ These keys are relevant when using either SigningKey or Service identities
| Debug | Shows request details when set to true | HSDP\_DEBUG | Optional |
| CustomField | Adds the field hash to custom field when set to true | HSDP\_CUSTOM\_FIELD | Optional |
| InsecureSkipVerify | Skip checking HSDP ingestor TLS cert. Insecure! | HSDP\_INSECURE\_SKIP\_VERIFY | Optional |
| SynchronousFlush | Flushes log messages synchronously without batching. By default this is set to *false* | | Optional |
| RetryOnError | Returns retry to FLB if flush fails. Applicable only when *SynchronousFlush* option is set. By default this is set to *false* | | Optional |

### Signing keys

Expand Down
58 changes: 50 additions & 8 deletions hsdp/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
)

var (
plugin Plugin = &fluentPlugin{}
client storer.Storer
queue chan logging.Resource
useCustomField bool
ignoreTLS bool
drop bool
plugin Plugin = &fluentPlugin{}
client storer.Storer
queue chan logging.Resource
useCustomField bool
ignoreTLS bool
drop bool
synchronousMode bool
retryEnabled bool
)

const (
Expand Down Expand Up @@ -101,13 +103,30 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
logdrainApplicationName := plugin.Environment(ctx, "LogdrainApplicationName")
logdrainServerName := plugin.Environment(ctx, "LogdrainServerName")
dropMessages := plugin.Environment(ctx, "Drop")
synchronous := plugin.Environment(ctx, "SynchronousFlush")
retry := plugin.Environment(ctx, "RetryOnError")

var err error

useCustomField = customField == "true" || customField == "yes" || customField == "1" // TODO: remove global
ignoreTLS = noTLS == "true" || noTLS == "yes" || noTLS == "1"
drop = dropMessages == "true" || dropMessages == "yes" || dropMessages == "1"
enableDebug := debugging == "true" || debugging == "yes" || debugging == "1"
synchronousMode = synchronous == "true" || synchronous == "yes" || synchronous == "1"
retryEnabled = retry == "true" || retry == "yes" || retry == "1"

if !synchronousMode && retryEnabled {
fmt.Printf("Retry is supported only in synchronouse mode. Resetting to false\n")
retryEnabled = false
}

if synchronousMode {
fmt.Printf("Synchronous flush mode enabled\n")
}

if retryEnabled {
fmt.Printf("Retry on error enabled\n")
}

c := &http.Client{
Transport: &http.Transport{
Expand Down Expand Up @@ -273,10 +292,24 @@ func flushResources(resources []logging.Resource, count int) (*logging.StoreResp
return client.StoreResources(resources, count)
}

func flushResource(resource logging.Resource) (*logging.StoreResponse, error) {

res := make([]logging.Resource, 1)
res[0] = resource
resp, err := client.StoreResources(res, 1)
if err != nil {
printError(resp, err)
} else {
fmt.Printf("[out-hsdp] Flushed 1 resource\n")
}
return resp, err
}

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
// do something with the data
var ret int
var status int = output.FLB_OK
var ts interface{}
var record map[interface{}]interface{}

Expand Down Expand Up @@ -310,9 +343,18 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
// error should be printed to console
continue
}
queue <- js
if synchronousMode {
_, flusherr := flushResource(js)
if retryEnabled && flusherr != nil {
fmt.Printf("[out-hsdp] Failed to flush, returning Retry..\n")
status = output.FLB_RETRY
break
}
} else {
queue <- js
}
}
return output.FLB_OK
return status
}

func mapReturnDelete(m *map[string]interface{}, key, defaultValue string) string {
Expand Down

0 comments on commit 366a391

Please sign in to comment.