
fix: Allow Overriding of `max_channels_per_client` via Environment Variable

barrownicholas opened this pull request 1 year ago • 4 comments

What kind of change does this PR introduce?

Bug fix: allows overriding the default of 100 for max_channels_per_client on a realtime tenant.

What is the current behavior?

The default is locked at 100, with no easy way to override it.

What is the new behavior?

Allows a user to specify MAX_CHANNELS_PER_CLIENT in the runtime environment to override this value.
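
For illustration, a minimal sketch of how such an override could be read at boot, assuming the value is parsed once in runtime config (the merged runtime.exs later in this thread ends up reading TENANT_MAX_CHANNELS_PER_CLIENT instead):

# Sketch: read the variable from the environment, keeping 100 as the fallback default
config :realtime,
  tenant_max_channels_per_client:
    System.get_env("MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer()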

Additional context

Fixes https://github.com/supabase/realtime/issues/843

barrownicholas avatar May 03 '24 21:05 barrownicholas

@barrownicholas is attempting to deploy a commit to the Supabase Team on Vercel.

A member of the Team first needs to authorize it.

vercel[bot] avatar May 03 '24 21:05 vercel[bot]

@filipecabaco when you have time could you take a peek at this? You've been a huge help in the past; sorry to bother you, you're just the best point of contact I have!!

barrownicholas avatar May 03 '24 21:05 barrownicholas

Also, can confirm that this built successfully in Docker on my local machine

barrownicholas avatar May 03 '24 21:05 barrownicholas

👋 will check today, sorry for the delay

filipecabaco avatar May 09 '24 12:05 filipecabaco

@filipecabaco good idea, just pushed those in 11197b4

barrownicholas avatar Jun 17 '24 23:06 barrownicholas

@filipecabaco good idea about updating the defaults... I may undertake a larger project and set TENANT_... variables for all the defaults after we wrap this up. Changes should be good for you to review again when you have a sec.

barrownicholas avatar Jun 18 '24 14:06 barrownicholas

that would be awesome! and if you have any other feedback on self-hosted issues you found, please do ping

filipecabaco avatar Jun 18 '24 16:06 filipecabaco

Will do then! As soon as this ships, I'll open up a new fork + PR and start that

barrownicholas avatar Jun 18 '24 17:06 barrownicholas

Small CI error, could you run `mix format` and commit?

filipecabaco avatar Jun 18 '24 17:06 filipecabaco

sorry about that, should be good now @filipecabaco

barrownicholas avatar Jun 18 '24 17:06 barrownicholas

@filipecabaco looks like I accidentally duplicated lines 24-26 in tenant.ex while trying to resolve merge conflicts, which produced the failure in the CI workflow. I think it should be good to go now.

barrownicholas avatar Jun 18 '24 17:06 barrownicholas

🤦‍♂️

my bad, I forgot that the schema is a compiled thing; tenant.ex should actually do this with a changeset change. Here's a proposal for the change in lib/realtime/api/tenant.ex that can easily be adapted to the other fields when we need to do it:

defmodule Realtime.Api.Tenant do
  @moduledoc """
  Describes a database/tenant which makes use of the realtime service.
  """
  use Ecto.Schema
  import Ecto.Changeset
  alias Realtime.Api.Extensions
  alias Realtime.Crypto

  @type t :: %__MODULE__{}

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  schema "tenants" do
    field(:name, :string)
    field(:external_id, :string)
    field(:jwt_secret, :string)
    field(:jwt_jwks, :map)
    field(:postgres_cdc_default, :string)
    field(:max_concurrent_users, :integer, default: 200)
    field(:max_events_per_second, :integer, default: 100)
    field(:max_bytes_per_second, :integer, default: 100_000)
    field(:max_channels_per_client, :integer)
    field(:max_joins_per_second, :integer, default: 100)
    field(:suspend, :boolean, default: false)
    field(:events_per_second_rolling, :float, virtual: true)
    field(:events_per_second_now, :integer, virtual: true)
    field(:enable_authorization, :boolean, default: false)

    has_many(:extensions, Realtime.Api.Extensions,
      foreign_key: :tenant_external_id,
      references: :external_id,
      on_delete: :delete_all,
      on_replace: :delete
    )

    timestamps()
  end

  @doc false
  def changeset(tenant, attrs) do
    # TODO: remove after infra update
    extension_key =
      if attrs[:extensions] do
        :extensions
      else
        "extensions"
      end

    attrs =
      if attrs[extension_key] do
        ext =
          Enum.map(attrs[extension_key], fn
            %{"type" => "postgres"} = e -> %{e | "type" => "postgres_cdc_rls"}
            e -> e
          end)

        %{attrs | extension_key => ext}
      else
        attrs
      end

    tenant
    |> cast(attrs, [
      :name,
      :external_id,
      :jwt_secret,
      :jwt_jwks,
      :max_concurrent_users,
      :max_events_per_second,
      :postgres_cdc_default,
      :max_bytes_per_second,
      :max_channels_per_client,
      :max_joins_per_second,
      :suspend,
      :enable_authorization
    ])
    |> validate_required([
      :external_id,
      :jwt_secret
    ])
    |> unique_constraint([:external_id])
    |> encrypt_jwt_secret()
    |> maybe_set_default(:max_channels_per_client, :tenant_max_channels_per_client)
    |> cast_assoc(:extensions, with: &Extensions.changeset/2)
  end

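  # If neither the stored tenant record nor the incoming changes provide a value
  # for `property`, fall back to the application config stored under `config_key`.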
  def maybe_set_default(changeset, property, config_key) do
    has_key? = Map.get(changeset.data, property) || Map.get(changeset.changes, property)

    if has_key? do
      changeset
    else
      put_change(changeset, property, Application.fetch_env!(:realtime, config_key))
    end
  end

  def encrypt_jwt_secret(changeset) do
    update_change(changeset, :jwt_secret, &Crypto.encrypt!/1)
  end
end
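
Roughly how the fallback behaves, shown as a sketch with hypothetical attrs (it assumes :tenant_max_channels_per_client is configured, e.g. to 100 as in the runtime.exs change below):

# Hypothetical attrs for illustration; max_channels_per_client is deliberately omitted
changeset = Realtime.Api.Tenant.changeset(%Realtime.Api.Tenant{}, %{"external_id" => "tenant_1"})

# No value was cast, so maybe_set_default/3 fills it in from the application env
Ecto.Changeset.get_change(changeset, :max_channels_per_client)
#=> 100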

and we also need to change runtime.exs, since the one we have at the moment ignores tests (the new config is set outside the environment checks so it also applies in test):

import Config

config :logflare_logger_backend,
  url: System.get_env("LOGFLARE_LOGGER_BACKEND_URL", "https://api.logflare.app")

app_name = System.get_env("FLY_APP_NAME", "")
default_db_host = System.get_env("DB_HOST", "localhost")
username = System.get_env("DB_USER", "postgres")
password = System.get_env("DB_PASSWORD", "postgres")
database = System.get_env("DB_NAME", "postgres")
port = System.get_env("DB_PORT", "5432")
slot_name_suffix = System.get_env("SLOT_NAME_SUFFIX")

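# Tenant default applied in every environment (including test), overridable via
# the TENANT_MAX_CHANNELS_PER_CLIENT environment variable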
config :realtime,
  tenant_max_channels_per_client:
    System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer()

if config_env() == :prod do
  secret_key_base =
    System.get_env("SECRET_KEY_BASE") ||
      raise """
      environment variable SECRET_KEY_BASE is missing.
      You can generate one by calling: mix phx.gen.secret
      """

  if app_name == "" do
    raise "APP_NAME not available"
  end

  config :realtime, RealtimeWeb.Endpoint,
    server: true,
    url: [host: "#{app_name}.fly.dev", port: 80],
    http: [
      port: String.to_integer(System.get_env("PORT") || "4000"),
      protocol_options: [
        max_header_value_length: String.to_integer(System.get_env("MAX_HEADER_LENGTH") || "4096")
      ],
      transport_options: [
        # max_connection is per connection supervisor
        # num_conns_sups defaults to num_acceptors
        # total conns accepted here is max_connections * num_acceptors
        # ref: https://ninenines.eu/docs/en/ranch/2.0/manual/ranch/
        max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "1000"),
        num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"),
        # IMPORTANT: support IPv6 addresses
        socket_opts: [:inet6]
      ]
    ],
    check_origin: false,
    secret_key_base: secret_key_base
end

if config_env() != :test do
  platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly

  config :realtime,
    secure_channels: System.get_env("SECURE_CHANNELS", "true") == "true",
    jwt_claim_validators: System.get_env("JWT_CLAIM_VALIDATORS", "{}"),
    api_jwt_secret: System.get_env("API_JWT_SECRET"),
    api_blocklist: System.get_env("API_TOKEN_BLOCKLIST", "") |> String.split(","),
    metrics_blocklist: System.get_env("METRICS_TOKEN_BLOCKLIST", "") |> String.split(","),
    metrics_jwt_secret: System.get_env("METRICS_JWT_SECRET"),
    db_enc_key: System.get_env("DB_ENC_KEY"),
    region: System.get_env("FLY_REGION") || System.get_env("REGION"),
    fly_alloc_id: System.get_env("FLY_ALLOC_ID", ""),
    prom_poll_rate: System.get_env("PROM_POLL_RATE", "5000") |> String.to_integer(),
    platform: platform,
    slot_name_suffix: slot_name_suffix

  queue_target = System.get_env("DB_QUEUE_TARGET", "5000") |> String.to_integer()
  queue_interval = System.get_env("DB_QUEUE_INTERVAL", "5000") |> String.to_integer()

  after_connect_query_args =
    case System.get_env("DB_AFTER_CONNECT_QUERY") do
      nil -> nil
      query -> {Postgrex, :query!, [query, []]}
    end

  config :realtime, Realtime.Repo,
    hostname: default_db_host,
    username: username,
    password: password,
    database: database,
    port: port,
    pool_size: System.get_env("DB_POOL_SIZE", "5") |> String.to_integer(),
    queue_target: queue_target,
    queue_interval: queue_interval,
    parameters: [
      application_name: "supabase_mt_realtime"
    ],
    after_connect: after_connect_query_args

  replica_repos = %{
    Realtime.Repo.Replica.FRA => System.get_env("DB_HOST_REPLICA_FRA", default_db_host),
    Realtime.Repo.Replica.IAD => System.get_env("DB_HOST_REPLICA_IAD", default_db_host),
    Realtime.Repo.Replica.SIN => System.get_env("DB_HOST_REPLICA_SIN", default_db_host),
    Realtime.Repo.Replica.SJC => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.Singapore => System.get_env("DB_HOST_REPLICA_SIN", default_db_host),
    Realtime.Repo.Replica.London => System.get_env("DB_HOST_REPLICA_FRA", default_db_host),
    Realtime.Repo.Replica.NorthVirginia => System.get_env("DB_HOST_REPLICA_IAD", default_db_host),
    Realtime.Repo.Replica.Oregon => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.SanJose => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.Local => default_db_host
  }

  # username, password, database, and port must match primary credentials
  for {replica_repo, hostname} <- replica_repos do
    config :realtime, replica_repo,
      hostname: hostname,
      username: username,
      password: password,
      database: database,
      port: port,
      pool_size: System.get_env("DB_REPLICA_POOL_SIZE", "5") |> String.to_integer(),
      queue_target: queue_target,
      queue_interval: queue_interval,
      parameters: [
        application_name: "supabase_mt_realtime_ro"
      ]
  end
end

default_cluster_strategy =
  config_env()
  |> case do
    :prod -> "DNS"
    _ -> "EPMD"
  end

cluster_topologies =
  System.get_env("CLUSTER_STRATEGIES", default_cluster_strategy)
  |> String.upcase()
  |> String.split(",")
  |> Enum.reduce([], fn strategy, acc ->
    strategy
    |> String.trim()
    |> case do
      "DNS" ->
        [
          fly6pn: [
            strategy: Cluster.Strategy.DNSPoll,
            config: [
              polling_interval: 5_000,
              query: System.get_env("DNS_NODES"),
              node_basename: app_name
            ]
          ]
        ] ++ acc

      "POSTGRES" ->
        version = "#{Application.spec(:realtime)[:vsn]}" |> String.replace(".", "_")

        [
          postgres: [
            strategy: Realtime.Cluster.Strategy.Postgres,
            config: [
              hostname: default_db_host,
              username: username,
              password: password,
              database: database,
              port: port,
              parameters: [
                application_name: "cluster_node_#{node()}"
              ],
              heartbeat_interval: 5_000,
              node_timeout: 15_000,
              channel_name:
                System.get_env("POSTGRES_CLUSTER_CHANNEL_NAME", "realtime_cluster_#{version}")
            ]
          ]
        ] ++ acc

      "EPMD" ->
        [
          dev: [
            strategy: Cluster.Strategy.Epmd,
            config: [
              hosts: [:"[email protected]", :"[email protected]"]
            ],
            connect: {:net_kernel, :connect_node, []},
            disconnect: {:net_kernel, :disconnect_node, []}
          ]
        ] ++ acc

      _ ->
        acc
    end
  end)

config :libcluster,
  debug: false,
  topologies: cluster_topologies

if System.get_env("LOGS_ENGINE") == "logflare" do
  if !System.get_env("LOGFLARE_API_KEY") or !System.get_env("LOGFLARE_SOURCE_ID") do
    raise """
    Environment variable LOGFLARE_API_KEY or LOGFLARE_SOURCE_ID is missing.
    Check those variables or choose another LOGS_ENGINE.
    """
  end

  config :logger,
    backends: [LogflareLogger.HttpBackend]
end

Tested locally and looks good. [Screenshot: 2024-06-18 at 19:05]

filipecabaco avatar Jun 18 '24 18:06 filipecabaco

> should actually do this with a changeset change

@filipecabaco just refactored those two files based on those changes

barrownicholas avatar Jun 18 '24 18:06 barrownicholas

merged 🙏 thank you so much for all the patience!

filipecabaco avatar Jun 18 '24 22:06 filipecabaco

🎉 This PR is included in version 2.29.10 🎉

The release is available on GitHub release

Your semantic-release bot 📦🚀

kiwicopple avatar Jun 18 '24 22:06 kiwicopple