Skip to content

Commit 8c78f65

Browse files
committed
fix(outputs.influxdb_v2): Fix panic and API error handling
1 parent 90836c8 commit 8c78f65

File tree

3 files changed

+407
-35
lines changed

3 files changed

+407
-35
lines changed

plugins/outputs/influxdb_v2/http.go

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,17 @@ import (
2626
)
2727

2828
type APIError struct {
29-
StatusCode int
30-
Title string
31-
Description string
29+
Err error
30+
StatusCode int
31+
Retryable bool
3232
}
3333

3434
func (e APIError) Error() string {
35-
if e.Description != "" {
36-
return fmt.Sprintf("%s: %s", e.Title, e.Description)
37-
}
38-
return e.Title
35+
return e.Err.Error()
36+
}
37+
38+
func (e APIError) Unwrap() error {
39+
return e.Err
3940
}
4041

4142
const (
@@ -185,7 +186,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
185186
}
186187

187188
batches[bucket] = append(batches[bucket], metric)
188-
batchIndices[c.bucket] = append(batchIndices[c.bucket], i)
189+
batchIndices[bucket] = append(batchIndices[bucket], i)
189190
}
190191
}
191192

@@ -201,10 +202,14 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
201202
var apiErr *APIError
202203
if errors.As(err, &apiErr) {
203204
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
205+
// TODO: Need a testcase to verify rejected metrics are not retried...
204206
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
205207
}
206208
wErr.Err = err
207-
wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
209+
if !apiErr.Retryable {
210+
wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
211+
}
212+
// TODO: Clarify if we should continue here to try the remaining buckets?
208213
return &wErr
209214
}
210215

@@ -301,21 +306,19 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
301306
}
302307

303308
// We got an error and now try to decode further
309+
var desc string
304310
writeResp := &genericRespError{}
305-
err = json.NewDecoder(resp.Body).Decode(writeResp)
306-
desc := writeResp.Error()
307-
if err != nil {
308-
desc = resp.Status
311+
if json.NewDecoder(resp.Body).Decode(writeResp) == nil {
312+
desc = writeResp.Error()
309313
}
310314

311315
switch resp.StatusCode {
312316
// request was too large, send back to try again
313317
case http.StatusRequestEntityTooLarge:
314318
c.log.Errorf("Failed to write metric to %s, request was too large (413)", bucket)
315319
return &APIError{
316-
StatusCode: resp.StatusCode,
317-
Title: resp.Status,
318-
Description: desc,
320+
Err: fmt.Errorf("%s: %s", resp.Status, desc),
321+
StatusCode: resp.StatusCode,
319322
}
320323
case
321324
// request was malformed:
@@ -325,14 +328,10 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
325328
http.StatusUnprocessableEntity,
326329
http.StatusNotAcceptable:
327330

328-
// Clients should *not* repeat the request and the metrics should be dropped.
329-
rejected := make([]int, 0, len(metrics))
330-
for i := range len(metrics) {
331-
rejected = append(rejected, i)
332-
}
333-
return &internal.PartialWriteError{
334-
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
335-
MetricsReject: rejected,
331+
// Clients should *not* repeat the request and the metrics should be rejected.
332+
return &APIError{
333+
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
334+
StatusCode: resp.StatusCode,
336335
}
337336
case http.StatusUnauthorized, http.StatusForbidden:
338337
return fmt.Errorf("failed to write metric to %s (%s): %s", bucket, resp.Status, desc)
@@ -351,13 +350,9 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
351350
// if it's any other 4xx code, the client should not retry as it's the client's mistake.
352351
// retrying will not make the request magically work.
353352
if len(resp.Status) > 0 && resp.Status[0] == '4' {
354-
rejected := make([]int, 0, len(metrics))
355-
for i := range len(metrics) {
356-
rejected = append(rejected, i)
357-
}
358-
return &internal.PartialWriteError{
359-
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
360-
MetricsReject: rejected,
353+
return &APIError{
354+
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
355+
StatusCode: resp.StatusCode,
361356
}
362357
}
363358

@@ -367,10 +362,14 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
367362
desc = fmt.Sprintf("%s; %s", desc, xErr)
368363
}
369364

365+
if desc != "" {
366+
desc = ": " + desc
367+
}
368+
370369
return &APIError{
371-
StatusCode: resp.StatusCode,
372-
Title: resp.Status,
373-
Description: desc,
370+
Err: fmt.Errorf("failed to write metric to bucket %q: %s%s", bucket, resp.Status, desc),
371+
StatusCode: resp.StatusCode,
372+
Retryable: true,
374373
}
375374
}
376375

plugins/outputs/influxdb_v2/influxdb_v2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
199199
for _, n := range rand.Perm(len(i.clients)) {
200200
client := i.clients[n]
201201
if err := client.Write(ctx, metrics); err != nil {
202+
i.Log.Errorf("When writing to [%s]: %v", client.url, err)
202203
var werr *internal.PartialWriteError
203204
if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) {
204205
return err
205206
}
206-
i.Log.Errorf("When writing to [%s]: %v", client.url, err)
207207
continue
208208
}
209209
return nil

0 commit comments

Comments
 (0)