diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index 59ffd99e0de..d9505b44a83 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -154,6 +154,10 @@ public interface BookKeeperServerStats { String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL"; String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL"; String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL"; + String GC_LEDGER_RUNTIME = "GC_LEDGER_RUNTIME"; + String COMPACT_RUNTIME = "COMPACT_RUNTIME"; + String EXTRACT_META_RUNTIME = "EXTRACT_META_RUNTIME"; + String ENTRY_LOG_COMPACT_RATIO = "ENTRY_LOG_COMPACT_RATIO"; // Index Related Counters String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index e58064cfd90..5126e79802b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -29,6 +29,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -59,6 +60,7 @@ public class GarbageCollectorThread implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class); private static final int SECOND = 1000; + private static final int ENTRY_LOG_USAGE_SEGMENT_COUNT = 10; private static final long MINUTE = TimeUnit.MINUTES.toMillis(1); // Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger @@ -97,6 +99,8 @@ public class GarbageCollectorThread implements Runnable { private volatile long totalEntryLogSize; private volatile int numActiveEntryLogs; + private volatile double entryLogCompactRatio; + private volatile int[] currentEntryLogUsageBuckets; final CompactableLedgerStorage ledgerStorage; @@ -172,12 +176,16 @@ public GarbageCollectorThread(ServerConfiguration conf, this.numActiveEntryLogs = 0; this.totalEntryLogSize = 0L; + this.entryLogCompactRatio = 0.0; + this.currentEntryLogUsageBuckets = new int[ENTRY_LOG_USAGE_SEGMENT_COUNT]; this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf, statsLogger); this.gcStats = new GarbageCollectorStats( statsLogger, () -> numActiveEntryLogs, () -> totalEntryLogSize, - () -> garbageCollector.getNumActiveLedgers() + () -> garbageCollector.getNumActiveLedgers(), + () -> entryLogCompactRatio, + () -> currentEntryLogUsageBuckets ); this.garbageCleaner = ledgerId -> { @@ -413,12 +421,22 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin // this is used in extractMetaFromEntryLogs to calculate the usage of entry log doGcLedgers(); - // Extract all of the ledger ID's that comprise all of the entry logs - // (except for the current new one which is still being written to). - extractMetaFromEntryLogs(); - // gc entry logs - doGcEntryLogs(); + long extractMetaStart = MathUtils.nowInNano(); + try { + // Extract all of the ledger ID's that comprise all of the entry logs + // (except for the current new one which is still being written to). + extractMetaFromEntryLogs(); + + // gc entry logs + doGcEntryLogs(); + gcStats.getExtractMetaRuntime() + .registerSuccessfulEvent(MathUtils.elapsedNanos(extractMetaStart), TimeUnit.NANOSECONDS); + } catch (EntryLogMetadataMapException e) { + gcStats.getExtractMetaRuntime() + .registerFailedEvent(MathUtils.elapsedNanos(extractMetaStart), TimeUnit.NANOSECONDS); + throw e; + } if (suspendMajor) { LOG.info("Disk almost full, suspend major compaction to slow down filling disk."); @@ -428,14 +446,20 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin } long curTime = System.currentTimeMillis(); + long compactStart = MathUtils.nowInNano(); if (((isForceMajorCompactionAllow && force) || (enableMajorCompaction && (force || curTime - lastMajorCompactionTime > majorCompactionInterval))) && (!suspendMajor)) { // enter major compaction - LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); + LOG.info("Enter major compaction, suspendMajor {}, lastMajorCompactionTime {}", suspendMajor, + lastMajorCompactionTime); majorCompacting.set(true); try { doCompactEntryLogs(majorCompactionThreshold, majorCompactionMaxTimeMillis); + } catch (EntryLogMetadataMapException e) { + gcStats.getCompactRuntime() + .registerFailedEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS); + throw e; } finally { lastMajorCompactionTime = System.currentTimeMillis(); // and also move minor compaction time @@ -447,23 +471,29 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin && (force || curTime - lastMinorCompactionTime > minorCompactionInterval))) && (!suspendMinor)) { // enter minor compaction - LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); + LOG.info("Enter minor compaction, suspendMinor {}, lastMinorCompactionTime {}", suspendMinor, + lastMinorCompactionTime); minorCompacting.set(true); try { doCompactEntryLogs(minorCompactionThreshold, minorCompactionMaxTimeMillis); + } catch (EntryLogMetadataMapException e) { + gcStats.getCompactRuntime() + .registerFailedEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS); + throw e; } finally { lastMinorCompactionTime = System.currentTimeMillis(); gcStats.getMinorCompactionCounter().inc(); minorCompacting.set(false); } } + gcStats.getCompactRuntime() + .registerSuccessfulEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS); gcStats.getGcThreadRuntime().registerSuccessfulEvent( MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS); } catch (EntryLogMetadataMapException e) { LOG.error("Error in entryLog-metadatamap, Failed to complete GC/Compaction due to entry-log {}", e.getMessage(), e); - gcStats.getGcThreadRuntime().registerFailedEvent( - MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS); + gcStats.getGcThreadRuntime().registerFailedEvent(MathUtils.elapsedNanos(threadStart), TimeUnit.NANOSECONDS); } finally { if (force && forceGarbageCollection.compareAndSet(true, false)) { LOG.info("{} Set forceGarbageCollection to false after force GC to make it forceGC-able again.", @@ -477,7 +507,16 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin * Do garbage collection ledger index files. */ private void doGcLedgers() { - garbageCollector.gc(garbageCleaner); + long gcLedgersStart = MathUtils.nowInNano(); + try { + garbageCollector.gc(garbageCleaner); + gcStats.getGcLedgerRuntime() + .registerSuccessfulEvent(MathUtils.elapsedNanos(gcLedgersStart), TimeUnit.NANOSECONDS); + } catch (Throwable t) { + LOG.warn("Exception when doing gc ledger.", t); + gcStats.getGcLedgerRuntime() + .registerFailedEvent(MathUtils.elapsedNanos(gcLedgersStart), TimeUnit.NANOSECONDS); + } } /** @@ -550,7 +589,7 @@ private boolean removeIfLedgerNotExists(EntryLogMetadata meta) throws EntryLogMe void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMetadataMapException { LOG.info("Do compaction to compact those files lower than {}", threshold); - final int numBuckets = 10; + final int numBuckets = ENTRY_LOG_USAGE_SEGMENT_COUNT; int[] entryLogUsageBuckets = new int[numBuckets]; int[] compactedBuckets = new int[numBuckets]; @@ -585,7 +624,8 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet compactableBuckets.get(bucketIndex).add(meta.getEntryLogId()); }); - + currentEntryLogUsageBuckets = entryLogUsageBuckets; + gcStats.setEntryLogUsageBuckets(currentEntryLogUsageBuckets); LOG.info( "Compaction: entry log usage buckets before compaction [10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}", entryLogUsageBuckets); @@ -649,9 +689,11 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet LOG.debug("Compaction ran for {}ms but was limited by {}ms", timeDiff, maxTimeMillis); } } - LOG.info( - "Compaction: entry log usage buckets[10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}, compacted {}", - entryLogUsageBuckets, compactedBuckets); + int totalEntryLogNum = Arrays.stream(entryLogUsageBuckets).sum(); + int compactedEntryLogNum = Arrays.stream(compactedBuckets).sum(); + this.entryLogCompactRatio = totalEntryLogNum == 0 ? 0 : (double) compactedEntryLogNum / totalEntryLogNum; + LOG.info("Compaction: entry log usage buckets[10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}, compacted {}, " + + "compacted entry log ratio {}", entryLogUsageBuckets, compactedBuckets, entryLogCompactRatio); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java index f9f1e31feee..0b88a5effec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java @@ -24,7 +24,11 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ACTIVE_LEDGER_COUNT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.COMPACT_RUNTIME; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_COMPACT_RATIO; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.EXTRACT_META_RUNTIME; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GC_LEDGER_RUNTIME; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MAJOR_COMPACTION_COUNT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MINOR_COMPACTION_COUNT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_COMPACTION_SPACE_BYTES; @@ -102,11 +106,36 @@ public class GarbageCollectorStats { help = "Current number of active ledgers" ) private final Gauge activeLedgerCountGauge; + @StatsDoc( + name = GC_LEDGER_RUNTIME, + help = "Operation stats of doing gc ledgers base on metaStore" + ) + private final OpStatsLogger gcLedgerRuntime; + @StatsDoc( + name = COMPACT_RUNTIME, + help = "Operation stats of entry log compaction" + ) + private final OpStatsLogger compactRuntime; + @StatsDoc( + name = EXTRACT_META_RUNTIME, + help = "Operation stats of extracting Meta from entryLogs" + ) + private final OpStatsLogger extractMetaRuntime; + @StatsDoc( + name = ENTRY_LOG_COMPACT_RATIO, + help = "Current proportion of compacted entry log files that have been executed" + ) + private final Gauge entryLogCompactRatioGauge; + private volatile int[] entryLogUsageBuckets; + private final Gauge[] entryLogUsageBucketsLeGauges; + public GarbageCollectorStats(StatsLogger statsLogger, Supplier activeEntryLogCountSupplier, Supplier activeEntryLogSpaceBytesSupplier, - Supplier activeLedgerCountSupplier) { + Supplier activeLedgerCountSupplier, + Supplier entryLogCompactRatioSupplier, + Supplier usageBuckets) { this.statsLogger = statsLogger; this.minorCompactionCounter = statsLogger.getCounter(MINOR_COMPACTION_COUNT); @@ -116,6 +145,10 @@ public GarbageCollectorStats(StatsLogger statsLogger, this.reclaimFailedToDelete = statsLogger.getCounter(RECLAIM_FAILED_TO_DELETE); this.gcThreadRuntime = statsLogger.getOpStatsLogger(THREAD_RUNTIME); this.deletedLedgerCounter = statsLogger.getCounter(DELETED_LEDGER_COUNT); + this.gcLedgerRuntime = statsLogger.getOpStatsLogger(GC_LEDGER_RUNTIME); + this.compactRuntime = statsLogger.getOpStatsLogger(COMPACT_RUNTIME); + this.extractMetaRuntime = statsLogger.getOpStatsLogger(EXTRACT_META_RUNTIME); + this.entryLogUsageBuckets = usageBuckets.get(); this.activeEntryLogCountGauge = new Gauge() { @Override @@ -153,6 +186,44 @@ public Integer getSample() { } }; statsLogger.registerGauge(ACTIVE_LEDGER_COUNT, activeLedgerCountGauge); + this.entryLogCompactRatioGauge = new Gauge() { + @Override + public Double getDefaultValue() { + return 0.0; + } + + @Override + public Double getSample() { + return entryLogCompactRatioSupplier.get(); + } + }; + statsLogger.registerGauge(ENTRY_LOG_COMPACT_RATIO, entryLogCompactRatioGauge); + + this.entryLogUsageBucketsLeGauges = new Gauge[entryLogUsageBuckets.length]; + for (int i = 0; i < entryLogUsageBucketsLeGauges.length; i++) { + entryLogUsageBucketsLeGauges[i] = + registerEntryLogUsageBucketsLeGauge("entry_log_usage_buckets_le_" + (i + 1) * 10, i); + } + } + + private Gauge registerEntryLogUsageBucketsLeGauge(String name, int index) { + Gauge gauge = new Gauge() { + @Override + public Integer getDefaultValue() { + return 0; + } + + @Override + public Integer getSample() { + return entryLogUsageBuckets[index]; + } + }; + statsLogger.registerGauge(name, gauge); + return gauge; } + + public void setEntryLogUsageBuckets(int[] usageBuckets) { + entryLogUsageBuckets = usageBuckets; + } }