Skip to content

Commit 69f13dd

Browse files
committed
Fixed nits after self review
Signed-off-by: Vinay Krishna Pudyodu <[email protected]>
1 parent 24623ba commit 69f13dd

File tree

7 files changed

+274
-24
lines changed

7 files changed

+274
-24
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1313
- Improve performace of NumericTermAggregation by avoiding unnecessary sorting([#17252](https://github.com/opensearch-project/OpenSearch/pull/17252))
1414
- Implemented computation of segment replication stats at shard level ([#17055](https://github.com/opensearch-project/OpenSearch/pull/17055))
1515
- [Rule Based Auto-tagging] Add in-memory attribute value store ([#17342](https://github.com/opensearch-project/OpenSearch/pull/17342))
16-
- Restrict Search Replica Allocation to Search-Dedicated Nodes ([#17457](https://github.com/opensearch-project/OpenSearch/pull/17457))
16+
- Search Replica Allocation and Recovery ([#17457](https://github.com/opensearch-project/OpenSearch/pull/17457))
1717

1818
### Dependencies
1919
- Bump `org.awaitility:awaitility` from 4.2.0 to 4.3.0 ([#17230](https://github.com/opensearch-project/OpenSearch/pull/17230), [#17439](https://github.com/opensearch-project/OpenSearch/pull/17439))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.cluster.allocation;
34+
35+
import org.apache.logging.log4j.LogManager;
36+
import org.apache.logging.log4j.Logger;
37+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
38+
import org.opensearch.cluster.ClusterState;
39+
import org.opensearch.cluster.metadata.IndexMetadata;
40+
import org.opensearch.cluster.routing.IndexRoutingTable;
41+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
42+
import org.opensearch.cluster.routing.ShardRouting;
43+
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
44+
import org.opensearch.common.settings.Settings;
45+
import org.opensearch.common.util.FeatureFlags;
46+
import org.opensearch.indices.replication.common.ReplicationType;
47+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
48+
import org.opensearch.test.OpenSearchIntegTestCase;
49+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
50+
51+
import java.util.HashMap;
52+
import java.util.List;
53+
import java.util.Map;
54+
55+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
56+
import static org.hamcrest.Matchers.anyOf;
57+
import static org.hamcrest.Matchers.equalTo;
58+
59+
@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
60+
public class SearchReplicaAwarenessAllocationIT extends RemoteStoreBaseIntegTestCase {
61+
62+
private final Logger logger = LogManager.getLogger(SearchReplicaAwarenessAllocationIT.class);
63+
64+
@Override
65+
protected Settings featureFlagSettings() {
66+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
67+
}
68+
69+
public void testAllocationAwarenessZones() {
70+
Settings commonSettings = Settings.builder()
71+
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")
72+
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
73+
.build();
74+
75+
logger.info("--> starting 8 nodes on different zones");
76+
List<String> nodes = internalCluster().startNodes(
77+
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
78+
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
79+
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
80+
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
81+
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
82+
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
83+
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
84+
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
85+
);
86+
87+
logger.info("--> waiting for nodes to form a cluster");
88+
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("8").execute().actionGet();
89+
assertThat(health.isTimedOut(), equalTo(false));
90+
91+
logger.info("--> set search dedicated for nodes");
92+
setSearchDedicatedNodeSettings(nodes.get(4) + ", " + nodes.get(5) + ", " + nodes.get(6) + ", " + nodes.get(7));
93+
94+
logger.info("--> create index");
95+
createIndex(
96+
"test",
97+
Settings.builder()
98+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
99+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
100+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
101+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
102+
.build()
103+
);
104+
105+
logger.info("--> waiting for shards to be allocated");
106+
ensureGreen("test");
107+
108+
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
109+
final Map<String, Integer> counts = new HashMap<>();
110+
111+
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
112+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
113+
for (ShardRouting shardRouting : indexShardRoutingTable) {
114+
counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
115+
}
116+
}
117+
}
118+
119+
assertThat(counts.get(nodes.get(3)), anyOf(equalTo(2), equalTo(3)));
120+
assertThat(counts.get(nodes.get(2)), anyOf(equalTo(2), equalTo(3)));
121+
assertThat(counts.get(nodes.get(0)), anyOf(equalTo(2), equalTo(3)));
122+
assertThat(counts.get(nodes.get(1)), anyOf(equalTo(2), equalTo(3)));
123+
assertThat(counts.get(nodes.get(4)), anyOf(equalTo(2), equalTo(3)));
124+
assertThat(counts.get(nodes.get(5)), anyOf(equalTo(2), equalTo(3)));
125+
assertThat(counts.get(nodes.get(6)), anyOf(equalTo(2), equalTo(3)));
126+
assertThat(counts.get(nodes.get(7)), anyOf(equalTo(2), equalTo(3)));
127+
}
128+
129+
public void testAwarenessZonesIncrementalNodes() {
130+
Settings commonSettings = Settings.builder()
131+
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b")
132+
.put("cluster.routing.allocation.awareness.attributes", "zone")
133+
.build();
134+
135+
logger.info("--> starting 2 nodes on zones 'a' & 'b'");
136+
List<String> nodes = internalCluster().startNodes(
137+
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
138+
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
139+
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
140+
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
141+
);
142+
143+
setSearchDedicatedNodeSettings(nodes.get(2) + "," + nodes.get(3));
144+
145+
createIndex(
146+
"test",
147+
Settings.builder()
148+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
149+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
150+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
151+
.build()
152+
);
153+
154+
ensureGreen("test");
155+
156+
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
157+
Map<String, Integer> counts = new HashMap<>();
158+
159+
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
160+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
161+
for (ShardRouting shardRouting : indexShardRoutingTable) {
162+
counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
163+
}
164+
}
165+
}
166+
assertThat(counts.get(nodes.get(0)), equalTo(5));
167+
assertThat(counts.get(nodes.get(1)), equalTo(5));
168+
assertThat(counts.get(nodes.get(2)), equalTo(5));
169+
assertThat(counts.get(nodes.get(3)), equalTo(5));
170+
171+
logger.info("--> starting another node in zone 'b'");
172+
173+
String B_2 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
174+
String B_3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
175+
176+
setSearchDedicatedNodeSettings(nodes.get(2) + "," + nodes.get(3) + "," + B_3);
177+
178+
ensureGreen("test");
179+
180+
client().admin().cluster().prepareReroute().get();
181+
182+
ensureGreen("test");
183+
184+
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
185+
186+
counts = new HashMap<>();
187+
188+
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
189+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
190+
for (ShardRouting shardRouting : indexShardRoutingTable) {
191+
counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
192+
}
193+
}
194+
}
195+
196+
assertThat(counts.get(nodes.get(0)), equalTo(5));
197+
assertThat(counts.get(nodes.get(1)), equalTo(3));
198+
assertThat(counts.get(nodes.get(2)), equalTo(5));
199+
assertThat(counts.get(nodes.get(3)), equalTo(3));
200+
assertThat(counts.get(B_2), equalTo(2));
201+
assertThat(counts.get(B_3), equalTo(2));
202+
203+
String noZoneNode = internalCluster().startNode();
204+
ensureGreen("test");
205+
client().admin().cluster().prepareReroute().get();
206+
ensureGreen("test");
207+
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
208+
209+
counts = new HashMap<>();
210+
211+
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
212+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
213+
for (ShardRouting shardRouting : indexShardRoutingTable) {
214+
counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
215+
}
216+
}
217+
}
218+
219+
assertThat(counts.get(nodes.get(0)), equalTo(5));
220+
assertThat(counts.get(nodes.get(1)), equalTo(3));
221+
assertThat(counts.get(nodes.get(2)), equalTo(5));
222+
assertThat(counts.get(nodes.get(3)), equalTo(3));
223+
assertThat(counts.get(B_2), equalTo(2));
224+
assertThat(counts.get(B_3), equalTo(2));
225+
assertThat(counts.containsKey(noZoneNode), equalTo(false));
226+
227+
client().admin()
228+
.cluster()
229+
.prepareUpdateSettings()
230+
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "").build())
231+
.get();
232+
233+
ensureGreen("test");
234+
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
235+
236+
counts = new HashMap<>();
237+
238+
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
239+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
240+
for (ShardRouting shardRouting : indexShardRoutingTable) {
241+
counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
242+
}
243+
}
244+
}
245+
246+
assertThat(counts.get(nodes.get(0)), equalTo(3));
247+
assertThat(counts.get(nodes.get(1)), equalTo(3));
248+
assertThat(counts.get(nodes.get(2)), equalTo(4));
249+
assertThat(counts.get(nodes.get(3)), equalTo(3));
250+
assertThat(counts.get(B_2), equalTo(2));
251+
assertThat(counts.get(B_3), equalTo(3));
252+
assertThat(counts.get(noZoneNode), equalTo(2));
253+
}
254+
}

server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.stream.Collectors;
2222

2323
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
24-
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
2524

2625
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
2726
public class SearchReplicaFilteringAllocationIT extends RemoteStoreBaseIntegTestCase {
@@ -134,13 +133,4 @@ private IndexShardRoutingTable getRoutingTable() {
134133
private String getNodeName(String id) {
135134
return getClusterState().nodes().get(id).getName();
136135
}
137-
138-
private void setSearchDedicatedNodeSettings(String nodeName) {
139-
client().admin()
140-
.cluster()
141-
.prepareUpdateSettings()
142-
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", nodeName))
143-
.execute()
144-
.actionGet();
145-
}
146136
}

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
3131
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
3232
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
33-
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
3433
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
3534

3635
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -277,15 +276,6 @@ public void testUnableToAllocateRegularReplicaWontBlockSearchReplicaAllocation()
277276
assertActiveShardCounts(numSearchReplicas, 0);
278277
}
279278

280-
private void setSearchDedicatedNodeSettings(String nodeName) {
281-
client().admin()
282-
.cluster()
283-
.prepareUpdateSettings()
284-
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", nodeName))
285-
.execute()
286-
.actionGet();
287-
}
288-
289279
/**
290280
* Helper to assert counts of active shards for each type.
291281
*/

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import java.util.stream.Collectors;
6262
import java.util.stream.Stream;
6363

64+
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
6465
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
6566
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
6667
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
@@ -374,4 +375,13 @@ protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes,
374375
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
375376
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
376377
}
378+
379+
protected void setSearchDedicatedNodeSettings(String nodeName) {
380+
client().admin()
381+
.cluster()
382+
.prepareUpdateSettings()
383+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", nodeName))
384+
.execute()
385+
.actionGet();
386+
}
377387
}

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -784,9 +784,15 @@ private void recoverEmptyStore(IndexShard indexShard, Store store) throws IOExce
784784
indexShard.recoveryState().getIndex().setFileDetailsComplete();
785785
}
786786

787-
private void recoverLocalFiles(RecoveryState recoveryState, SegmentInfos si, Store store) throws IOException {
787+
private void recoverLocalFiles(RecoveryState recoveryState, SegmentInfos si, Store store) {
788788
final ReplicationLuceneIndex index = recoveryState.getIndex();
789-
addRecoveredFileDetails(si, store, index);
789+
try {
790+
if (si != null) {
791+
addRecoveredFileDetails(si, store, index);
792+
}
793+
} catch (IOException e) {
794+
logger.debug("failed to list file details", e);
795+
}
790796
index.setFileDetailsComplete();
791797
}
792798

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3050,7 +3050,7 @@ public void testRestoreSearchOnlyShardFromStore() throws IOException {
30503050
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so")
30513051
);
30523052
searchReplicaShardRouting = ShardRoutingHelper.initialize(searchReplicaShardRouting, replica.routingEntry().currentNodeId());
3053-
assertEquals(RecoverySource.EmptyStoreRecoverySource.INSTANCE, searchReplicaShardRouting.recoverySource());
3053+
assertEquals(RecoverySource.ExistingStoreRecoverySource.INSTANCE, searchReplicaShardRouting.recoverySource());
30543054
replica = reinitShard(replica, searchReplicaShardRouting);
30553055
recoverShardFromStore(replica);
30563056
assertDocs(replica, "1", "2");

0 commit comments

Comments
 (0)