Swarm graceful application handoff vs process kill #117

@cburman01

Description

Hello, I have observed a peculiar behavior and was hoping to get some guidance.

I have noticed that the handoff of processes between nodes is different when I gracefully terminate the application running as a systemd service (e.g. systemctl stop) versus when I just kill the BEAM pid (e.g. kill -9).
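
To make the two cases explicit, here is roughly what each path does on the node itself (the assumption that a graceful stop ends in :init.stop/0 is mine, based on how release stop scripts normally work; kill -9 runs nothing at all):

:init.stop()      # graceful path: applications are stopped in order and supervisors
                  # shut their children down with reason :shutdown
System.halt()     # much closer to kill -9: the VM exits immediately, and the other
                  # node only ever sees a :nodedown for this node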

Here is the debug info when I just kill the pid:

Nov 14 09:18:32 dev02 ecommawarehouse[10858]: [debug] [swarm on ecommawarehouse_node@dev02] [tracker:handle_topology_change] restarting :"030_wrk" on ecommawarehouse_node@dev02
Nov 14 09:18:32 deverlapp02 ecommawarehouse[10858]: [debug] [swarm on ecommawarehouse_node@dev02] [tracker:do_track] starting :"030_wrk" on ecommawarehouse_node@dev02

Here is the debug info when I just stop the systemd service on one of the nodes:

Nov 14 09:25:17 deverlapp01 ecommawarehouse[5208]: [debug] [swarm on ecommawarehouse_node@dev01 [tracker:handle_monitor] :"030_wrk" is down: :shutdown
Nov 14 09:25:17 deverlapp01 ecommawarehouse[5208]: [warn] [swarm on ecommawarehouse_node@dev01] [tracker:broadcast_event] broadcast of event ({:untrack, #PID<29757.1594.0>}) was not recevied by [:"ecommawarehouse_node@dev01"]

I am creating a supervisor that only allows pids to be registered on one of the nodes in the cluster. That works fine. The only problem I am having is that when systemd gracefully terminates the application, it seems to run different handoff code.

Here are the workers that actually get started. They are generic and depend on DB data:

defmodule EcommAwarehouseCore.DataWorker do
  use GenServer
  require Logger

  def register(args) do
    {:ok, _pid} = start_link(args)
  end

  def start_link(seed) do
    GenServer.start_link(seed.module, seed, name: seed.name)
  end

  def init(state) do
    EcommAwarehouseCore.Registry.monitor(self(), state.name, state.module, state.args)
    get_data(state.args.timeout)
    {:ok, Map.put(state, :current, [])}
  end

  defp get_data(timeout) do
    Process.send_after(self(), :get_data, timeout)
  end

  def handle_call(:get, _from, state) do
    {:reply, state.current, state}
  end

  def handle_call({:swarm, :begin_handoff}, _from, state) do
    Logger.info("[Worker] begin_handoff: #{state.name}")
    {:reply, :resume, state}
  end

  def handle_cast({:swarm, :end_handoff}, state) do
    Logger.info("[Worker] begin_handoff: #{state.name}")
    {:noreply, state}
  end

  def handle_cast({:swarm, :resolve_conflict}, state) do
    Logger.info("[Worker] resolve_conflict: #{state.name}")
    {:noreply, state}
  end

  def handle_info(:get_data, state) do
    state =
      case state.args.function.(state.args.zone) do
        {:ok, []} ->
          state

        {:ok, data} when data == state.current ->
          state

        {:ok, data} ->
          IO.puts("Change detected, broadcasting")

          EcommAwarehouseWeb.Endpoint.broadcast(
            "channel:data",
            state.args.event,
            List.first(data)
          )

          # keep the new data so the next poll only broadcasts on a real change
          %{state | current: data}

        {:error, reason} ->
          Logger.error(reason)
          state
      end

    get_data(state.args.timeout)
    {:noreply, state}
  end

  def handle_info({:swarm, :die}, state) do
    Logger.info("[Worker] swarm stopping worker: #{state.name}")
    {:stop, :normal, state}
  end
end
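
For reference, Swarm's README shows the handoff callbacks in a slightly different shape: begin_handoff replies with {:resume, handoff_state} and end_handoff arrives as a three-element cast carrying that state. Adapted to this worker's state map it would look roughly like this (a sketch based on the docs, not code I am running):

def handle_call({:swarm, :begin_handoff}, _from, state) do
  # hand the current state to the tracker so it can follow the name to the new node
  {:reply, {:resume, state}, state}
end

def handle_cast({:swarm, :end_handoff, handoff_state}, state) do
  # the freshly started process on the other node receives the handed-off state here;
  # merging it over the fresh state is just one way to combine the two
  {:noreply, Map.merge(state, handoff_state)}
end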

Here is the registry:

defmodule EcommAwarehouseCore.Registry do
  use GenServer
  require Logger

  @name __MODULE__
  @pg2_group :workers

  def start_link(_) do
    Logger.info("Starting Registry")
    GenServer.start_link(__MODULE__, [], name: @name)
  end

  def register(name, module, args) do
    GenServer.call(@name, {:register, {name, module, args}})
  end

  def monitor(pid, name, module, args) do
    GenServer.cast(@name, {:monitor, pid, {name, module, args}})
  end

  def init(_) do
    Process.send_after(self(), :log_state, 500)
    Process.send_after(self(), :load_zones, 1000)
    Process.flag(:trap_exit, true)
    {:ok, %{}}
  end

  def handle_cast({:monitor, pid, {name, module, args}}, state) do
    :pg2.create(@pg2_group)
    :pg2.join(@pg2_group, pid)
    ref = Process.monitor(pid)
    {:noreply, Map.put(state, ref, {name, module, args})}
  end

  def handle_call({:register, {name, module, args}}, _from, state) do
    case start_via_swarm(name, module, args) do
      {:ok, pid} ->
        {:reply, {:ok, pid}, state}

      {:already_registered, pid} ->
        {:reply, {:ok, pid}, state}

      {:error, reason} ->
        Logger.error("[Registry] error starting #{name} - #{inspect(reason)}")
        {:reply, {:error, reason}, state}
    end
  end

  def handle_info(:load_zones, state) do
    EcommAwarehouseCore.Repo.get_zones()
    |> elem(1)
    |> Enum.map(fn x ->
      EcommAwarehouseCore.Registry.start_via_swarm(
        String.to_atom("#{x.zone}_wrk"),
        EcommAwarehouseCore.DataWorker,
        %{
          function: &EcommAwarehouseCore.Repo.get_pk_zone_wrk/1,
          zone: String.to_charlist(x.zone),
          timeout: 5000,
          event: "update_zone",
          registry: :work
        }
      )

      EcommAwarehouseCore.Registry.start_via_swarm(
        String.to_atom("#{x.zone}_wrker"),
        EcommAwarehouseCore.DataWorker,
        %{
          function: &EcommAwarehouseCore.Repo.get_pk_zone_wrkers/1,
          zone: String.to_charlist(x.zone),
          timeout: 5000,
          event: "update_worker",
          registry: :workers
        }
      )
    end)

    {:noreply, state}
  end

  def handle_info(:log_state, state) do
    total = Swarm.registered() |> Enum.count()
    local = state |> Enum.count()
    Logger.debug("[Registry] Totals:  Swarm/#{total} Local/#{local}")
    Process.send_after(self(), :log_state, 10000)
    {:noreply, state}
  end

  def handle_info({:DOWN, ref, :process, _pid, :normal}, state) do
    {:noreply, Map.delete(state, ref)}
  end

  def handle_info({:DOWN, ref, :process, _pid, :shutdown}, state) do
    {:noreply, Map.delete(state, ref)}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    Logger.error("Going Down IO.inspect #{ref}")
    case Map.get(state, ref) do
      nil ->
        {:noreply, state}

      {name, module, args} ->
        {:ok, _pid} = start_via_swarm(name, module, args, "restarting")
        {:noreply, Map.delete(state, ref)}
    end
  end

  def start_via_swarm(name, module, args, reason \\ "starting") do
    Logger.debug("[Registry] #{reason} #{name}")

    case Swarm.register_name(name, module, :register, [%{name: name, module: module, args: args}]) do
      {:ok, pid} ->
        Swarm.join(args.registry, pid)
        {:ok, pid}

      {:error, {_reason, pid}} ->
        {:ok, pid}
    end
  end

  def get_all_values() do
    workers = Swarm.multi_call(:workers, :get)
    work = Swarm.multi_call(:work, :get)
    %{workers: List.foldl(workers, [], &(&1 ++ &2)), work: List.foldl(work, [], &(&1 ++ &2))}
  end
end
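
For completeness, this is roughly how the registry ends up being used from the rest of the app (the zone name below is made up; the args mirror what :load_zones builds above):

# start (or attach to) the cluster-wide worker for one zone
{:ok, _pid} =
  EcommAwarehouseCore.Registry.register(
    :"030_wrk",
    EcommAwarehouseCore.DataWorker,
    %{
      function: &EcommAwarehouseCore.Repo.get_pk_zone_wrk/1,
      zone: '030',
      timeout: 5000,
      event: "update_zone",
      registry: :work
    }
  )

# collect the current data from every worker in both Swarm groups
EcommAwarehouseCore.Registry.get_all_values()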

