Skip to content

Commit 811c57f

Browse files
committed
out_stackdriver: fix metric accounting for dropped records and decouple logic
This patch refactors the response handling in the stackdriver output plugin to resolve incorrect metric accounting and inconsistent behavior between builds with and without Prometheus metrics enabled. 1. Defer Metric Accounting to Core Engine: Removed manual increment of cmt_dropped_records on standard 4xx errors. The core engine automatically increments drop counters when the plugin returns FLB_ERROR. Manual increments caused double counting. This aligns with how other plugins (like out_cloudwatch_logs) defer core accounting to the engine. 2. Enable Partial Success Handling for All Builds: Moved parse_partial_success_response outside of #ifdef FLB_HAVE_METRICS. Previously, partial success was only checked if metrics were enabled. Now, all builds will correctly identify partial successes and return FLB_OK to clear the chunk, preventing unnecessary retries and ensuring consistent behavior across different build configurations. 3. Added Visibility Logging: Added a warning log for partial successes that reports the exact number of failed records, providing operator visibility into silent drops. 4. Style Guide Compliance: Ensured all variables are declared at the beginning of the function scope as required by Fluent Bit style guidelines. Moved ts outside of ifdef to support its use on all builds. 5. Fixed Double Counting on Partial Success: Guarded the batch success metrics update in if (ret_code == FLB_OK) to ensure we don't double count successful records that were already accounted for in the partial success block. 6. Fixed Partial Success Parsing Bug and Safety Issue: Fixed parse_partial_success_response to only return 0 when a partial success detail is actually found, preventing accidental dropping of logs on standard 400/403 errors with other details. Also split the type check to ensure ret == 0 before accessing string pointers, avoiding potential crashes. Signed-off-by: pdewilde <pdewilde@google.com>
1 parent 3e414ac commit 811c57f

File tree

1 file changed

+54
-37
lines changed

1 file changed

+54
-37
lines changed

plugins/out_stackdriver/stackdriver.c

Lines changed: 54 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2777,6 +2777,7 @@ static int parse_partial_success_response(struct flb_http_client* c,
27772777
msgpack_object logEntryError_map;
27782778
msgpack_object logEntryCode;
27792779
msgpack_object at_type;
2780+
int partial_success_found = FLB_FALSE;
27802781

27812782
if (c->resp.status != 400 && c->resp.status != 403) {
27822783
return -1;
@@ -2866,15 +2867,20 @@ static int parse_partial_success_response(struct flb_http_client* c,
28662867
"@type", 5,
28672868
MSGPACK_OBJECT_STR,
28682869
&at_type);
2870+
if (ret != 0 ||
2871+
at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE) {
2872+
continue;
2873+
}
2874+
28692875
strncpy(at_type_str, at_type.via.str.ptr,
28702876
PARTIAL_SUCCESS_GRPC_TYPE_SIZE);
2871-
if (ret != 0 ||
2872-
at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE ||
2873-
strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE,
2874-
PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) {
2877+
if (strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE,
2878+
PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) {
28752879
continue;
28762880
}
28772881

2882+
partial_success_found = FLB_TRUE;
2883+
28782884
ret = extract_msgpack_obj_from_msgpack_map(&details_map.via.map,
28792885
"logEntryErrors", 14,
28802886
MSGPACK_OBJECT_MAP,
@@ -2925,7 +2931,7 @@ static int parse_partial_success_response(struct flb_http_client* c,
29252931
}
29262932
flb_free(buffer);
29272933
msgpack_unpacked_destroy(&result);
2928-
return 0;
2934+
return partial_success_found == FLB_TRUE ? 0 : -1;
29292935
}
29302936
static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
29312937
struct flb_output_flush *out_flush,
@@ -2937,7 +2943,8 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
29372943
(void) config;
29382944
int ret;
29392945
int code;
2940-
int ret_partial_success;
2946+
int failed_records = 0;
2947+
int ret_partial_success = -1;
29412948
int ret_code = FLB_RETRY;
29422949
int formatted_records = 0;
29432950
int grpc_status_counts[GRPC_STATUS_CODES_SIZE] = {0};
@@ -2953,9 +2960,9 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
29532960
uint64_t write_entries_start = 0;
29542961
uint64_t write_entries_end = 0;
29552962
float write_entries_latency = 0.0;
2963+
uint64_t ts = cfl_time_now();
29562964
#ifdef FLB_HAVE_METRICS
29572965
char *name = (char *) flb_output_name(ctx->ins);
2958-
uint64_t ts = cfl_time_now();
29592966
#endif
29602967

29612968
/* Reformat msgpack to stackdriver JSON payload */
@@ -3069,53 +3076,60 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
30693076
ret_code = FLB_OK;
30703077
}
30713078
else {
3072-
#ifdef FLB_HAVE_METRICS
30733079
/* check partial success */
30743080
ret_partial_success =
30753081
parse_partial_success_response(c,
30763082
ctx,
30773083
ts,
30783084
grpc_status_counts);
30793085

3080-
int failed_records = 0;
3081-
if (ret_partial_success == 0) {
3082-
for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) {
3083-
if (grpc_status_counts[code] != 0) {
3084-
failed_records += grpc_status_counts[code];
3085-
}
3086-
}
3087-
cmt_counter_add(ctx->ins->cmt_dropped_records, ts,
3088-
failed_records, 1, (char* []) {name});
3089-
int successful_records =
3090-
formatted_records - failed_records;
3091-
if (successful_records != 0) {
3092-
add_record_metrics(ctx, ts, successful_records, 200, 0);
3086+
if (ret_partial_success == 0) {
3087+
ret_code = FLB_OK;
3088+
3089+
for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) {
3090+
if (grpc_status_counts[code] != 0) {
3091+
failed_records += grpc_status_counts[code];
30933092
}
30943093
}
3095-
else {
3096-
add_record_metrics(ctx, ts, formatted_records,
3097-
c->resp.status, -1);
3098-
cmt_counter_add(ctx->ins->cmt_dropped_records, ts,
3099-
formatted_records, 1,
3100-
(char* []) {name});
3094+
3095+
flb_plg_warn(ctx->ins, "partial success: %d of %d records dropped",
3096+
failed_records, formatted_records);
3097+
3098+
#ifdef FLB_HAVE_METRICS
3099+
cmt_counter_add(ctx->ins->cmt_dropped_records, ts,
3100+
failed_records, 1, (char* []) {name});
3101+
int successful_records =
3102+
formatted_records - failed_records;
3103+
if (successful_records != 0) {
3104+
add_record_metrics(ctx, ts, successful_records, 200, 0);
31013105
}
31023106
#endif
3103-
if (c->resp.status >= 400 && c->resp.status < 500) {
3104-
ret_code = FLB_ERROR;
3105-
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
3106-
c->resp.payload);
31073107
}
3108+
#ifdef FLB_HAVE_METRICS
31083109
else {
3109-
if (c->resp.payload_size > 0) {
3110-
/* we got an error */
3110+
add_record_metrics(ctx, ts, formatted_records,
3111+
c->resp.status, -1);
3112+
}
3113+
#endif
3114+
/* Partial success is set FLB_OK, with manual update of failure metrics */
3115+
if (ret_partial_success != 0) {
3116+
if (c->resp.status >= 400 && c->resp.status < 500) {
3117+
ret_code = FLB_ERROR;
31113118
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
31123119
c->resp.payload);
31133120
}
31143121
else {
3115-
flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag,
3116-
c->resp.payload);
3122+
if (c->resp.payload_size > 0) {
3123+
/* we got an error */
3124+
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
3125+
c->resp.payload);
3126+
}
3127+
else {
3128+
flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag,
3129+
c->resp.payload);
3130+
}
3131+
ret_code = FLB_RETRY;
31173132
}
3118-
ret_code = FLB_RETRY;
31193133
}
31203134
}
31213135
}
@@ -3127,7 +3141,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
31273141
if (write_entries_latency > 0.0) {
31283142
cmt_histogram_observe(ctx->cmt_write_entries_latency, ts, write_entries_latency, 1, (char *[]) {name});
31293143
}
3130-
add_record_metrics(ctx, ts, formatted_records, 200, 0);
3144+
/* Only add full batch metrics if this was NOT a partial success */
3145+
if (ret_partial_success != 0) {
3146+
add_record_metrics(ctx, ts, formatted_records, 200, 0);
3147+
}
31313148

31323149
/* OLD api */
31333150
flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);

0 commit comments

Comments
 (0)