Как реализовать коммуникацию между процессами?

В Elixir коммуникация между процессами реализуется через обмен сообщениями, который основан на акторной модели, унаследованной от Erlang. Каждый процесс имеет свой уникальный идентификатор (PID) и собственную очередь сообщений (mailbox). Обмен сообщениями осуществляется с помощью операторов send и receive, а также с использованием абстракций OTP, таких как GenServer.

1. Создание процессов

Для взаимодействия между процессами сначала их нужно создать. Это делается с помощью spawn/1, spawn/3 или через OTP-абстракции:

pid = spawn(fn -> listen() end)

2. Отправка сообщений — send/2

Оператор send/2 отправляет сообщение в почтовый ящик другого процесса:

send(pid, {:msg, "Привет"})
  • pid — PID получателя.

  • {:msg, "Привет"} — произвольный кортеж, который используется как сообщение.

3. Получение сообщений — receive

Процесс может получить сообщение из своего почтового ящика через receive:

receive do
{:msg, text} -> IO.puts("Получено сообщение: #{text}")
end
  • receive блокирует выполнение, пока не найдёт сообщение, соответствующее шаблону.

  • Порядок — FIFO, но с учётом фильтрации по шаблонам.

4. Подтверждение и ответ

Чтобы процесс-отправитель мог получить ответ, он обычно указывает self() в теле сообщения:

send(pid, {:request, "данные", self()})

Процесс получатель:

receive do
{:request, msg, from} ->
send(from, {:response, "ответ на #{msg}"})
end

Процесс-отправитель может затем получить ответ:

receive do
{:response, reply} -> IO.puts("Получен ответ: #{reply}")
end

5. Распределенная архитектура

Процессы могут работать на разных узлах (нодах). Для этого:

  • Узлы запускаются с именем (elixir --sname node1).

  • Соединяются через Node.connect(:'node2@host').

  • После подключения можно использовать send({:name, :'node2@host'}, message).

6. Регистрируемые имена процессов

Чтобы избежать необходимости вручную передавать PID, можно зарегистрировать процесс:

Process.register(self(), :echo)

Теперь отправка сообщения:

send(:echo, :ping)

7. Пример простой коммуникации

defmodule Echo do
def start do
spawn(fn -> loop() end)
end
defp loop do
receive do
{:echo, msg, sender} ->
send(sender, {:reply, msg})
end
loop()
end
end
pid = Echo.start()
send(pid, {:echo, "hello", self()})
receive do
{:reply, msg} -> IO.puts("Получено: #{msg}")
end

8. Использование GenServer для надёжной коммуникации

Базовая реализация с использованием GenServer:

defmodule MyServer do
use GenServer
def start_link(\_) do
GenServer.start_link(\__MODULE_\_, %{}, name: :my_server)
end
def handle_call(:ping, \_from, state) do
{:reply, :pong, state}
end
end
{:ok, \_} = MyServer.start_link(nil)
GenServer.call(:my_server, :ping) # => :pong
  • GenServer.call/2 — синхронный вызов, ожидает ответ.

  • GenServer.cast/2 — асинхронный, не ждёт результата.

9. Использование Task и Task.async

Ещё один способ запуска процесса и ожидания результата:

task = Task.async(fn -> 1 + 1 end)
result = Task.await(task)
IO.puts("Результат: #{result}")
  • Под капотом Task — это процессы с spawn_link.

10. Пример очереди обработки сообщений

defmodule Worker do
def start do
spawn(fn -> loop() end)
end
defp loop do
receive do
{:process, data, sender} ->
result = String.upcase(data)
send(sender, {:result, result})
end
loop()
end
end
pid = Worker.start()
send(pid, {:process, "текст", self()})
receive do
{:result, processed} -> IO.puts("Результат: #{processed}")
end

11. Обработка таймаутов

В receive можно указать after, чтобы не ждать вечно:

receive do
{:ok, data} -> IO.inspect(data)
after
2000 -> IO.puts("Таймаут ожидания ответа")
end

12. Потеря сообщений

Если receive не содержит шаблона для сообщения — оно останется в mailbox. Это может привести к накоплению "мусора". Такие сообщения следует обрабатывать или очищать почтовый ящик.

13. Модель "обратного вызова"

Распространённый паттерн:

  • Запрос отправляется вместе с from_pid

  • Получатель формирует ответ и возвращает его на указанный PID

  • Аналогично работе GenServer.call/2

14. Использование send_after и Process.send_after

Можно отложить отправку сообщения:

Process.send_after(self(), :wake_up, 5000)
receive do
:wake_up -> IO.puts("Просыпаемся!")
end

15. Применение Registry в сложных системах

Elixir позволяет использовать Registry — процесс, хранящий именованные PID:

{:ok, \_} = Registry.start_link(keys: :unique, name: MyRegistry)
Registry.register(MyRegistry, "user:123", \[\])
Registry.lookup(MyRegistry, "user:123")

Позволяет гибко управлять множеством процессов и адресовать сообщения по имени.