diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
index 25ae21964ba0e..5508c501b30a2 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
 import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -104,6 +105,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
@@ -355,6 +357,62 @@ public void testNodeWriteLoadsArePresent() {
         }
     }
 
+    public void testShardWriteLoadsArePresent() {
+        // Create some indices and some write-load
+        final int numIndices = randomIntBetween(1, 5);
+        final String indexPrefix = randomIdentifier();
+        IntStream.range(0, numIndices).forEach(i -> {
+            final String indexName = indexPrefix + "_" + i;
+            createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)).build());
+            IntStream.range(0, randomIntBetween(1, 500))
+                .forEach(j -> prepareIndex(indexName).setSource("foo", randomIdentifier(), "bar", randomIdentifier()).get());
+        });
+
+        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
+
+        // Not collecting stats yet because allocation write load stats collection is disabled by default.
+        {
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+            final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads();
+            assertNotNull(shardWriteLoads);
+            assertTrue(shardWriteLoads.isEmpty());
+        }
+
+        // Enable collection for node write loads.
+        updateClusterSettings(
+            Settings.builder()
+                .put(
+                    WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
+                    WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                )
+                .build()
+        );
+
+        try {
+            // Force a ClusterInfo refresh to run collection of the node thread pool usage stats.
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+            final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads();
+
+            // Verify that each shard has write-load reported.
+            final ClusterState state = getInstanceFromNode(ClusterService.class).state();
+            assertEquals(state.projectState(ProjectId.DEFAULT).metadata().getTotalNumberOfShards(), shardWriteLoads.size());
+            double maximumLoadRecorded = 0;
+            for (IndexMetadata indexMetadata : state.projectState(ProjectId.DEFAULT).metadata()) {
+                for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
+                    final ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
+                    assertTrue(shardWriteLoads.containsKey(shardId));
+                    maximumLoadRecorded = Math.max(shardWriteLoads.get(shardId), maximumLoadRecorded);
+                }
+            }
+            // And that at least one is greater than zero
+            assertThat(maximumLoadRecorded, greaterThan(0.0));
+        } finally {
+            updateClusterSettings(
+                Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build()
+            );
+        }
+    }
+
     public void testIndexCanChangeCustomDataPath() throws Exception {
         final String index = "test-custom-data-path";
         final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java
index 11a0103cd22e0..d5aa9e78a8c8a 100644
--- a/server/src/main/java/org/elasticsearch/TransportVersions.java
+++ b/server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -342,6 +342,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00);
     public static final TransportVersion ESQL_CATEGORIZE_OPTIONS = def(9_122_0_00);
     public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00);
+    public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_124_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
index 6d11700500c24..58f8993e3c529 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
@@ -59,9 +59,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
     final Map<NodeAndPath, ReservedSpace> reservedSpace;
     final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
     final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
+    final Map<ShardId, Double> shardWriteLoads;
 
     protected ClusterInfo() {
-        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
+        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
     }
 
     /**
@@ -85,7 +86,8 @@ public ClusterInfo(
         Map<NodeAndShard, String> dataPath,
         Map<NodeAndPath, ReservedSpace> reservedSpace,
         Map<String, EstimatedHeapUsage> estimatedHeapUsages,
-        Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools
+        Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
+        Map<ShardId, Double> shardWriteLoads
     ) {
         this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
         this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -95,6 +97,7 @@ public ClusterInfo(
         this.reservedSpace = Map.copyOf(reservedSpace);
         this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
         this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
+        this.shardWriteLoads = Map.copyOf(shardWriteLoads);
     }
 
     public ClusterInfo(StreamInput in) throws IOException {
@@ -116,6 +119,11 @@ public ClusterInfo(StreamInput in) throws IOException {
         } else {
             this.nodeUsageStatsForThreadPools = Map.of();
         }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
+            this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble);
+        } else {
+            this.shardWriteLoads = Map.of();
+        }
     }
 
     @Override
@@ -136,6 +144,9 @@ public void writeTo(StreamOutput out) throws IOException {
         if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
             out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable);
         }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
+            out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
+        }
     }
 
     /**
@@ -216,7 +227,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
             return builder.endObject(); // NodeAndPath
         }),
             endArray() // end "reserved_sizes"
-            // NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools at this stage, to avoid
+            // NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads at this stage, to avoid
            // committing to API payloads until the features are settled
         );
     }
@@ -255,6 +266,16 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
         return this.mostAvailableSpaceUsage;
     }
 
+    /**
+     * Returns a map of shard IDs to the write-loads for use in balancing. The write-loads can be interpreted
+     * as the average number of threads that ingestion to the shard will consume.
+     * This information may be partial or missing altogether under some circumstances. The absence of a shard
+     * write load from the map should be interpreted as "unknown".
+     */
+    public Map<ShardId, Double> getShardWriteLoads() {
+        return shardWriteLoads;
+    }
+
     /**
      * Returns the shard size for the given shardId or null if that metric is not available.
      */
@@ -466,6 +487,7 @@ public static class Builder {
         private Map<NodeAndPath, ReservedSpace> reservedSpace = Map.of();
         private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
         private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
+        private Map<ShardId, Double> shardWriteLoads = Map.of();
 
         public ClusterInfo build() {
             return new ClusterInfo(
@@ -476,7 +498,8 @@ public ClusterInfo build() {
                 dataPath,
                 reservedSpace,
                 estimatedHeapUsages,
-                nodeUsageStatsForThreadPools
+                nodeUsageStatsForThreadPools,
+                shardWriteLoads
             );
         }
 
@@ -519,5 +542,10 @@ public Builder nodeUsageStatsForThreadPools(Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools) {
             this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
             return this;
         }
+
+        public Builder shardWriteLoads(Map<ShardId, Double> shardWriteLoads) {
+            this.shardWriteLoads = shardWriteLoads;
+            return this;
+        }
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
index 7e995404191d6..fd9c62daebd29 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
@@ -159,7 +159,8 @@ public ClusterInfo getClusterInfo() {
             dataPath,
             Map.of(),
             estimatedHeapUsages,
-            nodeThreadPoolUsageStats
+            nodeThreadPoolUsageStats,
+            allocation.clusterInfo().getShardWriteLoads()
         );
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
index 89394c8fa8ba8..d4ecec83ebc8c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
@@ -38,6 +38,7 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.shard.IndexingStats;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.StoreStats;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -215,7 +216,7 @@ void execute() {
         logger.trace("starting async refresh");
 
         try (var ignoredRefs = fetchRefs) {
-            maybeFetchIndicesStats(diskThresholdEnabled);
+            maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED);
             maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
             maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
             maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled);
@@ -301,7 +302,14 @@ public void onFailure(Exception e) {
     private void fetchIndicesStats() {
         final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
         indicesStatsRequest.clear();
-        indicesStatsRequest.store(true);
+        if (diskThresholdEnabled) {
+            // This returns the shard sizes on disk
+            indicesStatsRequest.store(true);
+        }
+        if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) {
+            // This returns the shard write-loads
+            indicesStatsRequest.indexing(true);
+        }
         indicesStatsRequest.indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED_HIDDEN);
         indicesStatsRequest.timeout(fetchTimeout);
         client.admin()
@@ -350,6 +358,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                 }
 
                 final ShardStats[] stats = indicesStatsResponse.getShards();
+                final Map<ShardId, Double> shardWriteLoadByIdentifierBuilder = new HashMap<>();
                 final Map<String, Long> shardSizeByIdentifierBuilder = new HashMap<>();
                 final Map<ShardId, Long> shardDataSetSizeBuilder = new HashMap<>();
                 final Map<ClusterInfo.NodeAndShard, String> dataPath = new HashMap<>();
@@ -357,6 +366,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                     new HashMap<>();
                 buildShardLevelInfo(
                     adjustShardStats(stats),
+                    shardWriteLoadByIdentifierBuilder,
                     shardSizeByIdentifierBuilder,
                     shardDataSetSizeBuilder,
                     dataPath,
@@ -370,7 +380,8 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                     Map.copyOf(shardSizeByIdentifierBuilder),
                     Map.copyOf(shardDataSetSizeBuilder),
                     Map.copyOf(dataPath),
-                    Map.copyOf(reservedSpace)
+                    Map.copyOf(reservedSpace),
+                    Map.copyOf(shardWriteLoadByIdentifierBuilder)
                 );
             }
 
@@ -527,8 +538,6 @@ public ClusterInfo getClusterInfo() {
                 estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
             }
         });
-        final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats = new HashMap<>();
-        nodeThreadPoolUsageStatsPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeThreadPoolUsageStats.put(nodeId, nodeWriteLoad); });
         return new ClusterInfo(
             leastAvailableSpaceUsages,
             mostAvailableSpaceUsages,
@@ -537,7 +546,8 @@ public ClusterInfo getClusterInfo() {
             indicesStatsSummary.dataPath,
             indicesStatsSummary.reservedSpace,
             estimatedHeapUsages,
-            nodeThreadPoolUsageStats
+            nodeThreadPoolUsageStatsPerNode,
+            indicesStatsSummary.shardWriteLoads()
         );
     }
 
@@ -567,6 +577,7 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
 
     static void buildShardLevelInfo(
         ShardStats[] stats,
+        Map<ShardId, Double> shardWriteLoads,
         Map<String, Long> shardSizes,
         Map<ShardId, Long> shardDataSetSizeBuilder,
         Map<ClusterInfo.NodeAndShard, String> dataPathByShard,
@@ -577,25 +588,31 @@ static void buildShardLevelInfo(
             dataPathByShard.put(ClusterInfo.NodeAndShard.from(shardRouting), s.getDataPath());
 
             final StoreStats storeStats = s.getStats().getStore();
-            if (storeStats == null) {
-                continue;
-            }
-            final long size = storeStats.sizeInBytes();
-            final long dataSetSize = storeStats.totalDataSetSizeInBytes();
-            final long reserved = storeStats.reservedSizeInBytes();
-
-            final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
-            logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved);
-            shardSizes.put(shardIdentifier, size);
-            if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) {
-                shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize);
+            if (storeStats != null) {
+                final long size = storeStats.sizeInBytes();
+                final long dataSetSize = storeStats.totalDataSetSizeInBytes();
+                final long reserved = storeStats.reservedSizeInBytes();
+
+                final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
+                logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved);
+                shardSizes.put(shardIdentifier, size);
+                if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) {
+                    shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize);
+                }
+                if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) {
+                    final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent(
+                        new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()),
+                        t -> new ClusterInfo.ReservedSpace.Builder()
+                    );
+                    reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
+                }
             }
-            if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) {
-                final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent(
-                    new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()),
-                    t -> new ClusterInfo.ReservedSpace.Builder()
-                );
-                reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
+            final IndexingStats indexingStats = s.getStats().getIndexing();
+            if (indexingStats != null) {
+                final double shardWriteLoad = indexingStats.getTotal().getPeakWriteLoad();
+                if (shardWriteLoad > shardWriteLoads.getOrDefault(shardRouting.shardId(), -1.0)) {
+                    shardWriteLoads.put(shardRouting.shardId(), shardWriteLoad);
+                }
             }
         }
     }
@@ -623,9 +640,10 @@ private record IndicesStatsSummary(
         Map<String, Long> shardSizes,
         Map<ShardId, Long> shardDataSetSizes,
         Map<ClusterInfo.NodeAndShard, String> dataPath,
-        Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace
+        Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace,
+        Map<ShardId, Double> shardWriteLoads
     ) {
-        static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of());
+        static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
     }
 }
diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java
index 814aa102ce284..e0e749aaa2360 100644
--- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java
@@ -44,10 +44,20 @@ public static ClusterInfo randomClusterInfo() {
             randomRoutingToDataPath(),
             randomReservedSpace(),
             randomNodeHeapUsage(),
-            randomNodeUsageStatsForThreadPools()
+            randomNodeUsageStatsForThreadPools(),
+            randomShardWriteLoad()
         );
     }
 
+    private static Map<ShardId, Double> randomShardWriteLoad() {
+        final int numEntries = randomIntBetween(0, 128);
+        final Map<ShardId, Double> builder = new HashMap<>(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            builder.put(randomShardId(), randomDouble());
+        }
+        return builder;
+    }
+
     private static Map<String, EstimatedHeapUsage> randomNodeHeapUsage() {
         int numEntries = randomIntBetween(0, 128);
         Map<String, EstimatedHeapUsage> nodeHeapUsage = new HashMap<>(numEntries);
diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
index 80c2395ae9644..3eacc0c4fcec0 100644
--- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
@@ -135,9 +135,17 @@ public void testFillShardLevelInfo() {
                 0
             ) };
         Map<String, Long> shardSizes = new HashMap<>();
+        HashMap<ShardId, Double> shardWriteLoads = new HashMap<>();
         Map<ShardId, Long> shardDataSetSizes = new HashMap<>();
         Map<ClusterInfo.NodeAndShard, String> routingToPath = new HashMap<>();
-        InternalClusterInfoService.buildShardLevelInfo(stats, shardSizes, shardDataSetSizes, routingToPath, new HashMap<>());
+        InternalClusterInfoService.buildShardLevelInfo(
+            stats,
+            shardWriteLoads,
+            shardSizes,
+            shardDataSetSizes,
+            routingToPath,
+            new HashMap<>()
+        );
 
         assertThat(
             shardSizes,
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java
index 37646d376f8fd..1f8d59a958bfe 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java
@@ -621,6 +621,7 @@ public void testUnassignedAllocationPredictsDiskUsage() {
             ImmutableOpenMap.of(),
             ImmutableOpenMap.of(),
             ImmutableOpenMap.of(),
+            ImmutableOpenMap.of(),
             ImmutableOpenMap.of()
         );
 
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
index f85c2678e04e7..c4ca84e6e977f 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
@@ -1406,7 +1406,17 @@ static class DevNullClusterInfo extends ClusterInfo {
             Map<String, Long> shardSizes,
             Map<NodeAndPath, ReservedSpace> reservedSpace
         ) {
-            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of(), Map.of());
+            super(
+                leastAvailableSpaceUsage,
+                mostAvailableSpaceUsage,
+                shardSizes,
+                Map.of(),
+                Map.of(),
+                reservedSpace,
+                Map.of(),
+                Map.of(),
+                Map.of()
+            );
         }
 
         @Override
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
index 117afe0cec877..debb4343931d7 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
@@ -110,6 +110,7 @@ public void testCanAllocateUsesMaxAvailableSpace() {
             Map.of(),
             Map.of(),
             Map.of(),
+            Map.of(),
             Map.of()
         );
         RoutingAllocation allocation = new RoutingAllocation(
@@ -183,6 +184,7 @@ private void doTestCannotAllocateDueToLackOfDiskResources(boolean testMaxHeadroo
             Map.of(),
             Map.of(),
             Map.of(),
+            Map.of(),
             Map.of()
         );
         RoutingAllocation allocation = new RoutingAllocation(
@@ -330,6 +332,7 @@ private void doTestCanRemainUsesLeastAvailableSpace(boolean testMaxHeadroom) {
             shardRoutingMap,
             Map.of(),
             Map.of(),
+            Map.of(),
             Map.of()
         );
         RoutingAllocation allocation = new RoutingAllocation(
diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java
index 6c4066a447b67..c76a88b0da2f9 100644
--- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java
+++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java
@@ -961,6 +961,7 @@ private ExtendedClusterInfo(Map<String, Long> extraShardSizes, ClusterInfo info)
             Map.of(),
             Map.of(),
             Map.of(),
+            Map.of(),
             Map.of()
         );
         this.delegate = info;
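Usage sketch (not part of the patch above): the diff adds ClusterInfo#getShardWriteLoads(), whose javadoc says an absent shard must be read as "unknown" rather than zero. The snippet below illustrates how a consumer holding a ClusterInfoService reference might read the value; the ShardWriteLoadInspector class name and the null-means-unknown wrapper are illustrative assumptions, not code from this change.

// Illustrative sketch only. Assumes a component that already has a ClusterInfoService
// injected (for example an allocation decider or an integration test).
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.index.shard.ShardId;

import java.util.Map;

public final class ShardWriteLoadInspector {

    private final ClusterInfoService clusterInfoService;

    public ShardWriteLoadInspector(ClusterInfoService clusterInfoService) {
        this.clusterInfoService = clusterInfoService;
    }

    /**
     * Returns the reported write load (roughly, average write threads consumed by ingestion)
     * for the given shard, or null when no value was collected - absence means "unknown".
     */
    public Double writeLoadOrNull(ShardId shardId) {
        ClusterInfo clusterInfo = clusterInfoService.getClusterInfo();
        Map<ShardId, Double> shardWriteLoads = clusterInfo.getShardWriteLoads();
        // The map is empty when the write-load decider setting is disabled, and may be
        // partial even when enabled, so callers must tolerate missing entries.
        return shardWriteLoads.get(shardId);
    }
}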