elsa
elsa copied to clipboard
Bug: Non-group workers are not restarted after crash
In a group supervision tree, workers behave as expected: If their handler triggers a crash, they are restarted and the group can continue handling messages.
In a non-group supervision tree, crashed workers are simply ignored and never restarted. This is unintuitive and a pain to deal with, since the outer supervisor (the process returned by Elsa.Supervisor.start_link/1) keeps running as though everything were fine. I would expect the DynamicProcessManager to take care of worker restarts.
Here is an example integration test that fails and demonstrates the problem:
test "restarts a crashed worker that isn't in a group" do
  topic = "consumer-test3"
  Elsa.create_topic(@brokers, topic)

  # Consumer options with no group settings: the handler forwards every
  # consumed message back to this test process.
  consumer_opts = [
    topic: topic,
    handler: Testing.ExampleMessageHandlerWithState,
    handler_init_args: %{pid: self()},
    begin_offset: :earliest
  ]

  start_supervised!(
    {Elsa.Supervisor, connection: :name1, endpoints: @brokers, consumer: consumer_opts}
  )

  # Baseline: messages sent before the crash are delivered to the handler.
  # Each message goes out in its own call, exactly as two separate sends.
  Enum.each(["message1", "message2"], fn value -> send_messages(topic, [value]) end)

  assert_receive {:message, %{topic: ^topic, value: "message1"}}, 5_000
  assert_receive {:message, %{topic: ^topic, value: "message2"}}, 5_000

  kill_worker(topic)

  Enum.each(["message3", "message4"], fn value -> send_messages(topic, [value]) end)

  # These assertions fail, because the worker wasn't brought back up.
  assert_receive {:message, %{topic: ^topic, value: "message3"}}, 5_000
  assert_receive {:message, %{topic: ^topic, value: "message4"}}, 5_000
end
defmodule Testing.ExampleMessageHandlerWithState do
  # Test message handler: keeps its init args as handler state and relays
  # every consumed message to the pid stored under `:pid` in that state.
  use Elsa.Consumer.MessageHandler

  # The init args are used unchanged as the handler state.
  def init(args), do: {:ok, args}

  # Sends `{:message, message}` to `state.pid` for each message in the batch,
  # then acknowledges the batch with the state untouched.
  def handle_messages(messages, state) do
    Enum.each(messages, fn message ->
      send(state.pid, {:message, message})
    end)

    {:ack, state}
  end
end
# Produces each of `messages` to `topic` through the `:test_client` brod client.
#
# The client and the topic producer are (re)started on every call and their
# return values are not checked (presumably so repeated calls are harmless —
# TODO confirm against brod's behaviour for an already-started client).
# Messages alternate between partitions 0 and 1 based on their position in the
# list; the key is always the empty string.
defp send_messages(topic, messages) do
  client = :test_client

  :brod.start_link_client(@brokers, client)
  :brod.start_producer(client, topic, [])

  indexed_messages = Enum.with_index(messages)

  Enum.each(indexed_messages, fn {payload, position} ->
    :brod.produce_sync(client, topic, rem(position, 2), "", payload)
  end)
end
# Brutally kills the consumer worker registered for partition 0 of `topic`
# and returns only once the worker process is confirmed dead.
#
# `Process.exit/2` merely sends an exit signal; the target may still be alive
# when the call returns. Checking `Process.alive?/1` straight away is therefore
# racy, so the worker is monitored and the `:DOWN` message is awaited first.
defp kill_worker(topic) do
  partition = 0
  worker_pid = Elsa.Registry.whereis_name({:elsa_registry_name1, :"worker_#{topic}_#{partition}"})

  # Fail with a readable assertion if the registry lookup did not yield a pid,
  # instead of crashing inside Process.monitor/1 or Process.exit/2.
  assert is_pid(worker_pid), "no worker registered for #{topic} partition #{partition}"

  monitor_ref = Process.monitor(worker_pid)
  Process.exit(worker_pid, :kill)

  # Wait for the exit signal to take effect before inspecting the process.
  assert_receive {:DOWN, ^monitor_ref, :process, ^worker_pid, _reason}, 1_000
  assert false == Process.alive?(worker_pid)
end