Improve behaviour during network failure / downed nodes
I noticed a few things about the behaviour during network issues that could simplify or improve the system in such situations, and I wanted to share them to start a discussion on the topic.
The first thing I noticed is that RaftNetworkFactory::connect returns Network, not Result<Network>. This prevents the runtime from gracefully handling downed nodes.
The second thing I noticed is that when network errors occur, there is no backoff: in the case of an error, the system just keeps hammering the other side relentlessly.
This gave me the thought to perhaps treat the networking bit somewhat like Erlang does for servers: all calls remain available, and errors lead to eventual reinitialization and backoff.
This would allow moving the downed-node behavior into the runtime, saving people from needing to implement reconnects etc. themselves.
So as a first thought:
If we make RaftNetworkFactory::connect fallible, the raft engine can keep track of successful and unsuccessful connections. Unsuccessful connections can be re-initialized using a backoff strategy (exponential backoff with a limit, perhaps eventually a trait that could be passed in to fine-tune this).
All errors in the network lead to the network state being set to downed, forcing a reconnect. (Not sure about this; it could also lead to problems. Perhaps some escalating grace period per error, or X errors in N time before marking a node as downed, would be better.)
With those two things, the network becomes quite a bit easier to write, and a lot of the more complicated (and more error-prone) logic could be hoisted into the engine.
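To make this concrete, here's a rough sketch of what I have in mind (all names and signatures are made up for illustration, not the actual openraft API):

use std::time::Duration;

// Illustrative only: a fallible factory plus a pluggable backoff policy.
trait FallibleNetworkFactory {
    type Network;
    type Error;

    // Returning a Result lets the engine track failed connections
    // and schedule re-initialization with a backoff.
    fn connect(&mut self, target: u64) -> Result<Self::Network, Self::Error>;
}

// A policy trait the engine could consult between reconnect attempts.
trait BackoffPolicy {
    // Delay before the given retry attempt; None means "give up for now".
    fn next_delay(&mut self, attempt: u32) -> Option<Duration>;
}

// Exponential backoff with an upper limit, as proposed above.
struct ExponentialBackoff {
    base: Duration,
    max: Duration,
}

impl BackoffPolicy for ExponentialBackoff {
    fn next_delay(&mut self, attempt: u32) -> Option<Duration> {
        let delay = self
            .base
            .checked_mul(2u32.saturating_pow(attempt))
            .unwrap_or(self.max);
        Some(delay.min(self.max))
    }
}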
If we make RaftNetworkFactory::connect fallible, the raft engine can keep track of successful and unsuccessful connections. Unsuccessful connections can be re-initialized using a backoff strategy (exponential backoff with a limit, perhaps eventually a trait that could be passed in to fine-tune this).
By design, RaftNetworkFactory::connect() is not meant to let an application make a real TCP connection at once; instead, it is meant to let an application create an instance that will actually build a TCP connection when one of send_append_entries(), send_vote() or send_install_snapshot() is called.
Thus the error handling such as backoff should be implemented inside send_append_entries() and is application specific.
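For example, a minimal sketch of such a lazily-connecting instance (types are simplified stand-ins, not the real openraft trait signatures):

use std::net::TcpStream;

// Stand-in for the instance that connect() creates.
struct AppNetwork {
    addr: String,
    conn: Option<TcpStream>,
}

impl AppNetwork {
    // connect() would only record the target address; no I/O happens here.
    fn new(addr: String) -> Self {
        Self { addr, conn: None }
    }

    // The real TCP connection is built lazily, on the first RPC, so this
    // is where connection errors (and any backoff) can be handled.
    fn conn(&mut self) -> std::io::Result<&mut TcpStream> {
        if self.conn.is_none() {
            self.conn = Some(TcpStream::connect(self.addr.as_str())?);
        }
        Ok(self.conn.as_mut().unwrap())
    }
}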
Is this a way that will solve your problem?
You are right; the method name connect() is slightly misleading.
All errors in the network lead to the network state being set to downed, forcing a reconnect. (Not sure about this; it could also lead to problems. Perhaps some escalating grace period per error, or X errors in N time before marking a node as downed, would be better.)
Like the above, an application can track network issues inside its implementation, such as inside an implementation of send_append_entries().
Is this OK for you?
Oh, the fact that connect just initializes a stand-by network makes a lot of sense for the signature. I would still prefer a Result, to be honest, if for nothing else than that it allows other things to happen there (for example, address parsing) or allows validating the Network to ensure it's sensibly configured. But that's really a low-priority preference :)
As to having the logic in the implementation, I'm honestly torn. I mean, it's OK, but I'm conflicted about whether it's "good" or just "optional".
The conflicting thoughts I have are:
On one side, complex applications will probably want a high degree of control, so they will want to do a lot of the logic themselves.
On the other side, much of the network connection handling logic will be very similar, and the raft engine knows more about the state than the application (i.e. it could optimize for long-lasting vs. one-off connections from its knowledge of ring state and how nodes communicate). Also, for new applications, I totally see a big benefit in abstracting part of that away. But perhaps the solution here is not to make the engine do it, but to provide something like rocks-store, just as a tcp-network or websocket-network, as a bit of a drop-in networking layer for people who deem it "good enough"?
Oh, the fact that connect just initializes a stand-by network makes a lot of sense for the signature. I would still prefer a Result, to be honest, if for nothing else than that it allows other things to happen there (for example, address parsing) or allows validating the Network to ensure it's sensibly configured. But that's really a low-priority preference :)
If a function returns an error, it assumes the caller can handle it appropriately.
But openraft cannot deal with an application error (it could be any error); the only thing it can do is stop running.
Thus, from this point of view, the application can just panic() if it encounters an error when building a network instance, IMHO.
On the other side, much of the network connection handling logic will be very similar, and the raft engine knows more about the state than the application (i.e. it could optimize for long-lasting vs. one-off connections from its knowledge of ring state and how nodes communicate).
Hmm... I'm afraid I cannot agree that the raft engine knows more than the network implementation. Everything raft knows about the network can only be reported by the network implementation, so such a decision can only be made by the implementation itself.
Actually, in one of my projects there is an Arc<ConnectionPool> in the RaftNetwork implementation. This way it can create a persistent connection or just a one-shot connection as needed.
BTW, extracting the common logic from these 3 RaftNetwork methods into a separate function does not look difficult to me. :DD
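For illustration, such a shared helper might look roughly like this (every type here is a made-up stand-in, not the code from my project):

use std::sync::Arc;

struct ConnectionPool;

impl ConnectionPool {
    fn send(&self, _target: u64, payload: Vec<u8>) -> Result<Vec<u8>, String> {
        // A real pool would reuse a persistent connection here,
        // or open a one-shot connection as needed.
        Ok(payload)
    }
}

struct AppNetwork {
    target: u64,
    pool: Arc<ConnectionPool>,
}

impl AppNetwork {
    // The single place all three RaftNetwork methods funnel through,
    // so failure tracking / backoff only has to be written once.
    fn send_rpc(&self, payload: Vec<u8>) -> Result<Vec<u8>, String> {
        self.pool.send(self.target, payload)
    }
}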
After a night, it feels like RaftNetworkFactory::connect() returning a Result would be better.
As you said, there may be misconfigured node addresses in use. In such cases, raft has a chance to discover them early. But still, raft can do nothing about such an error except print an error log.
Using Result instead of panic() will let the cluster keep working even when there is only a minority of misconfigured nodes.
Yay :)
Looking more into the second part of it, I don't think backoffs can be implemented in the application. If a request takes too long, I get these errors:
[2022-07-21T09:23:12Z ERROR openraft::replication] error replication to target=3141107818908514662 error=timeout after 50ms when AppendEntries 0->3141107818908514662
(why they take too long is a different question :P)
but since it times out after 50ms, the backoff for failure can't be done in the application; any backoff would simply be overruled by the timeout. So that is something where extending the engine seems to be required?
Yay :)
Looking more into the second part of it, I don't think backoffs can be implemented in the application. If a request takes too long, I get these errors:
[2022-07-21T09:23:12Z ERROR openraft::replication] error replication to target=3141107818908514662 error=timeout after 50ms when AppendEntries 0->3141107818908514662
The timeout is controlled by the heartbeat interval, i.e., an RPC to send entries or to send a heartbeat (an empty send-entries request) should not take longer than the interval between two heartbeats:
https://github.com/datafuselabs/openraft/blob/d40dabe684d86b7e1b30807cba7f8bd5c8464493/openraft/src/replication/mod.rs#L326
It's possible to add another config to control the max timeout for an RPC to solve this problem. This way an application can have a backoff algorithm that takes a long time.
But this will introduce another problem: a follower that does not receive an RPC from the leader will switch to the candidate state and start an election.
In other words, no matter what the backoff algorithm is, a send-entries or heartbeat RPC that cannot finish within the heartbeat interval will always break the cluster and result in a new round of elections.
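For reference, these are the knobs involved; a sketch using openraft's Config with illustrative values (the relation between them is the point, not the exact numbers):

let config = openraft::Config {
    // A send-entries or heartbeat RPC that runs longer than this is timed out.
    heartbeat_interval: 50, // milliseconds
    // A follower that hears nothing from the leader for this long starts an
    // election, which is why a backoff cannot simply outlast the heartbeat.
    election_timeout_min: 150,
    election_timeout_max: 300,
    ..Default::default()
};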
(why they take too long is a different question :P)
(I do not care:D)
but since it times out after 50ms, the backoff for failure can't be done in the application; any backoff would simply be overruled by the timeout. So that is something where extending the engine seems to be required?
As I explained, a backoff should never exceed the heartbeat interval, or leadership will be lost.
Oh yes, a timeout to a "functioning" node is not the scenario I'm concerned about. I'm thinking about a scenario where, for some reason, one (or two) of the nodes in a cluster are down. Let me draw the worst-case scenario to illustrate.
We have a 7-node cluster, all set up and running smoothly.
Now we encounter one of those dreaded network partitions (oh no!), and it splits the cluster in two. For simplicity's sake, let's say side A (1,2,3,4) and side B (5,6,7).
On both sides, at least one of the nodes will now start hammering the network every heartbeat_interval trying to reach the other nodes, getting network failures over and over again.
A should re-elect a leader (yay!) but B won't (that's fine). But even after the leader election, we still have the problem that said leader will keep trying to connect to 5-7 every 50ms (default), or in other words, try to open 60 (3x1000/50) network connections every second, which will not help system health and may make the failure situation worse. 60 new connections per second aren't terrible, but the problem gets worse with every new member in the cluster (and I'm not sure how it behaves on the B side where no leader is elected; I suspect all nodes will try to reach out?).
This is where I feel a backoff strategy would make a lot of sense; it is what I would implement in a client library to save the upstream service from being flooded with errors, or the requesting system from getting clogged up with network problems.
I don't think this would make the short term any worse, as the current timeout already equals the heartbeat interval, so if it is hit we're in re-election territory anyway; missing one or five heartbeats doesn't make much of a difference at that point.
EDIT:
Thinking about implementing a backoff in application space: I don't think that is possible in a sensible way. The only thing I could see is setting heartbeat_interval to an absurdly high number (a few seconds) and sleep-ing in every request to back off. This would kind of get the effect of a backoff, but by introducing requests that are ridiculously slow, not by holding back requests to ease up on the other side. It would also make the cluster less stable / slower to react to failure. That could probably be circumvented by separating rpc_timeout and heartbeat_interval, but it would feel like a bit of a hacky solution to the whole thing.
A should re-elect a leader (yay!) but B won't (that's fine). But even after the leader election, we still have the problem that said leader will keep trying to connect to 5-7 every 50ms (default), or in other words, try to open 60 (3x1000/50) network connections every second, which will not help system health and may make the failure situation worse. 60 new connections per second aren't terrible, but the problem gets worse with every new member in the cluster.
An application could record the last several failures to send out an RPC, and if the application believes there are too many errors, it could sleep forever to stop sending out any RPC. Then raft will time out this RPC and retry later. E.g.:
impl RaftNetwork for AppNetwork {
    async fn send_vote(&mut self, vote) {
        // After several consecutive failures, sleep "forever": the raft
        // core will time this call out and retry later.
        if self.failures > 3 {
            sleep(10000000).await;
        }
        let res = self.tcp.send(vote).await;
        if res.is_err() { self.failures += 1; } else { self.failures = 0; }
        res
    }
}
This way, you do not have to set heartbeat_interval or other configs to an insane value to implement a backoff behavior.
(and I'm not sure how it behaves on the B side where no leader is elected, I suspect all nodes will try to reach out?).
Yes, nodes in sub-cluster B will try to elect and will always fail.
I don't think that would work. If I read the code correctly, that timeout would simply lead to a lot of them piling up and being executed later, as the vote is spawned in a new task, and nothing prevents multiple of those tasks from being spawned in parallel.
What I could see is returning an error on each call instead of sending; perhaps that's an option? Perhaps by adding something like ReplicationError::Backoff?
I don't think that would work. If I read the code correctly, that timeout would simply lead to a lot of them piling up and being executed later, as the vote is spawned in a new task, and nothing prevents multiple of those tasks from being spawned in parallel.
Right, vote requests should have timeout control. This is a missing feature :(. It should be done the same way append-entries does it: https://github.com/datafuselabs/openraft/blob/062d24cbdeb4939469a400772e494554be4232b6/openraft/src/core/raft_core.rs#L390-L409
What I could see is returning an error on each call instead of sending; perhaps that's an option? Perhaps by adding something like ReplicationError::Backoff?
When raft sees a Backoff error, what does it do with it?
As I see it, that is application specific. Raft could just stop calling the RaftNetwork API for a while. But what if the application wants to retry sending some data to check whether the network is restored?
IMHO, it's the raft's responsibility to tell the application what should be done, and it's the application's responsibility to decide how to do it. Raft just needs to ensure everything goes well no matter what the application's decision is, e.g., not to pile up too many unfinished tasks.
With the way append-entries does it, we're back to the problem that the ttl overrides the timeout in the application: if the ttl is 50ms and the application wants to delay for 1s, the delay would be canceled after 50ms, bringing us back to the problem of having to set a silly long ttl for requests :(
async timeouts cancel the offending future when the timeout is hit, so any sleep in there will simply be canceled ahead of time :(
What I was thinking is that backoff could signal a duration for how long the raft engine shouldn't try to communicate with the node in question. That sounds like somewhat of a decent compromise: it leaves all logic related to backoffs in the application, while allowing the application to truly stop the engine from trying to communicate with a node.
IMHO, it's the raft's responsibility to tell the application what should be done, and it's the application's responsibility to decide how to do it.
I kind of agree :D That's why my initial thought was that the backoff could live in the engine: that way the engine tells the application when it's not OK to keep ~~beating a dead horse~~ bothering a dead host, and the application would then only be responsible for the communication when things are fine. It sure would make it easier to get a good :tm: implementation, at the cost of not allowing it to be super specialized unless the strategy can be swapped out (e.g. via a trait).
EDIT:
Don't get me wrong, I totally get where you're coming from; this is right at the edge between where the application should be responsible and where the engine should be. My main problem is that in the current implementation, timeouts and spawns make it pretty much impossible for the application to take responsibility, as its decisions will be overridden by either a timeout or a spawn, and the engine doesn't handle backoffs either.
I'm following your discussion, since we have a similar problem in our project - we don't want to send unnecessary messages (and we don't even want to send empty ping messages, see below).
We have a different method for keeping "liveness" info about a link between two nodes. As long as the link is deemed "live", any consensus domain communicating via this link (we have replicas of multiple/many consensus domains hosted on the same node) can expect its messages to ultimately go through. As soon as the "liveness" property is no longer guaranteed, we can inform all affected consensus domains about the fact and thus, for example, let them start a new election after the timeout. I.e., instead of empty ping messages, we have an external liveness check.
I haven't looked into the implementation in openraft yet, but this external liveness check could likely be integrated fairly simply in place of the traditional heartbeat. Then, as soon as the link is deemed down by the external agent (which can, for example, do its own heartbeat on behalf of multiple consensus domains), we can tell all affected local openraft replicas, so they can start their "usual" handling of replication timeout (and start an election on a follower).
@Licenser Maybe you could live with something like this as well? Not sure about your use case. But, as I wrote, this is in a concept phase and isn't implemented yet; we wanted to start with simple openraft and initially use UDP directly to transport messages.
With the way append-entries does it, we're back to the problem that the ttl overrides the timeout in the application: if the ttl is 50ms and the application wants to delay for 1s, the delay would be canceled after 50ms, bringing us back to the problem of having to set a silly long ttl for requests :(
The heartbeat interval should be longer than the time a normal RPC takes, so that raft can work correctly.
If an application tries to delay 1s while the heartbeat interval is 50ms, it could just sleep through the next 20 heartbeat RPCs. And raft is responsible for guaranteeing that nothing piles up.
async timeouts cancel the offending future when the timeout is hit, so any sleep in there will simply be canceled ahead of time :(
Right. The application needs to have a global backoff sleep time, not just a per-RPC sleep time.
What I was thinking is that backoff could signal a duration for how long the raft engine shouldn't try to communicate with the node in question. That sounds like somewhat of a decent compromise: it leaves all logic related to backoffs in the application, while allowing the application to truly stop the engine from trying to communicate with a node.
If such a feature were implemented, it would just be a wrapper around RaftNetwork, e.g.:
struct BackoffNetwork<N: RaftNetwork> {
    backoff: Instant,
    inner: N,
}

impl<N> RaftNetwork for BackoffNetwork<N> {
    async fn send_append_entries(&mut self, req) -> Result<> {
        // Still inside the backoff window: park this RPC until the
        // raft core times it out.
        if Instant::now() < self.backoff {
            sleep(100000000).await;
        }
        let res = self.inner.send_append_entries(req).await;
        if let Err(RPCError::Backoff(sleep_time)) = &res {
            self.backoff = Instant::now() + *sleep_time;
        } else {
            // No need to back off.
            self.backoff = Instant::now();
        }
        res
    }
}
And if it is just a wrapper (and it meets your need :D), it can be implemented by the application itself, without openraft having to introduce a wrapper.
IMHO, it's the raft's responsibility to tell the application what should be done, and it's the application's responsibility to decide how to do it.
I kind of agree :D That's why my initial thought was that the backoff could live in the engine: that way the engine tells the application when it's not OK to keep ~beating a dead horse~ bothering a dead host, and the application would then only be responsible for the communication when things are fine. It sure would make it easier to get a good ™️ implementation, at the cost of not allowing it to be super specialized unless the strategy can be swapped out (e.g. via a trait).
Might the network wrapper above meet your need to separate backoff from sending RPCs?
EDIT:
My main problem is that in the current implementation, timeouts and spawns make it pretty much impossible for the application to take responsibility, as its decisions will be overridden by either a timeout or a spawn, and the engine doesn't handle backoffs either.
Right. That's why the backoff time should be global instead of per-RPC.
Sorry for the late reply, the last few days have been hectic. That's a really neat idea; I never thought about that kind of layering, and I really like it! It would still produce timeout errors in the logs, but it's a good stop-gap :).
We've implemented something similar to this:
struct BackoffNetwork<N: RaftNetwork> {
    backoff: Instant,
    inner: N,
}

impl<N> RaftNetwork for BackoffNetwork<N> {
    async fn send_append_entries(&mut self, req) -> Result<> {
        if Instant::now() < self.backoff {
            sleep(100000000).await;
        }
        let res = self.inner.send_append_entries(req).await;
        if let Err(RPCError::Backoff(sleep_time)) = &res {
            self.backoff = Instant::now() + *sleep_time;
        } else {
            // no need to backoff
            self.backoff = Instant::now();
        }
        res
    }
}
We still hit the 50ms heartbeat timeout. Why would this wrapper let the app sleep? Doesn't BackoffNetwork's send_append_entries have its own 50ms timeout to deal with since it also implements the RaftNetwork trait?
We still hit the 50ms heartbeat timeout. Why would this wrapper let the app sleep? Doesn't BackoffNetwork's send_append_entries have its own 50ms timeout to deal with since it also implements the RaftNetwork trait?
If send_append_entries() does not return in 50ms, it (the Future running send_append_entries()) will be dropped, and a timeout error will occur. That's as expected, IMHO. No? 🤔
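That dropping behavior is just how async timeouts work; a tiny self-contained tokio example (not openraft code) to illustrate:

use std::time::Duration;
use tokio::time::{sleep, timeout};

#[tokio::main]
async fn main() {
    // The inner future, including its sleep, is dropped the moment the
    // 50ms timeout fires; the sleep never gets a chance to complete.
    let res = timeout(Duration::from_millis(50), async {
        sleep(Duration::from_secs(10)).await;
        "replied"
    })
    .await;
    assert!(res.is_err()); // Elapsed: the RPC future was cancelled.
}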
Since I'm still (again) dealing with this:
I think the issue people are trying to deal with here is that if a node is down, the other nodes will create an error log entry every 50ms, which is computationally and IO costly and also makes it really hard to extract any meaningful message from the errors. Basically, a single downed/incapacitated node now harms the entire cluster by spamming all logs with repeated messages.
This is where the idea of a timeout came from: for an operator there is no benefit in seeing 20 messages/second of RPCError err=timeout after 50ms when AppendEntries, and from a network perspective there is no benefit in re-trying a connection 20 times a second if it's down.
The network part can be dealt with (as described above) by adding a sleep to the RaftNetwork implementation, but the first problem, the spammy logs, is impossible for a user of the library to fix or adjust.
Moving the timeout into the core library would solve both problems. But if that's not the goal, that's understandable too; it makes the library a bit harder to use but more flexible, which is a fair tradeoff. That said, as the library exists now, the spam problem is impossible to fix, which makes it really hard to use in any production capacity.
To elaborate a bit further: with backoff handled in the implementation, there is no way for the library to distinguish whether a timeout occurred through a new error or through a backoff. That's where moving that logic up would be helpful.
This is where the idea of a timeout came from: for an operator there is no benefit in seeing 20 messages/second of RPCError err=timeout after 50ms when AppendEntries, and from a network perspective there is no benefit in re-trying a connection 20 times a second if it's down.
@Licenser You're correct, I hadn't considered the possibility of error message flooding. In this case, the replication mod should be able to handle nodes that are unreachable.
Such an error will be returned to openraft as an RPCError:
https://github.com/drmingdrmer/openraft/blob/17a658f41ba193609ee3ac626149ef50592d937c/openraft/src/error.rs#L245-L254
My initial proposal is to allow an implementation of RaftNetwork::send_append_entries() to return a designated error (Unreachable(NodeId)) that indicates the operation should be retried only after a delay, and to let openraft enable users to define their own backoff algorithm or use a default one.
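A rough sketch of what an implementation could do under this proposal (Unreachable and all signatures here are hypothetical at this point, just to show the intended flow):

// Hypothetical error type from the proposal above.
enum RPCError {
    Unreachable(u64 /* NodeId */),
    Network(String),
}

struct AppNetwork {
    target: u64,
    failures: u32,
}

impl AppNetwork {
    async fn send_append_entries(&mut self, req: Vec<u8>) -> Result<Vec<u8>, RPCError> {
        match self.try_send(req).await {
            Ok(resp) => {
                self.failures = 0;
                Ok(resp)
            }
            Err(e) => {
                self.failures += 1;
                if self.failures > 3 {
                    // Tell openraft the node is down; openraft would then apply
                    // the user-defined (or default) backoff before retrying.
                    Err(RPCError::Unreachable(self.target))
                } else {
                    Err(RPCError::Network(e))
                }
            }
        }
    }

    async fn try_send(&mut self, _req: Vec<u8>) -> Result<Vec<u8>, String> {
        // Real network I/O would happen here.
        Err("connection refused".to_string())
    }
}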
Opinion?
That sounds like a great solution! That way the implementation can return Unreachable(NodeId) while it is in "backoff mode" and then try to reconnect when it gets the next error! Brilliant :D I love it!
BTW, I'm wondering whether backoff is the right way to go. It depends on the network implementation. If you have a relatively low volume of updates and use, for instance, UDP to send the messages, then yes, this will likely work very well.
What I am worried about is when you have a lot of data to send. Then the server will retry sending the entire missing log at once. Maybe it would be better to allow the connection to report a partial success. Say I want to send 100 entries; 50 get through, and then the timeout hits. The next time, we'll try to re-send all 100 entries (and some) again, so the timeout hits after 50 entries again, and we'll never make progress. Of course, this can likely be handled in the Network implementation by noting down the last successfully sent entry index before the send was killed by the timeout, and next time just sending the rest.
But it might be more desirable to pass the ttl to send_append_entries() instead of limiting the send time from the outside. As long as there is progress sending individual entries, the call would not time out. I.e., instead of timing out the entire call that sends 100 entries, you'd time out the sending of individual entries in the network implementation. This could even be made backward compatible by invoking a default-implemented wrapper method send_append_entries_with_timeout(), which would by default just call timeout(ttl, send_append_entries(...)).await.
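For the compatible variant, the default-implemented wrapper could be as small as this (a sketch; send_append_entries_with_timeout() is the name suggested above, not an existing openraft method):

use std::time::Duration;
use tokio::time::{error::Elapsed, timeout};

trait SendEntries {
    async fn send_append_entries(&mut self, req: Vec<u8>) -> Result<(), String>;

    // Default implementation: behaves exactly like today's external timeout.
    // An implementation that can track per-entry progress would override this
    // and time out individual entries instead of the whole call.
    async fn send_append_entries_with_timeout(
        &mut self,
        req: Vec<u8>,
        ttl: Duration,
    ) -> Result<Result<(), String>, Elapsed> {
        timeout(ttl, self.send_append_entries(req)).await
    }
}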
Any take on this?