Replicator fails to resume replication after a sink error is resolved
Describe the bug
The replicator fails to retry gracefully after a sink error. What makes this a fatal condition for us is that the replicator seems to stall indefinitely and never recover, all while appearing to be healthy.
At a bare minimum, if this is a terminal state, the replicator should terminate itself so that our orchestrator (in our case, AWS ECS) can restart the container.
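To make the "terminate itself" fallback concrete, here is a minimal Python sketch of fail-fast behavior. It is not the replicator's actual code (which is .NET): it bounds the retries on a sink write and, once they are exhausted, exits with a non-zero code instead of idling. `write_or_exit`, `TransientSinkError`, and the retry parameters are hypothetical names chosen for illustration.

```python
import sys
import time


class TransientSinkError(Exception):
    """Hypothetical stand-in for a retryable sink failure (e.g. gRPC Unavailable)."""


def write_or_exit(write, event, max_attempts=5, delay_s=1.0, exit_code=1):
    """Call write(event) up to max_attempts times.

    If every attempt fails, exit the process with a non-zero code instead of
    leaving it running but idle. `write` is a hypothetical sink-writer callable.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return write(event)
        except TransientSinkError as err:
            print(f"sink write attempt {attempt}/{max_attempts} failed: {err}",
                  file=sys.stderr)
            if attempt < max_attempts:
                time.sleep(delay_s)
    # Retries exhausted: exiting lets the orchestrator (ECS here) replace the
    # container, rather than leaving a process that looks alive but does nothing.
    sys.exit(exit_code)
```

This assumes the replicator picks up from its last checkpoint after a restart; if so, exiting is always preferable to a process that is running but no longer replicating.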
At the moment, there appears to be no way to determine (autonomously) whether the replicator is still healthy and perform a restart. As a result, our replication pipeline stays completely stalled until we manually restart the replicator, with potentially days elapsing between stalls.
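One way an external check could tell "healthy" from "stalled" is to watch whether anything is being written to the sink while events are still waiting. Below is a hypothetical Python monitor sketching that idea; the replicator does not necessarily expose these numbers, so where `written` and `pending` come from (a metrics endpoint, logs, or otherwise) is an assumption, as is the `stall_after_s` threshold.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class ProgressMonitor:
    """Reports 'stalled' when nothing has been written to the sink for
    stall_after_s seconds while events are still waiting to be replicated.

    Both inputs to observe() are hypothetical: a monotonically increasing count
    of events written to the sink, and the number of events still pending.
    """
    stall_after_s: float
    clock: Callable[[], float] = time.monotonic
    _last_written: Optional[int] = field(default=None, init=False)
    _last_change: float = field(default=0.0, init=False)

    def observe(self, written: int, pending: int) -> bool:
        """Record one sample; return True if healthy, False if stalled."""
        now = self.clock()
        if self._last_written is None or written != self._last_written or pending == 0:
            # Progress was made, or there is nothing to replicate (idle is not stalled).
            self._last_written = written
            self._last_change = now
            return True
        return now - self._last_change < self.stall_after_s
```

Turned into an exit code (0 healthy, 1 stalled), this is the kind of signal an ECS container health check could act on. Because each health-check run is a new process, the monitor would need to live in a long-running sidecar or persist its last sample between runs.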
To Reproduce
Steps to reproduce the behavior:
- Run a single-node Kurrent instance
- Configure the replicator to sink to that instance via gRPC
- Cause a temporary connection failure (e.g. a firewall block)
- Once the connection failure is resolved, note that the replicator does not continue processing, even though the process remains running
Expected behavior
The replicator should tolerate sink failures (via gRPC) and resume once the sink is back online.
Actual behavior
The replicator seems to stall indefinitely.
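For the expected behavior, here is a minimal Python sketch of what "tolerate and resume" could look like: retry transient gRPC failures indefinitely with capped, jittered exponential backoff instead of giving up. The two codes treated as transient are the ones that appear in our logs; `SinkError`, `write_until_recovered`, and the backoff parameters are hypothetical.

```python
import random
import time

# gRPC status codes treated as transient in this sketch. Both appear in the
# logs attached to this issue: Unavailable (connection refused) and
# DeadlineExceeded (timeout).
TRANSIENT_CODES = {"UNAVAILABLE", "DEADLINE_EXCEEDED"}


class SinkError(Exception):
    """Hypothetical sink failure carrying a gRPC status code name."""

    def __init__(self, code: str):
        super().__init__(code)
        self.code = code


def backoff_delays(base_s: float = 0.5, cap_s: float = 30.0):
    """Yield 'full jitter' exponential backoff delays with a capped ceiling."""
    ceiling = base_s
    while True:
        yield random.uniform(0, ceiling)
        ceiling = min(cap_s, ceiling * 2)


def write_until_recovered(write, event, sleep=time.sleep, base_s=0.5, cap_s=30.0):
    """Retry transient sink failures indefinitely; re-raise anything else.

    `write` is a hypothetical sink-writer callable; `sleep` is injectable for tests.
    """
    for delay in backoff_delays(base_s, cap_s):
        try:
            return write(event)
        except SinkError as err:
            if err.code not in TRANSIENT_CODES:
                raise
            sleep(delay)
```

Non-transient errors (for example a permission error) are still raised immediately, so only conditions the sink can recover from are retried without limit.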
- Operating system: AWS ECS, running the Docker container
- EventStore client version (if applicable): 24.10
Additional context
We're using the replicator Docker container: eventstore/replicator:latest. Log output from the time of the failure:
{"@t":"2025-11-19T20:18:55.8277873Z","@m":"Error: \"Status(StatusCode=\\\"DeadlineExceeded\\\", Detail=\\\"Timeout\\\")\", will fail","@i":"02639a3a","@l":"Error","@x":"Grpc.Core.RpcException: Status(StatusCode=\"DeadlineExceeded\", Detail=\"Timeout\")\n at EventStore.Client.Streams.BatchAppendResp.ToWriteResult()\n at EventStore.Client.EventStoreClient.StreamAppender.<Duplex>g__Receive|13_1()\n at EventStore.Client.EventStoreClient.StreamAppender.<>c__DisplayClass12_0.<<AppendInternal>g__Operation|0>d.MoveNext()\n--- End of stack trace from previous location ---\n at EventStore.Client.Diagnostics.ActivitySourceExtensions.TraceClientOperation[T](ActivitySource source, Func`1 tracedOperation, String operationName, Func`1 tagsFactory)\n at EventStore.Client.EventStoreClient.AppendToStreamAsync(String streamName, StreamState expectedState, IEnumerable`1 eventData, Action`1 configureOperationOptions, Nullable`1 deadline, UserCredentials userCredentials, CancellationToken cancellationToken)\n at Kurrent.Replicator.KurrentDb.GrpcEventWriter.<>c__DisplayClass4_0.<<WriteEvent>g__AppendEvent|0>d.MoveNext() in /app/src/Kurrent.Replicator.KurrentDb/GrpcEventWriter.cs:line 38\n--- End of stack trace from previous location ---\n at Ubiquitous.Metrics.Metrics.MeasureTask[T](Func`1 action, IHistogramMetric metric, ICountMetric errorCount, String[] labels, Int32 count)\n at Kurrent.Replicator.KurrentDb.GrpcEventWriter.WriteEvent(BaseProposedEvent proposedEvent, CancellationToken cancellationToken) in /app/src/Kurrent.Replicator.KurrentDb/GrpcEventWriter.cs:line 23\n at Kurrent.Replicator.Sink.SinkPipelineExtensions.<>c__DisplayClass0_0.<<UseEventWriter>b__0>d.MoveNext() in /app/src/Kurrent.Replicator/Sink/SinkPipe.cs:line 57\n--- End of stack trace from previous location ---\n at GreenPipes.Filters.AsyncDelegateFilter`1.<>c__DisplayClass3_0.<<Send>g__SendAsync|0>d.MoveNext()\n--- End of stack trace from previous location ---\n at Kurrent.Replicator.LoggingFilter`1.Send(T context, IPipe`1 next) in /app/src/Kurrent.Replicator/Logging.cs:line 12\n at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)","Error":"Status(StatusCode=\"DeadlineExceeded\", Detail=\"Timeout\")","SourceContext":"Kurrent.Replicator.Observers.LoggingRetryObserver"}
{"@t":"2025-11-19T20:18:55.8252671Z","@m":"Error occured in the \"SinkContext\" pipe EventDetails { Stream: \"CreditorApplication:fe339a23-66b6-470a-9059-11ad4b28267f_0dae62c0-1b5a-4027-98e2-a589ff9f657b\", EventId: 8eea35e1-c0da-4a1c-9841-c04711105e19, EventType: \"CreditorCommentReceived\", ContentType: \"application/json\" }: \"Status(StatusCode=\\\"Unavailable\\\", Detail=\\\"Error starting gRPC call. HttpRequestException: Connection refused (eventstore-replica-20fd76379648b7a6.elb.ca-west-1.amazonaws.com:2113) SocketException: Connection refused\\\", DebugException=\\\"System.Net.Http.HttpRequestException: Connection refused (eventstore-replica-20fd76379648b7a6.elb.ca-west-1.amazonaws.com:2113)\\\")\"","@i":"13462698","@l":"Error","@x":"Grpc.Core.RpcException: Status(StatusCode=\"Unavailable\", Detail=\"Error starting gRPC call. HttpRequestException: Connection refused (eventstore-replica-20fd76379648b7a6.elb.ca-west-1.amazonaws.com:2113) SocketException: Connection refused\", DebugException=\"System.Net.Http.HttpRequestException: Connection refused (eventstore-replica-20fd76379648b7a6.elb.ca-west-1.amazonaws.com:2113)\")\n ---> System.Net.Http.HttpRequestException: Connection refused (eventstore-replica-20fd76379648b7a6.elb.ca-west-1.amazonaws.com:2113)\n ---> System.Net.Sockets.SocketException (111): Connection refused\n at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)\n at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)\n at System.Net.Sockets.Socket.<ConnectAsync>g__WaitForConnectWithCancellation|285_0(AwaitableSocketAsyncEventArgs saea, ValueTask connectTask, CancellationToken cancellationToken)\n at System.Net.Http.HttpConnectionPool.ConnectToTcpHostAsync(String host, Int32 port, HttpRequestMessage initialRequest, Boolean async, CancellationToken cancellationToken)\n --- End of inner exception stack trace ---\n at System.Net.Http.HttpConnectionPool.ConnectToTcpHostAsync(String host, Int32 port, HttpRequestMessage initialRequest, Boolean async, CancellationToken cancellationToken)\n at System.Net.Http.HttpConnectionPool.ConnectAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)\n at System.Net.Http.HttpConnectionPool.InjectNewHttp2ConnectionAsync(QueueItem queueItem)\n at System.Threading.Tasks.TaskCompletionSourceWithCancellation`1.WaitWithCancellationAsync(CancellationToken cancellationToken)\n at System.Net.Http.HttpConnectionPool.SendWithVersionDetectionAndRetryAsync(HttpRequestMessage request, Boolean async, Boolean doRequestAuth, CancellationToken cancellationToken)\n at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)\n at System.Net.Http.HttpClient.<SendAsync>g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)\n at Grpc.Net.Client.Internal.GrpcCall`2.RunCall(HttpRequestMessage request, Nullable`1 timeout)\n --- End of inner exception stack trace ---\n at EventStore.Client.Interceptors.TypedExceptionInterceptor.<>c__DisplayClass1_0.<.ctor>b__0(RpcException rpcEx)\n at EventStore.Client.Interceptors.RpcExceptionConversionExtensions.<>c__DisplayClass1_0`1.<Apply>b__0(Task`1 t)\n at System.Threading.Tasks.ContinuationResultTaskFromResultTask`2.InnerInvoke()\n at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)\n--- End of stack trace from previous location ---\n at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)\n at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)\n--- End of stack trace from previous location ---\n at EventStore.Client.GrpcServerCapabilitiesClient.GetAsync(CallInvoker callInvoker, CancellationToken cancellationToken)\n at EventStore.Client.EventStoreClientBase.GetChannelInfoExpensive(ReconnectionRequired reconnectionRequired, Action`1 onReconnectionRequired, IChannelSelector channelSelector, CancellationToken cancellationToken)\n at EventStore.Client.SharingProvider`2.FillBoxAsync(TaskCompletionSource`1 box, TInput input)\n at EventStore.Client.TaskExtensions.WithCancellation[T](Task`1 task, CancellationToken cancellationToken)\n at EventStore.Client.EventStoreClientBase.GetChannelInfo(CancellationToken cancellationToken)\n at EventStore.Client.EventStoreClient.StreamAppender.Duplex(ValueTask`1 channelInfoTask)\n at EventStore.Client.EventStoreClient.AppendToStreamAsync(String streamName, StreamState expectedState, IEnumerable`1 eventData, Action`1 configureOperationOptions, Nullable`1 deadline, UserCredentials userCredentials, CancellationToken cancellationToken)\n at Kurrent.Replicator.KurrentDb.GrpcEventWriter.<>c__DisplayClass4_0.<<WriteEvent>g__AppendEvent|0>d.MoveNext() in /app/src/Kurrent.Replicator.KurrentDb/GrpcEventWriter.cs:line 38\n--- End of stack trace from previous location ---\n at Ubiquitous.Metrics.Metrics.MeasureTask[T](Func`1 action, IHistogramMetric metric, ICountMetric errorCount, String[] labels, Int32 count)\n at Kurrent.Replicator.KurrentDb.GrpcEventWriter.WriteEvent(BaseProposedEvent proposedEvent, CancellationToken cancellationToken) in /app/src/Kurrent.Replicator.KurrentDb/GrpcEventWriter.cs:line 23\n at Kurrent.Replicator.Sink.SinkPipelineExtensions.<>c__DisplayClass0_0.<<UseEventWriter>b__0>d.MoveNext() in /app/src/Kurrent.Replicator/Sink/SinkPipe.cs:line 57\n--- End of stack trace from previous location ---\n at GreenPipes.Filters.AsyncDelegateFilter`1.<>c__DisplayClass3_0.<<Send>g__SendAsync|0>d.MoveNext()\n--- End of stack trace from previous location ---\n at Kurrent.Replicator.LoggingFilter`1.Send(T context, IPipe`1 next) in /app/src/Kurrent.Replicator/Logging.cs:line 12","Type":"SinkContext","Event":{"Stream":"CreditorApplication:fe339a23-66b6-470a-9059-11ad4b28267f_0dae62c0-1b5a-4027-98e2-a589ff9f657b","EventId":"8eea35e1-c0da-4a1c-9841-c04711105e19","EventType":"CreditorCommentReceived","ContentType":"application/json","$type":"EventDetails"},"Message":"Status(StatusCode=\"Unavailable\", Detail=\"Error starting gRPC call. HttpRequestException: Connection refused (eventstore-replica-20fd76379648b7a6.elb.ca-west-1.amazonaws.com:2113) SocketException: Connection refused\", DebugException=\"System.Net.Http.HttpRequestException: Connection refused (eventstore-replica-20fd76379648b7a6.elb.ca-west-1.amazonaws.com:2113)\")","SourceContext":"Kurrent.Replicator.LoggingFilter`1[T]"}
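Until this is fixed, one stop-gap on our side is to watch the replicator's JSON logs for the "will fail" entry from `LoggingRetryObserver` shown above and restart the task when it appears. The field names (`@t`, `@l`, `@m`, `SourceContext`) are taken from those entries and appear to be Serilog's compact JSON format; treating that entry as terminal is our assumption, not documented replicator behavior, and `first_terminal_failure` is a hypothetical helper.

```python
import json
from typing import Iterable, Optional

# SourceContext of the entry that reports an exhausted retry in our logs.
RETRY_OBSERVER = "Kurrent.Replicator.Observers.LoggingRetryObserver"


def first_terminal_failure(lines: Iterable[str]) -> Optional[dict]:
    """Return the first log record that looks like an exhausted retry
    (Error level, from LoggingRetryObserver, message ending in 'will fail'),
    or None if there is none. Lines that are not JSON objects are skipped."""
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if (record.get("@l") == "Error"
                and record.get("SourceContext") == RETRY_OBSERVER
                and str(record.get("@m", "")).endswith("will fail")):
            return record
    return None
```

A match could then trigger `aws ecs stop-task` for the affected task, after which the ECS service scheduler starts a replacement to maintain the desired count.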