From 664af5f377ef92d739c1f227e948e504b431a468 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Tue, 9 Jan 2024 19:01:56 +0800 Subject: [PATCH] chore: minor fix with metric (#17) --- docs/metrics-display.md | 2 +- .../java/io/greptime/common/util/Clock.java | 2 +- .../main/java/io/greptime/WriteClient.java | 20 ++++++++++--------- .../io/greptime/options/GreptimeOptions.java | 7 ++++--- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/docs/metrics-display.md b/docs/metrics-display.md index 6f1c22a..de3abb3 100644 --- a/docs/metrics-display.md +++ b/docs/metrics-display.md @@ -30,11 +30,11 @@ By default, 2 files are generated in the program's working directory | Histogram | insert_rows_success_num | Statistics on the number of successful writes. | | Histogram | serializing_executor_drain_num\_${name} | Serializing executor. Statistics on the number of draining tasks. | | Histogram | write_limiter_acquire_available_permits | Statistics on the number of available permits for write data(insert/delete). | -| Histogram | write_stream_limiter_acquire_wait_time | Statistics on the time spent acquiring write data (insert/delete) permits when using `StreamWriter`,
note that it does not include the time spent writing, only the time spent acquiring the permit. | | Meter | connection_failure | Statistics on the number of failed connections. | | Meter | write_by_retries_${n} | QPS for the nth retry write, n == 0 for the first write (non-retry), n > 3 will be counted as n == 3 | | Meter | write_failure_num | Statistics on the number of failed writes. | | Meter | write_qps | Write Request QPS | +| Timer | write_stream_limiter_acquire_wait_time | Statistics on the time spent acquiring write data (insert/delete) permits when using `StreamWriter`,
note that it does not include the time spent writing, only the time spent acquiring the permit. | | Timer | async_write_pool.time | Asynchronous pool time statistics for asynchronous write tasks in SDK, this is important and it is recommended to focus on it. | | Timer | direct_executor_timer_rpc_direct_pool | he appearance of this metric means that we are using the current thread to execute the asynchronous callback of the rpc client, which is the default configuration.
This is usually sufficient and very resource-saving, but it needs attention. When there are problems, replace it with a thread pool in time. | | Timer | req_rt_${service_name}/${method_name} | The time consumption statistics of the request, the service name and method name are the names of the service and method of the grpc request. | diff --git a/ingester-common/src/main/java/io/greptime/common/util/Clock.java b/ingester-common/src/main/java/io/greptime/common/util/Clock.java index 04bacaf..3a35762 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/Clock.java +++ b/ingester-common/src/main/java/io/greptime/common/util/Clock.java @@ -29,7 +29,7 @@ public abstract class Clock { */ public abstract long getTick(); - public long duration(final long startTick) { + public long duration(long startTick) { return getTick() - startTick; } diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index a2d6cd9..5511cc3 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -17,6 +17,7 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; import com.google.common.util.concurrent.RateLimiter; import io.greptime.common.Display; import io.greptime.common.Endpoint; @@ -49,6 +50,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; /** * Default Write API impl. @@ -254,8 +256,8 @@ static final class InnerMetricHelper { static final Histogram DELETE_ROWS_SUCCESS_NUM = MetricsUtil.histogram("delete_rows_success_num"); static final Histogram INSERT_ROWS_FAILURE_NUM = MetricsUtil.histogram("insert_rows_failure_num"); static final Histogram DELETE_ROWS_FAILURE_NUM = MetricsUtil.histogram("delete_rows_failure_num"); - static final Histogram WRITE_STREAM_LIMITER_ACQUIRE_WAIT_TIME = MetricsUtil - .histogram("write_stream_limiter_acquire_wait_time"); + static final Timer WRITE_STREAM_LIMITER_ACQUIRE_WAIT_TIME = MetricsUtil + .timer("write_stream_limiter_acquire_wait_time"); static final Meter WRITE_FAILURE_NUM = MetricsUtil.meter("write_failure_num"); static final Meter WRITE_QPS = MetricsUtil.meter("write_qps"); @@ -281,7 +283,7 @@ static Histogram writeRowsFailureNum(WriteOp writeOp) { } } - static Histogram writeStreamLimiterAcquireWaitTime() { + static Timer writeStreamLimiterAcquireWaitTime() { return WRITE_STREAM_LIMITER_ACQUIRE_WAIT_TIME; } @@ -340,14 +342,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter write(Table table, WriteOp writeOp) { Ensures.ensureNonNull(table, "null `table`"); - WriteTables writeTables = new WriteTables(table, writeOp); - - if (this.rateLimiter != null) { - double timeSpent = this.rateLimiter.acquire(table.pointCount()); - InnerMetricHelper.writeStreamLimiterAcquireWaitTime().update((long) timeSpent); + int permits = table.pointCount(); + if (this.rateLimiter != null && permits > 0) { + double millisToWait = this.rateLimiter.acquire(permits) * 1000; + InnerMetricHelper.writeStreamLimiterAcquireWaitTime() + .update((long) millisToWait, TimeUnit.MILLISECONDS); } - this.observer.onNext(writeTables); + this.observer.onNext(new WriteTables(table, writeOp)); return this; } } diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index 310f602..991b7c0 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -217,7 +217,8 @@ public Builder maxInFlightWriteRows(int maxInFlightWriteRows) { * - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full. * - `LimitedPolicy.AbortPolicy`: abort if the limiter is full. * - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full. - * - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full. + * - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if + * the limiter is full, abort if timeout. * The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy` * * @param writeLimitedPolicy write limited policy @@ -229,8 +230,8 @@ public Builder writeLimitedPolicy(LimitedPolicy writeLimitedPolicy) { } /** - * The default rate limit for `StreamWriter`. It only takes effect when we do not specify the - * `maxPointsPerSecond` when creating a `StreamWriter`. + * The default rate limit value(points per second) for `StreamWriter`. It only takes + * effect when we do not specify the `maxPointsPerSecond` when creating a `StreamWriter`. * The default is 10 * 65536 * * @param defaultStreamMaxWritePointsPerSecond default max write points per second