RATIS-2174. Move future.join outside the lock
What changes were proposed in this pull request?
In recent observations, I found that some functions wait for a future while holding a lock, which is very dangerous. To avoid the deadlocks that can result from waiting on a future while holding a lock, I propose moving the wait outside the lock.
Currently, calling changeToFollower in the following functions causes this situation:
- RaftServerImpl.appendEntries
- RaftServerImpl.RequestVote
- checkAndInstallSnapshot
see https://issues.apache.org/jira/browse/RATIS-2174
@szetszwo please take a look. If you agree with this approach, I will create an issue in JIRA.
@szetszwo @OneSizeFitsQuorum
Please check the comments below. Could you help me decide which one to use as the final solution? There are currently two main options:
- Move future.join outside the lock (current commit)
- Use wait/notify (code block below)
Subject: [PATCH] use wait/notify
---
Index: ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java (revision 62ae6d9f068ce01dd467a6fa83fb74dad9c2c8d8)
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java (date 1729085480985)
@@ -94,6 +94,9 @@
logAppender.restart();
}
closeFuture.complete(finalState);
+ synchronized (logAppender.getServer()) {
+ logAppender.getServer().notifyAll();
+ }
}
}
Index: ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java (revision 62ae6d9f068ce01dd467a6fa83fb74dad9c2c8d8)
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java (date 1729086757702)
@@ -130,6 +130,9 @@
runImpl();
} finally {
stopped.complete(null);
+ synchronized (server) {
+ server.notifyAll();
+ }
}
}
Index: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java (revision 62ae6d9f068ce01dd467a6fa83fb74dad9c2c8d8)
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java (date 1729087936568)
@@ -565,16 +565,38 @@
}
}
+ static class Pair<U, V> {
+ private final U first;
+ private final V second;
+
+ private Pair(U first, V second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ static <U,V> Pair<U, V> makePair(U u, V v) {
+ return new Pair<>(u, v);
+ }
+
+ U first() {
+ return first;
+ }
+
+ V second() {
+ return second;
+ }
+ }
+
/**
* Change the server state to Follower if this server is in a different role or force is true.
* @param newTerm The new term.
* @param force Force to start a new {@link FollowerState} even if this server is already a follower.
* @return if the term/votedFor should be updated to the new term
*/
- private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
+ private Pair<Boolean, CompletableFuture<Void>> changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
- changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join();
- return metadataUpdated.get();
+ CompletableFuture<Void> future = changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated);
+ return Pair.makePair(metadataUpdated.get(), future);
}
private synchronized CompletableFuture<Void> changeToFollowerAsync(
@@ -617,20 +639,37 @@
long newTerm,
boolean allowListener,
Object reason) throws IOException {
- if (changeToFollower(newTerm, false, allowListener, reason)) {
- state.persistMetadata();
+ Pair<Boolean, CompletableFuture<Void>> pair = changeToFollower(newTerm, false, allowListener, reason);
+ try {
+ if (pair.first()) {
+ state.persistMetadata();
+ }
+ } finally {
+ waitFutureAndJoin(pair.second());
}
}
- synchronized void changeToLeader() {
+ synchronized void waitFutureAndJoin(CompletableFuture<?> future) {
+ while (!future.isDone()) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ future.join();
+ }
+
+ synchronized CompletableFuture<Void> changeToLeader() {
Preconditions.assertTrue(getInfo().isCandidate());
- role.shutdownLeaderElection();
+ CompletableFuture<Void> future = role.shutdownLeaderElection();
setRole(RaftPeerRole.LEADER, "changeToLeader");
final LeaderStateImpl leader = role.updateLeaderState(this);
state.becomeLeader();
// start sending AppendEntries RPC to followers
leader.start();
+ return future;
}
@Override
@@ -1460,8 +1499,9 @@
final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
if (candidate != null && phase == Phase.ELECTION) {
// change server state in the ELECTION phase
- final boolean termUpdated =
- changeToFollower(candidateTerm, true, false, "candidate:" + candidateId);
+ Pair<Boolean, CompletableFuture<Void>> pair = changeToFollower(candidateTerm, true, false, "candidate:" + candidateId);
+ final boolean termUpdated = pair.first;
+ waitFutureAndJoin(pair.second);
if (voteGranted) {
state.grantVote(candidate.getId());
}
Index: ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java (revision 62ae6d9f068ce01dd467a6fa83fb74dad9c2c8d8)
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java (date 1729087936578)
@@ -242,6 +242,9 @@
runImpl();
} finally {
stopped.complete(null);
+ synchronized (server) {
+ server.notifyAll();
+ }
}
}
@@ -256,7 +259,7 @@
for (int round = 0; shouldRun(); round++) {
if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
if (askForVotes(Phase.ELECTION, round)) {
- server.changeToLeader();
+ server.changeToLeader().join();
}
}
}
@133tosakarin, sorry for the delay. Will review this soon.
@133tosakarin, thanks a lot for working on this! The idea is great.
- We should avoid using Optional. Using final is better since the compiler will enforce that the variable must be initialized before use.
- Let's pass an AtomicBoolean instead of adding the new Pair class. It will require much less code change.
See https://issues.apache.org/jira/secure/attachment/13072253/1168_review.patch
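A minimal sketch of the AtomicBoolean suggestion, assuming a heavily simplified server class: the method returns only the future and reports the "metadata updated" flag through an AtomicBoolean argument, so no Pair class is needed, and the locals are declared final. ToyServer and its methods are hypothetical stand-ins, not the real RaftServerImpl API; the actual reviewed change is in the linked 1168_review.patch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for RaftServerImpl (not the real Ratis class).
class ToyServer {
  private long currentTerm = 0;
  private int persistCount = 0;

  /**
   * Updates the term under the lock, reports whether it changed through the
   * AtomicBoolean out-parameter, and returns the (simulated) role-shutdown future.
   */
  synchronized CompletableFuture<Void> changeToFollowerAsync(long newTerm, AtomicBoolean metadataUpdated) {
    final boolean updated = newTerm > currentTerm;
    if (updated) {
      currentTerm = newTerm;
    }
    metadataUpdated.set(updated);
    // Simulates the old role's "stopped" future; a real shutdown would complete later.
    return CompletableFuture.completedFuture(null);
  }

  synchronized void persistMetadata() {
    persistCount++;
  }

  /** Caller pattern: act on the flag, then join the future without holding the ToyServer lock. */
  void changeToFollowerAndPersist(long newTerm) {
    final AtomicBoolean metadataUpdated = new AtomicBoolean();
    final CompletableFuture<Void> future = changeToFollowerAsync(newTerm, metadataUpdated);
    if (metadataUpdated.get()) {
      persistMetadata();
    }
    future.join(); // this method is not synchronized, so the lock is not held here
  }

  synchronized long getCurrentTerm() { return currentTerm; }

  synchronized int getPersistCount() { return persistCount; }
}
```

Compared with a Pair return type, the existing callers only need one extra local variable, which is why this variant touches less code.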
Sze, thank you for your advice!
The current commit moves future.join out of the lock, but OneSizeFitsQuorum thinks there may be concurrency issues.
What do you think?
Suppose we have the following code:
synchronized(server) {
  ...
  final CompletableFuture<Void> future = state.shutdown();
  future.join();
}
- I agree with you that calling join() at the end of the synchronized(server) block seems useless since it just waits for the state thread to terminate. If anything needs to be synchronized, it should be done inside the shutdown() method. Of course, the state thread could synchronize on server in its run() method. The same synchronization also happens in the non-shutdown cases. If there is a bug, the non-shutdown cases should also have the same bug.
- I could see that calling join() in the middle of synchronized(server) may be useful in some cases, e.g. when the caller wants to wait for the state thread to complete for some reason such as getting a final result. However, we are not doing that in our shutdown code.
Thank you for your valuable advice! We have reached a consensus.
Regarding the question raised by OneSizeFitsQuorum, the concern may be this: while thread A waits outside the synchronized (server) block for the state thread to shut down, the state thread could modify some of the server's data. When thread A resumes, that data may already have been changed by the state thread before it shut down, so the question is whether this could affect what thread A does next.
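OneSizeFitsQuorum recommends writing it as follows:
For thread A:
synchronized (server) {
  future = state.shutdown();
  while (!future.isDone()) {
    server.wait(); // releases the server lock while waiting; InterruptedException handling omitted
  }
}
For the state thread:
CompletableFuture<Void> stopped;
...
try {
  run();
} finally {
  stopped.complete(null);
  synchronized (server) {
    server.notifyAll(); // wake up every thread waiting on the server monitor
  }
}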
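To illustrate why this recommendation does not deadlock, here is a self-contained sketch, assuming a plain Object as the server monitor and a volatile flag standing in for state.shutdown() (WaitNotifyShutdown and its methods are hypothetical). The key point is that server.wait() releases the monitor, so the state thread can still enter synchronized (server), complete stopped, and call notifyAll().

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the wait/notifyAll scheme; not Ratis code.
class WaitNotifyShutdown {
  private final Object server = new Object();
  private final CompletableFuture<Void> stopped = new CompletableFuture<>();
  private volatile boolean running = true;
  private int updates = 0; // guarded by server

  /** Body of the state thread (a stand-in for FollowerState, LeaderElection or LogAppender). */
  private void stateThreadBody() {
    try {
      while (running) {
        synchronized (server) {
          updates++; // the state thread modifies server data under the lock
        }
        Thread.yield();
      }
    } finally {
      stopped.complete(null);
      synchronized (server) {
        server.notifyAll(); // wake up thread A if it is waiting
      }
    }
  }

  void start() {
    final Thread t = new Thread(this::stateThreadBody, "state-thread");
    t.setDaemon(true);
    t.start();
  }

  /** Thread A: request shutdown, then wait on the monitor until stopped is done. */
  int shutdownAndWait() {
    synchronized (server) {
      running = false; // plays the role of state.shutdown()
      while (!stopped.isDone()) {
        try {
          server.wait(); // releases the server monitor until notified
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return -1;
        }
      }
      return updates; // read under the lock after the state thread has stopped
    }
  }

  boolean isStopped() {
    return stopped.isDone();
  }
}
```

Because thread A rechecks stopped.isDone() under the lock before every wait(), a wakeup cannot be lost: the state thread cannot run its notifyAll() until thread A is actually waiting and has released the monitor.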
In our code, the state thread may be a LogAppender, FollowerState, or LeaderElection thread.
- Version 1
// thread A
synchronized (server) {
  future = state.shutdown();
  while (!future.isDone()) {
    server.wait(); // InterruptedException handling omitted
  }
}
// state thread
try {
  run();
} finally {
  stopped.complete(null);
  synchronized (server) {
    server.notifyAll();
  }
}
- Version 2
// thread A
synchronized (server) {
  future = state.shutdown();
}
future.join();
// state thread
try {
  run();
} finally {
  stopped.complete(null);
}
Version 1 and Version 2 seem to be the same. Could you point out when they will be different?
For Version 1, thread A has to wait until the state thread wakes it up before it can continue.
For Version 2, the current thread and the thread being shut down may execute in parallel.
No. future.join() means that it is waiting for stopped to complete.
I think I see it now!
Only when thread A reaches future.join(), and has therefore released the server lock, can the state thread execute its synchronized (server) code.
So, is it possible for the state thread to modify the server's data while it executes at that point?
Consider a real-world scenario:
The leader, while executing the synchronized changeToFollower(...), calls future.join() to wait for all LogAppender threads to finish. However, at that moment a LogAppender may already be blocked waiting to acquire the lock held by changeToFollower, which leads to a deadlock.
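The scenario above can be reproduced in miniature. This is a hypothetical sketch, not Ratis code: a worker standing in for a LogAppender must take the server lock before completing its stopped future. Waiting for it while holding the lock cannot succeed (a timeout replaces join() so the demo terminates), whereas obtaining the future under the lock and joining after releasing it completes normally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; JoinUnderLockDemo and its methods are not part of Ratis.
class JoinUnderLockDemo {
  private final Object server = new Object();

  /** Starts a worker that must briefly hold the server lock before it can finish. */
  private CompletableFuture<Void> startWorker() {
    final CompletableFuture<Void> stopped = new CompletableFuture<>();
    final Thread t = new Thread(() -> {
      try {
        synchronized (server) {
          // e.g. update some server state before exiting
        }
      } finally {
        stopped.complete(null);
      }
    });
    t.setDaemon(true);
    t.start();
    return stopped;
  }

  /** Waits while still holding the lock; join() would hang forever, so use a timeout. */
  boolean waitInsideLock() {
    synchronized (server) {
      final CompletableFuture<Void> stopped = startWorker();
      try {
        stopped.get(300, TimeUnit.MILLISECONDS);
        return true;
      } catch (TimeoutException e) {
        return false; // the worker is stuck at synchronized (server)
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      } catch (ExecutionException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  /** Obtains the future under the lock, but joins only after releasing it. */
  boolean waitOutsideLock() {
    final CompletableFuture<Void> stopped;
    synchronized (server) {
      stopped = startWorker();
    }
    stopped.join();
    return true;
  }
}
```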
Previously, we changed the shutdown method to return a future (it had no return value before), but we overlooked an important aspect: synchronized is reentrant. Even if we release the lock held at the changeToFollower level, the same lock may still be held by callers further up the stack. Therefore, we need to make sure that all of those outer locks are fully released before calling future.join().
This is why, in the current commit, I use CompletableFuture.runAsync(future::join) when the function returns midway or throws an exception.
Of course, in those cases we could also simply ignore the future.
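A small sketch of the reentrancy issue described above, assuming nothing beyond plain Java monitors (ReentrancyDemo is hypothetical): leaving an inner synchronized (server) block does not release the lock if an outer caller already holds it, which Thread.holdsLock(server) makes visible.

```java
// Hypothetical sketch: Java monitors are reentrant, so exiting an inner
// synchronized block leaves the lock held if an outer frame also holds it.
class ReentrancyDemo {
  private final Object server = new Object();

  /** Mimics a helper like changeToFollower(...) that exits its own synchronized block. */
  private boolean innerThenCheck() {
    synchronized (server) {
      // inner work, e.g. obtaining a future from shutdown()
    }
    // The inner block has exited; report whether this thread still holds the lock.
    return Thread.holdsLock(server);
  }

  /** Mimics an upper-layer caller that already holds the server lock. */
  boolean calledFromOuterLock() {
    synchronized (server) {
      return innerThenCheck();
    }
  }

  boolean calledWithoutOuterLock() {
    return innerThenCheck();
  }
}
```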
... . Even if we release the lock held at the changeToFollower level, the upper-layer locks are still in place. ...
This is a good point. We need to make sure that it is not holding the server lock at all. Have you found any cases where https://issues.apache.org/jira/secure/attachment/13072253/1168_review.patch still holds the server lock when calling join()?
One easy way to check this is to add the following Preconditions check:
Preconditions.assertTrue(!Thread.holdsLock(server));
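A hypothetical helper along those lines is sketched below. It throws a plain IllegalStateException rather than using Ratis' Preconditions so that it is self-contained; the names LockFreeJoin and joinWithoutLock are assumptions, not existing Ratis methods.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper (not part of Ratis): fail fast instead of deadlocking
// when join() is attempted while the current thread still holds the lock.
final class LockFreeJoin {
  private LockFreeJoin() {}

  static <T> T joinWithoutLock(Object lock, CompletableFuture<T> future) {
    if (Thread.holdsLock(lock)) {
      // Same intent as Preconditions.assertTrue(!Thread.holdsLock(server))
      throw new IllegalStateException("join() called while holding " + lock);
    }
    return future.join();
  }
}
```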
Except for the final future.thenApply(dummy -> reply), the other call sites seem to join while still holding the lock.
private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
+ final CompletableFuture<Void> future;
synchronized (server) {
final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, InstallSnapshotResult.NOT_LEADER);
+ return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(
+ leaderId, getMemberId(), currentTerm, InstallSnapshotResult.NOT_LEADER));
}
- server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
+ future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
@@ -245,8 +250,9 @@ class SnapshotInstallationHandler {
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
+ final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+ return future.thenApply(dummy -> reply);
}
final RaftPeerProto leaderProto;
@@ -323,8 +329,9 @@ class SnapshotInstallationHandler {
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
+ return future.thenApply(dummy -> reply);
}
// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
@@ -341,8 +348,9 @@ class SnapshotInstallationHandler {
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
installedIndex.set(latestInstalledIndex);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex());
+ return future.thenApply(dummy -> reply);
}
// Otherwise, Snapshot installation is in progress.
@@ -350,8 +358,9 @@ class SnapshotInstallationHandler {
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.IN_PROGRESS);
}
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS);
+ return future.thenApply(dummy -> reply);
}
Yes, we can add the Preconditions.assertTrue(!Thread.holdsLock(server)) check before calling future.join().
Sorry, I don't understand this comment. In the code you posted, there are no join() calls at all.
BTW, future.thenApply(..) is not the same as join(). It does not block; it returns a new future that has not been joined.
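A quick sketch of the difference, using only the JDK CompletableFuture API (ThenApplyDemo.replyAfter is hypothetical): thenApply returns immediately with a new future, which completes only after the source future completes, so returning it from inside a synchronized block does not wait while holding the lock.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: thenApply(..) never blocks the caller.
class ThenApplyDemo {
  /** Returns a future that yields the reply once the shutdown future completes. */
  static CompletableFuture<String> replyAfter(CompletableFuture<Void> shutdown, String reply) {
    return shutdown.thenApply(dummy -> reply); // returns immediately, no waiting
  }
}
```

Usage: create an incomplete shutdown future, call replyAfter, and the returned future stays incomplete until shutdown.complete(null) is called; the waiting is left to whoever eventually joins the reply future.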
That's good! I'm sorry for misunderstanding the use of thenApply. With that clarification, there shouldn't be any issues.
Thank you very much for your explanation.
Remove the join() calls in FollowerState and LeaderElection.
For FollowerState, changing to candidate closes the FollowerState, and the returned future is FollowerState.stopped; joining it from the FollowerState thread is equivalent to waiting for itself to close.
LeaderElection is similar: when the server changes to leader, the LeaderElection is closed and the returned future is LeaderElection.stopped.
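The self-join problem can be sketched with a hypothetical daemon whose stopped future is completed only in the finally block of its own run(), mirroring the patch above. If the daemon triggers its own shutdown and then joins the returned future, it would wait forever; the sketch uses a timeout so that it terminates. SelfJoinDemo is an illustrative name, not a Ratis class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical FollowerState/LeaderElection-like daemon (not Ratis code).
class SelfJoinDemo implements Runnable {
  private final CompletableFuture<Void> stopped = new CompletableFuture<>();
  volatile boolean selfJoinTimedOut;

  /** Analogous to shutdown(): returns the future completed when run() exits. */
  CompletableFuture<Void> shutdown() {
    return stopped;
  }

  @Override
  public void run() {
    try {
      // The daemon decides to change role and shuts itself down ...
      final CompletableFuture<Void> future = shutdown();
      try {
        // ... join() here would block forever, since stopped completes only in finally.
        future.get(200, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        selfJoinTimedOut = true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        throw new IllegalStateException(e);
      }
    } finally {
      stopped.complete(null);
    }
  }
}
```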
There seems to be a strange error: the address is already in use.
It happens occasionally. It may be a test problem (or a library or OS bug). I'm not sure if there is a way to prevent it.
Let's simply rerun the tests for now.
#[d0cd3de]
This commit [d0cd3de] mainly makes the following changes:
- Since the follower (FollowerState) and the candidate (LeaderElection) close themselves, calling future.join() at that point would deadlock, so the corresponding future is either no longer returned or ignored.
- Some tests still have LeakSize > 0; I'm not sure whether this is related to the current change.