elsa icon indicating copy to clipboard operation
elsa copied to clipboard

Bug: Non-group workers are not restarted after crash

Open jtrees opened this issue 3 years ago • 0 comments

In a group supervision tree, workers behave as expected: If their handler triggers a crash, they are restarted and the group can continue handling messages.

In a non-group supervision tree, crashed workers are silently ignored and never restarted. This is unintuitive and painful to deal with, because the outer supervisor (the process returned by `Elsa.Supervisor.start_link/1`) keeps running as though everything were fine. I would expect the `DynamicProcessManager` to take care of restarting crashed workers.

Here is an example integration test that fails and demonstrates the problem:

test "restarts a crashed worker that isn't in a group" do
  topic = "consumer-test3"
  Elsa.create_topic(@brokers, topic)

  # Standalone (non-group) consumer whose handler forwards every message back
  # to this test process.
  consumer_opts = [
    topic: topic,
    handler: Testing.ExampleMessageHandlerWithState,
    handler_init_args: %{pid: self()},
    begin_offset: :earliest
  ]

  supervisor_opts = [connection: :name1, endpoints: @brokers, consumer: consumer_opts]
  start_supervised!({Elsa.Supervisor, supervisor_opts})

  # One message per send_messages/2 call, in order.
  for value <- ["message1", "message2"], do: send_messages(topic, [value])

  for value <- ["message1", "message2"] do
    assert_receive {:message, %{topic: ^topic, value: ^value}}, 5_000
  end

  kill_worker(topic)

  for value <- ["message3", "message4"], do: send_messages(topic, [value])

  # These assertions fail, because the worker wasn't brought back up.
  for value <- ["message3", "message4"] do
    assert_receive {:message, %{topic: ^topic, value: ^value}}, 5_000
  end
end

defmodule Testing.ExampleMessageHandlerWithState do
  @moduledoc """
  Test message handler that sends each consumed message to the process stored
  under `:pid` in its state, then returns `{:ack, state}`.
  """
  use Elsa.Consumer.MessageHandler

  @doc "Uses the handler init args, unchanged, as the handler state."
  def init(args), do: {:ok, args}

  @doc "Sends `{:message, message}` to `state.pid` for every message, in order."
  def handle_messages(messages, state) do
    for message <- messages do
      send(state.pid, {:message, message})
    end

    {:ack, state}
  end
end

# Synchronously produces each message in `messages` to `topic`.
#
# The partition is `rem(index, 2)`, where `index` is the message's position
# within this call only, so a single-message call always targets partition 0.
#
# The `:test_client` client and the topic producer are started on every call.
# Their results are deliberately ignored, since a repeat call presumably just
# reports that they are already running.
defp send_messages(topic, messages) do
  :brod.start_link_client(@brokers, :test_client)
  :brod.start_producer(:test_client, topic, [])

  messages
  |> Enum.with_index()
  |> Enum.each(fn {msg, index} ->
    partition = rem(index, 2)
    # Fail right here if producing fails; otherwise a broker error would only
    # surface later as a confusing assert_receive timeout.
    :ok = :brod.produce_sync(:test_client, topic, partition, "", msg)
  end)
end

# Looks up the consumer worker registered for partition 0 of `topic` in the
# `:elsa_registry_name1` registry (presumably the one for connection `:name1`)
# and kills it with the untrappable `:kill` reason.
defp kill_worker(topic) do
  partition = 0

  worker_name = :"worker_#{topic}_#{partition}"
  worker_pid = Elsa.Registry.whereis_name({:elsa_registry_name1, worker_name})

  # Fail with a clear assertion if no worker is registered, rather than an
  # argument error from Process.exit/2 on a non-pid.
  assert is_pid(worker_pid)
  Process.exit(worker_pid, :kill)

  # Process.alive?/1 is guaranteed to observe exit signals already sent by
  # this process, so this check does not race with the kill above.
  refute Process.alive?(worker_pid)
end

jtrees avatar Mar 31 '22 12:03 jtrees