
Q: How to design a Commanded solution with an audit log in a legacy system?

Open elvanja opened this issue 1 year ago • 1 comments

So my team is trying to go the Commanded route in a legacy system. The main reason is the requirement for an audit log, for which logging dispatched commands (e.g. via middleware like https://github.com/commanded/commanded-audit-middleware/) is a nice solution. Event sourcing is a nice bonus, especially since we have upcoming features for which it will be a perfect match. Apologies upfront for the lengthy post; I couldn't find a way to make it much shorter.

Anyway, we have a legacy part of the system where we would like to use the same audit log / Commanded idea. There are a few gotchas, however:

  • the current customer API is synchronous and expects the result of e.g. a CRUD operation immediately (succeeded or failed)
  • the current API must not be broken, but we are free to introduce a V2 that will replace it in the future
  • there is a permissions policy with an approvals system in place, which may postpone the execution of actions (e.g. until enough approvals are collected)

A representative part of this legacy system is API key management. Basically, a customer can create a new API key, pick up the generated API key secret (revealed only once, on creation) and use that secret to access the API. The API key can then be enabled or disabled (lifecycle management). We also have domain actions that do the work, e.g. create, enable or disable, as well as actions for approving, denying etc.

A special part of the legacy system is the approvals framework. If the applicable permission policy requires 2 users to approve an initiated action, the action is not executed immediately. It is stored as an Erlang term in the DB, and only once we have collected those 2 approvals do we execute it.
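To make the mechanics concrete, here is a minimal sketch of that deferral, not our actual framework. `Demo.DeferredAction`, its fields and the `{module, function, args}` payload shape are hypothetical, and an in-memory struct stands in for the DB row.

```elixir
defmodule Demo.DeferredAction do
  # Hypothetical sketch: a pending action is serialized as an Erlang term
  # and only executed once enough distinct users have approved it.
  defstruct [:payload, required: 2, approvers: []]

  # `mfa` is a {module, function, args} tuple describing the postponed call.
  def new(mfa, required \\ 2) do
    %__MODULE__{payload: :erlang.term_to_binary(mfa), required: required}
  end

  def approve(%__MODULE__{} = action, user_id) do
    # The same user approving twice counts only once.
    approvers = Enum.uniq([user_id | action.approvers])
    action = %{action | approvers: approvers}

    if length(approvers) >= action.required do
      # :safe refuses to create new atoms while decoding the stored term.
      {module, function, args} = :erlang.binary_to_term(action.payload, [:safe])
      {:executed, apply(module, function, args)}
    else
      {:pending, action}
    end
  end
end

# Usage: String.upcase/1 stands in for a real domain action.
action = Demo.DeferredAction.new({String, :upcase, ["enable api key"]})
{:pending, action} = Demo.DeferredAction.approve(action, "alice")
{:executed, result} = Demo.DeferredAction.approve(action, "bob")
IO.inspect(result)
```

The point of the sketch is that the caller gets back either a pending marker or the executed result, which is the same two-way outcome the Enable and Disable domain actions return.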

We came up with something like this:

defmodule Demo.ApiKeys.Aggregate do
  defstruct [
    :aggregate_id,
    :approval_flow_initiated,
    :api_key
  ]

  def execute(%{aggregate_id: nil}, %Commands.Create{} = command) do
    case DomainActions.CreateApiKey.call(command.attrs) do
      {:ok, api_key} -> {:ok, %Events.Created{aggregate_id: command.aggregate_id, api_key: api_key}}
      {:error, reason} -> {:error, reason}
    end
  end

  def execute(%{aggregate_id: aggregate_id}, %Commands.Enable{api_key: api_key} = command) do
    case DomainActions.EnableApiKey.call(api_key, command.subject) do
      {:ok, %Domain.ApprovalRequest{} = request} -> {:ok, %Events.ApprovalFlowRequested{aggregate_id: aggregate_id, request: request, command: command}}
      {:ok, %Domain.ApiKey{} = api_key} -> {:ok, %Events.Enabled{aggregate_id: aggregate_id, api_key: api_key}}
      {:forbidden, reason} -> {:error, {:forbidden, reason}}
      {:error, reason} -> {:error, reason}
    end
  end
  
  def execute(%{aggregate_id: aggregate_id}, %Commands.Disable{api_key: api_key} = command) do
    case DomainActions.DisableApiKey.call(api_key, command.subject) do
      {:ok, %Domain.ApprovalRequest{} = request} -> {:ok, %Events.ApprovalFlowRequested{aggregate_id: aggregate_id, request: request, command: command}}
      {:ok, %Domain.ApiKey{} = api_key} -> {:ok, %Events.Disabled{aggregate_id: aggregate_id, api_key: api_key}}
      {:forbidden, reason} -> {:error, {:forbidden, reason}}
      {:error, reason} -> {:error, reason}
    end
  end

  def execute(%{aggregate_id: aggregate_id, api_key: api_key}, %Commands.Approve{} = command) do
    case DomainActions.Approve.call(command.request, command.attrs) do
      {:ok, _response} -> {:ok, [
        %Events.Approved{aggregate_id: aggregate_id, api_key: api_key},
        %Events.Enabled{aggregate_id: aggregate_id, api_key: api_key} # e.g. approval was requested for enabling
      ]}
      
      {:error, changeset} -> {:error, changeset}
    end
  end

  def execute(%{aggregate_id: aggregate_id, api_key: api_key}, %Commands.Deny{} = command) do
    case DomainActions.Deny.call(command.request, command.attrs) do
      {:ok, _response} -> {:ok, %Events.Denied{aggregate_id: aggregate_id, api_key: api_key}}
      {:error, changeset} -> {:error, changeset}
    end
  end

  def apply(aggregate, %Events.ApprovalFlowRequested{}) do
    %{aggregate | approval_flow_initiated: true}
  end

  def apply(%{aggregate_id: nil} = aggregate, %Events.Created{} = event) do
    # Update the existing struct so the aggregate state keeps its struct type
    %{
      aggregate
      | aggregate_id: event.aggregate_id,
        api_key: event.api_key,
        approval_flow_initiated: false
    }
  end

  def apply(aggregate, event) do
    Map.merge(aggregate, %{
      api_key: event.api_key,
      approval_flow_initiated: false
    })
  end
end

defmodule Demo.ApiKeys.Commands.Create do
  use Demo.Command, fields: [:attrs], required: :all

  def dispatch(attrs, actor, metadata) do
    api_key_id = Ecto.UUID.generate()
    attrs = Map.put(attrs, :id, api_key_id)
    aggregate_id = api_key_id

    result = CommandApp.dispatch(
      __MODULE__.build(aggregate_id, actor, attrs: attrs),
      causation_id: metadata[:request_id],
      returning: :aggregate_state
    )

    case result do
      {:ok, %{api_key: api_key}} -> {:ok, api_key}
      {:error, reason} -> {:error, reason}
    end
  end
end

defmodule Demo.ApiKeys.Commands.Enable do
  use Demo.Command, fields: [:api_key], skip: [:api_key], required: :all

  def dispatch(api_key, actor, metadata) do
    aggregate_id = api_key.id

    result = CommandApp.dispatch(
      __MODULE__.build(aggregate_id, actor, api_key: api_key),
      causation_id: metadata[:request_id],
      returning: :aggregate_state
    )

    case result do
      {:ok, %{approval_flow_initiated: true}} -> {:ok, :approval_flow_initiated}
      {:ok, %{api_key: api_key}} -> {:ok, api_key}
      {:error, {:forbidden, reason}} -> {:forbidden, reason}
      {:error, reason} -> {:error, reason}
    end
  end
end

The main idea here was to keep using the domain actions/logic and just trigger them inside the aggregate. This would (we think?!) ensure that we dispatch commands, which in turn affect the aggregate, which in turn emits events after the related domain action completes. In the process, custom middleware generates audit logs for the dispatched commands.
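For illustration, a minimal sketch of what such audit middleware could look like. The module name `Demo.AuditMiddleware` is hypothetical and Logger stands in for a real audit log table; the three callbacks are the ones defined by the `Commanded.Middleware` behaviour, each receiving the dispatch pipeline and returning it.

```elixir
defmodule Demo.AuditMiddleware do
  # Hypothetical audit middleware sketch. Without Commanded loaded this
  # compiles with an "undefined behaviour" warning only.
  @behaviour Commanded.Middleware

  require Logger

  def before_dispatch(pipeline), do: audit(pipeline, "dispatching")
  def after_dispatch(pipeline), do: audit(pipeline, "succeeded")
  def after_failure(pipeline), do: audit(pipeline, "failed")

  # Matches on map keys only, which also works for the pipeline struct.
  defp audit(%{command: command} = pipeline, stage) do
    # Read defensively, in case the key is absent.
    correlation_id = Map.get(pipeline, :correlation_id)

    Logger.info(
      "audit #{stage}: #{inspect(command)} correlation_id=#{inspect(correlation_id)}"
    )

    # Middleware must hand the pipeline back so dispatch can continue.
    pipeline
  end
end
```

In a Commanded router such a module would be registered with the `middleware` macro, e.g. `middleware Demo.AuditMiddleware`.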

One of the reasons for this approach was that we were not sure whether running the domain actions in an event handler would indeed match what the aggregate emitted. E.g. if the aggregate just emits Events.Enabled, we have no way of knowing whether the domain action succeeded, requires approvals, or failed. If there were such a guarantee, we could do something like:

defmodule Demo.ApiKeys.Aggregate do
  def execute(%{aggregate_id: nil}, %Commands.Create{} = command) do
    %Events.CreateRequested{attrs: command.attrs}
  end

  def execute(%{api_key: api_key}, %Commands.Enable{}) do
    %Events.EnableRequested{api_key: api_key}
  end

  def execute(%{api_key: api_key}, %Commands.Disable{}) do
    %Events.DisableRequested{api_key: api_key}
  end

  def execute(%{api_key: api_key}, %Commands.Approve{}) do
    %Events.ApproveRequested{api_key: api_key}
  end

  def execute(%{api_key: api_key}, %Commands.Deny{}) do
    %Events.DenyRequested{api_key: api_key}
  end
end

defmodule Demo.ApiKeys.Handler do
  use Commanded.Event.Handler, application: Demo.Commanding.Application, name: __MODULE__

  def handle(%Events.CreateRequested{} = event, _metadata) do
    case DomainActions.CreateApiKey.call(event.attrs) do
      {:ok, _} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end

  def handle(%Events.EnableRequested{} = event, metadata) do
    case DomainActions.EnableApiKey.call(event.api_key, metadata.subject) do
      {:ok, %Domain.ApprovalRequest{} = request} -> {:ok, %Events.ApprovalFlowRequested{request: request, command: Commands.Enable}}
      {:ok, %Domain.ApiKey{} = api_key} -> {:ok, %Events.Enabled{api_key: api_key}}
      {:forbidden, reason} -> {:error, {:forbidden, reason}}
      {:error, reason} -> {:error, reason}
    end
  end

  def handle(%Events.DisableRequested{} = event, metadata) do
    case DomainActions.DisableApiKey.call(event.api_key, metadata.subject) do
      {:ok, %Domain.ApprovalRequest{} = request} -> {:ok, %Events.ApprovalFlowRequested{request: request, command: Commands.Disable}}
      {:ok, %Domain.ApiKey{} = api_key} -> {:ok, %Events.Disabled{api_key: api_key}}
      {:forbidden, reason} -> {:error, {:forbidden, reason}}
      {:error, reason} -> {:error, reason}
    end
  end

  def handle(%Events.ApproveRequested{} = event, _metadata) do
    case DomainActions.Approve.call(event.request, event.attrs) do
      {:ok, _response} -> {:ok, [
        %Events.Approved{api_key: event.api_key},
        %Events.Enabled{api_key: event.api_key} # e.g. approval was requested for enabling
      ]}

      {:error, changeset} -> {:error, changeset}
    end
  end

  def handle(%Events.DenyRequested{} = event, _metadata) do
    case DomainActions.Deny.call(event.request, event.attrs) do
      {:ok, _response} -> {:ok, %Events.Denied{api_key: event.api_key}}
      {:error, changeset} -> {:error, changeset}
    end
  end
end

However, this would definitely break the synchronous nature of the current API. Also, we found no way to return the created API key ID to the API consumer.

We could wait for the effect in the API controllers and return an appropriate response, but then the issue is how to build a reference that the consumer can use to inspect the status of a given command. Maybe there is a way to use e.g. the command ID or similar, but again we could find no way to make it work from https://hexdocs.pm/commanded/Commanded.Application.html#c:dispatch/1. Could we maybe generate some reference and pass it into the command metadata, e.g. as the causation or correlation ID, and then have a way to ask Commanded for the status of the aggregate or command with that correlation ID?
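To show what I mean by such a reference, here is a rough sketch. Everything in it is hypothetical (`Demo.CommandStatus`, `Demo.TrackedDispatch`, the status values), it does not rely on any status lookup in Commanded itself, and `dispatch_fun` stands in for something like `&CommandApp.dispatch/2` so the sketch runs on its own. The only Commanded-specific assumption is that dispatch accepts a `correlation_id` option.

```elixir
defmodule Demo.CommandStatus do
  # Hypothetical in-memory status store keyed by reference.
  # A real system would more likely use a DB table or a projection.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  def put(reference, status) do
    Agent.update(__MODULE__, &Map.put(&1, reference, status))
  end

  def get(reference) do
    Agent.get(__MODULE__, &Map.get(&1, reference, :unknown))
  end
end

defmodule Demo.TrackedDispatch do
  # `dispatch_fun` is a 2-arity function taking the command and options.
  def dispatch(command, dispatch_fun) when is_function(dispatch_fun, 2) do
    # A real app would likely use a UUID here, e.g. Ecto.UUID.generate().
    reference = Base.encode16(:crypto.strong_rand_bytes(16), case: :lower)
    Demo.CommandStatus.put(reference, :pending)

    # The reference travels with the dispatch as its correlation ID.
    case dispatch_fun.(command, correlation_id: reference) do
      {:error, reason} -> Demo.CommandStatus.put(reference, {:failed, reason})
      _accepted -> :ok
    end

    # The API would return this reference to the consumer for polling.
    {:accepted, reference}
  end
end
```

The event handler would then record the final status under the same reference once the domain action has run, assuming the correlation ID is available to it in the event metadata. A V2 endpoint could expose `Demo.CommandStatus.get/1` for polling.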

As for process managers, as per https://github.com/commanded/commanded/pull/514#issuecomment-1884641211 it looks like it is not a good idea to go with that.

So, I guess the question is whether you have any advice on introducing Commanded in legacy apps in general, and maybe a specific idea for the problem at hand. Thank you for the great work :bow:

elvanja, Oct 22 '24 13:10

P.S. Just to verify our assumptions: do execute in the aggregate and handle in the handler run in a transaction together with the DB calls executed within those functions? Note that the event store and the domain data are in the same DB (different schemas). Kind of like Ecto.Multi? If not, is there a way to achieve that?

Update: I'm reading https://github.com/commanded/commanded/blob/master/lib/commanded/commands/dispatcher.ex#L83C1-L90C9 but don't get where it goes from there. Should we maybe use the aggregates idea and go with https://hexdocs.pm/commanded/Commanded.Aggregate.Multi.html somehow?

elvanja, Oct 22 '24 17:10

@elvanja I'm going to move this one to discussions. I think you might get some good discussion on the Elixir Slack in the #commanded channel. You have probably moved on from this problem, but come talk to us about it if you are still curious.

drteeth, Jun 24 '25 02:06