Что такое mailbox и как избежать его переполнения?

В Elixir (как и в Erlang), каждый процесс имеет свою собственную почтовую очередь сообщений — mailbox. Эта очередь используется для асинхронной передачи сообщений между процессами. При вызове send, сообщение помещается в mailbox получателя, где может быть позднее обработано через конструкцию receive.

Что такое mailbox

Mailbox (почтовый ящик) — это внутренняя очередь, ассоциированная с каждым процессом BEAM. Она:

  • Является FIFO (first-in, first-out) — сообщения обрабатываются в порядке поступления.

  • Изолирована: каждый процесс имеет свою собственную очередь.

  • Может хранить неограниченное количество сообщений (ограничение — память).

  • Не очищается автоматически — только когда процесс сам извлечёт сообщение через receive.

Как работает send и receive

Отправка сообщения:

send(pid, {:msg, "Привет"})

Сообщение {:msg, "Привет"} попадёт в mailbox процесса pid.

Приём сообщений:

receive do
{:msg, text} -> IO.puts("Получено: #{text}")
end

Этот код будет ожидать появления подходящего сообщения в mailbox и обработает его. Важно понимать, что если сообщение не соответствует шаблону — оно остаётся в mailbox и может быть извлечено позже.

Возможные проблемы при переполнении mailbox

  1. Процесс не обрабатывает сообщения:
    Если вы отправляете много сообщений, но не вызываете receive, очередь будет расти.

  2. Сообщения не соответствуют шаблонам:
    Если receive не охватывает все типы сообщений, они могут накапливаться в mailbox навсегда.

  3. Долгоживущие процессы без receive:
    Например, задача отправлена через spawn, но внутри нет логики обработки сообщений — mailbox будет заполняться при любом send.

  4. Накопление мусора:
    В больших приложениях это приводит к увеличению потребления памяти, а в крайних случаях — к Out Of Memory (OOM) или замедлению системы.

Как избежать переполнения mailbox

1. Ограничьте количество сообщений

  • Не отправляйте сообщения без крайней необходимости.

  • Не используйте send как механизм хранения состояний.

2. Обрабатывайте все сообщения

receive do
any -> обработка(any)
after
0 -> :ok
end

Использование конструкции after 0 — это способ избежать блокировки и обработать сообщения, если они есть.

3. Проверяйте размер mailbox

В Elixir напрямую узнать размер mailbox нельзя, но Erlang даёт такую возможность через process_info/2.

:erlang.process_info(self(), :message_queue_len)
\# => {:message_queue_len, 0}

Можно обернуть это в функцию и вызывать её периодически.

4. Фильтрация сообщений в receive

Важно не только вызывать receive, но и обеспечить, чтобы он соответствовал структуре входящих сообщений. Неподходящие сообщения будут накапливаться.

receive do
{:expected, data} -> обработка(data)
_ -> :skip
end

Можно использовать after с тайм-аутом для обработки непредвиденных ситуаций.

5. Используйте GenServer или другие OTP-компоненты

GenServer реализует почтовый ящик и автоматически извлекает сообщения из него, вызывая handle_call, handle_cast и handle_info. Он снижает вероятность переполнения mailbox за счёт структурированной обработки:

def handle_info({:msg, text}, state) do
\# Обработка произвольных сообщений
{:noreply, state}
end

6. Реализация backpressure

Если один процесс перегружен, лучше реализовать механизм обратного давления, при котором отправляющий процесс ждёт, прежде чем отправлять следующее сообщение. Это можно реализовать через GenStage, Flow, Broadway.

Пример плохой практики

defmodule Flood do
def start do
pid = spawn(fn -> loop() end)
Enum.each(1..1_000_000, fn i -> send(pid, {:msg, i}) end)
end
def loop do
receive do
{:msg, \_i} -> :ok # медленно, без таймаута и контроля
end
loop()
end
end

Этот код создаёт миллион сообщений без ограничения скорости — процесс не успеет их обработать, и mailbox вырастет до гигантских размеров.

Пример хорошей практики с контролем

defmodule Safe do
def start do
pid = spawn(fn -> loop() end)
Enum.each(1..1000, fn i ->
send(pid, {:msg, i})
wait_if_needed(pid)
end)
end
defp wait_if_needed(pid) do
{:message_queue_len, len} = :erlang.process_info(pid, :message_queue_len)
if len > 100 do
Process.sleep(10)
end
end
defp loop do
receive do
{:msg, i} ->
IO.inspect(i)
end
loop()
end
end

Этот подход снижает риск переполнения и даёт контроль над системой.