Skip to content

Commit

Permalink
SOLR-16116: Use apache curator to manage the Solr Zookeeper interacti…
Browse files Browse the repository at this point in the history
…ons (#760)

Co-authored-by: Kevin Risden <[email protected]>
  • Loading branch information
HoustonPutman and risdenk authored Oct 30, 2024
1 parent c2091d0 commit e5d15cc
Show file tree
Hide file tree
Showing 86 changed files with 1,025 additions and 2,163 deletions.
2 changes: 2 additions & 0 deletions gradle/testing/randomization/policies/solr-tests.policy
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ grant {
permission java.net.SocketPermission "127.0.0.1:4", "connect,resolve";
permission java.net.SocketPermission "127.0.0.1:6", "connect,resolve";
permission java.net.SocketPermission "127.0.0.1:8", "connect,resolve";
// Used as an invalid ZK host
permission java.net.SocketPermission "----------:33332", "connect,resolve";

// Basic permissions needed for Lucene to work:
permission java.util.PropertyPermission "*", "read,write";
Expand Down
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ Improvements

* SOLR-17077: When a shard rejoins leader election, leave previous election only once to save unneeded calls to Zookeeper. (Pierre Salagnac)

* SOLR-16116: Apache Curator is now used to manage all Solr Zookeeper interactions. This should provide more stability in the Solr-Zookeeper interactions.
The solrj-zookeeper module, now has a dependency on curator. (Houston Putman, Kevin Risden, Mike Drob, David Smiley)

Optimizations
---------------------
(No changes)
Expand Down
11 changes: 10 additions & 1 deletion solr/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ dependencies {
api project(':solr:solrj-zookeeper')
api project(':solr:solrj-streaming')


api 'io.dropwizard.metrics:metrics-core'
implementation ('io.dropwizard.metrics:metrics-graphite', {
exclude group: "com.rabbitmq", module: "amqp-client"
Expand Down Expand Up @@ -125,6 +124,16 @@ dependencies {
implementation 'org.eclipse.jetty.toolchain:jetty-servlet-api'

// ZooKeeper

implementation('org.apache.curator:curator-framework', {
exclude group: 'org.apache.zookeeper', module: 'zookeeper'
})
implementation('org.apache.curator:curator-client', {
exclude group: 'org.apache.zookeeper', module: 'zookeeper'
})
testImplementation('org.apache.curator:curator-test', {
exclude group: 'org.apache.zookeeper', module: 'zookeeper'
})
implementation('org.apache.zookeeper:zookeeper', {
exclude group: "org.apache.yetus", module: "audience-annotations"
})
Expand Down
7 changes: 3 additions & 4 deletions solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.solr.cloud.ZkController.ContextKey;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkMaintenanceUtils;
Expand Down Expand Up @@ -107,7 +106,7 @@ private void checkIfIamLeader(final ElectionContext context, boolean replacement

// If any double-registrations exist for me, remove all but this latest one!
// TODO: can we even get into this state?
String prefix = zkClient.getZooKeeper().getSessionId() + "-" + context.id + "-";
String prefix = zkClient.getZkSessionId() + "-" + context.id + "-";
Iterator<String> it = seqs.iterator();
while (it.hasNext()) {
String elec = it.next();
Expand Down Expand Up @@ -227,7 +226,7 @@ public int joinElection(ElectionContext context, boolean replacement, boolean jo

final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;

long sessionId = zkClient.getZooKeeper().getSessionId();
long sessionId = zkClient.getZkSessionId();
String id = sessionId + "-" + context.id;
String leaderSeqPath = null;
boolean cont = true;
Expand Down Expand Up @@ -348,7 +347,7 @@ public void process(WatchedEvent event) {
try {
// am I the next leader?
checkIfIamLeader(context, true);
} catch (AlreadyClosedException e) {
} catch (IllegalStateException e) {

} catch (Exception e) {
if (!zkClient.isClosed()) {
Expand Down
20 changes: 9 additions & 11 deletions solr/core/src/java/org/apache/solr/cloud/Overseer.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void run() {
// the workQueue is empty now, use stateUpdateQueue as fallback queue
fallbackQueue = stateUpdateQueue;
fallbackQueueSize = 0;
} catch (AlreadyClosedException e) {
} catch (IllegalStateException e) {
return;
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
Expand All @@ -342,7 +342,7 @@ public void run() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (AlreadyClosedException e) {
} catch (IllegalStateException e) {

} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
Expand Down Expand Up @@ -402,7 +402,7 @@ public void run() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (AlreadyClosedException e) {
} catch (IllegalStateException e) {

} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
Expand All @@ -414,7 +414,9 @@ public void run() {
log.info("Overseer Loop exiting : {}", LeaderElector.getNodeName(myId));
}
// do this in a separate thread because any wait is interrupted in this main thread
new Thread(this::checkIfIamStillLeader, "OverseerExitThread").start();
Thread checkLeaderThread = new Thread(this::checkIfIamStillLeader, "OverseerExitThread");
checkLeaderThread.setDaemon(true);
checkLeaderThread.start();
}
}

Expand Down Expand Up @@ -480,7 +482,7 @@ private void checkIfIamStillLeader() {
byte[] data;
try {
data = zkClient.getData(path, null, stat, true);
} catch (AlreadyClosedException e) {
} catch (IllegalStateException e) {
return;
} catch (Exception e) {
log.warn("Error communicating with ZooKeeper", e);
Expand Down Expand Up @@ -634,7 +636,7 @@ private LeaderStatus amILeader() {
} catch (InterruptedException e) {
success = false;
Thread.currentThread().interrupt();
} catch (AlreadyClosedException e) {
} catch (IllegalStateException e) {
success = false;
} catch (Exception e) {
success = false;
Expand Down Expand Up @@ -1047,11 +1049,7 @@ ZkDistributedQueue getOverseerQuitNotificationQueue() {
*/
ZkDistributedQueue getStateUpdateQueue(Stats zkStats) {
return new ZkDistributedQueue(
reader.getZkClient(),
"/overseer/queue",
zkStats,
STATE_UPDATE_MAX_QUEUE,
() -> Overseer.this.isClosed() || zkController.getCoreContainer().isShutDown());
reader.getZkClient(), "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.function.Predicate;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
Expand Down Expand Up @@ -188,7 +187,7 @@ public void run() {
// We don't need to handle this. This is just a fail-safe which comes in handy in skipping
// already processed async calls.
log.error("KeeperException", e);
} catch (AlreadyClosedException e) {
} catch (IllegalStateException ignore) {
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -202,7 +201,7 @@ public void run() {

try {
prioritizer.prioritizeOverseerNodes(myId);
} catch (AlreadyClosedException e) {
} catch (IllegalStateException ignore) {
return;
} catch (Exception e) {
if (!zkStateReader.getZkClient().isClosed()) {
Expand Down Expand Up @@ -395,7 +394,7 @@ public void run() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (AlreadyClosedException ignore) {
} catch (IllegalStateException ignore) {

} catch (Exception e) {
log.error("Exception processing", e);
Expand Down Expand Up @@ -602,7 +601,7 @@ public void run() {
response.getResponse());
}
success = true;
} catch (AlreadyClosedException ignore) {
} catch (IllegalStateException ignore) {

} catch (KeeperException e) {
log.error("KeeperException", e);
Expand All @@ -617,7 +616,7 @@ public void run() {
// Reset task from tracking data structures so that it can be retried.
try {
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
} catch (AlreadyClosedException ignore) {
} catch (IllegalStateException ignore) {

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.lang.invoke.MethodHandles;
import java.util.List;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
Expand All @@ -36,11 +38,6 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.OpResult.SetDataResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -106,10 +103,11 @@ public void cancelElection() throws InterruptedException, KeeperException {
leaderPath,
leaderZkNodeParentVersion);
String parent = ZkMaintenanceUtils.getZkParent(leaderPath);
List<Op> ops =
List.of(Op.check(parent, leaderZkNodeParentVersion), Op.delete(leaderPath, -1));
zkClient.multi(ops, true);
zkClient.multi(
op -> op.check().withVersion(leaderZkNodeParentVersion).forPath(parent),
op -> op.delete().withVersion(-1).forPath(leaderPath));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} catch (IllegalArgumentException e) {
log.error("Illegal argument", e);
Expand Down Expand Up @@ -144,27 +142,25 @@ void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
// be used to make sure we only remove our own leader registration node.
// The setData call used to get the parent version is also the trigger to
// increment the version. We also do a sanity check that our leaderSeqPath exists.
List<Op> ops =
List.of(
Op.check(leaderSeqPath, -1),
Op.create(
leaderPath,
Utils.toJSON(leaderProps),
zkClient.getZkACLProvider().getACLsToAdd(leaderPath),
CreateMode.EPHEMERAL),
Op.setData(parent, null, -1));
List<OpResult> results;

results = zkClient.multi(ops, true);
for (OpResult result : results) {
if (result.getType() == ZooDefs.OpCode.setData) {
SetDataResult dresult = (SetDataResult) result;
Stat stat = dresult.getStat();
leaderZkNodeParentVersion = stat.getVersion();
return;
}
}
assert leaderZkNodeParentVersion != null;
List<CuratorTransactionResult> results =
zkClient.multi(
op -> op.check().withVersion(-1).forPath(leaderSeqPath),
op ->
op.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(leaderPath, Utils.toJSON(leaderProps)),
op -> op.setData().withVersion(-1).forPath(parent, null));
leaderZkNodeParentVersion =
results.stream()
.filter(
CuratorTransactionResult.ofTypeAndPath(OperationType.SET_DATA, parent))
.findFirst()
.orElseThrow(
() ->
new RuntimeException(
"Could not set data for parent path in ZK: " + parent))
.getResultStat()
.getVersion();
}
});
} catch (NoNodeException e) {
Expand Down
Loading

0 comments on commit e5d15cc

Please sign in to comment.