Как реализовать коммуникацию между процессами?
В Elixir коммуникация между процессами реализуется через обмен сообщениями, который основан на акторной модели, унаследованной от Erlang. Каждый процесс имеет свой уникальный идентификатор (PID) и собственную очередь сообщений (mailbox). Обмен сообщениями осуществляется с помощью операторов send и receive, а также с использованием абстракций OTP, таких как GenServer.
1. Создание процессов
Для взаимодействия между процессами сначала их нужно создать. Это делается с помощью spawn/1, spawn/3 или через OTP-абстракции:
pid = spawn(fn -> listen() end)
2. Отправка сообщений — send/2
Оператор send/2 отправляет сообщение в почтовый ящик другого процесса:
send(pid, {:msg, "Привет"})
-
pid — PID получателя.
-
{:msg, "Привет"} — произвольный кортеж, который используется как сообщение.
3. Получение сообщений — receive
Процесс может получить сообщение из своего почтового ящика через receive:
receive do
{:msg, text} -> IO.puts("Получено сообщение: #{text}")
end
-
receive блокирует выполнение, пока не найдёт сообщение, соответствующее шаблону.
-
Порядок — FIFO, но с учётом фильтрации по шаблонам.
4. Подтверждение и ответ
Чтобы процесс-отправитель мог получить ответ, он обычно указывает self() в теле сообщения:
send(pid, {:request, "данные", self()})
Процесс получатель:
receive do
{:request, msg, from} ->
send(from, {:response, "ответ на #{msg}"})
end
Процесс-отправитель может затем получить ответ:
receive do
{:response, reply} -> IO.puts("Получен ответ: #{reply}")
end
5. Распределенная архитектура
Процессы могут работать на разных узлах (нодах). Для этого:
-
Узлы запускаются с именем (elixir --sname node1).
-
Соединяются через Node.connect(:'node2@host').
-
После подключения можно использовать send({:name, :'node2@host'}, message).
6. Регистрируемые имена процессов
Чтобы избежать необходимости вручную передавать PID, можно зарегистрировать процесс:
Process.register(self(), :echo)
Теперь отправка сообщения:
send(:echo, :ping)
7. Пример простой коммуникации
defmodule Echo do
def start do
spawn(fn -> loop() end)
end
defp loop do
receive do
{:echo, msg, sender} ->
send(sender, {:reply, msg})
end
loop()
end
end
pid = Echo.start()
send(pid, {:echo, "hello", self()})
receive do
{:reply, msg} -> IO.puts("Получено: #{msg}")
end
8. Использование GenServer для надёжной коммуникации
Базовая реализация с использованием GenServer:
defmodule MyServer do
use GenServer
def start_link(\_) do
GenServer.start_link(\__MODULE_\_, %{}, name: :my_server)
end
def handle_call(:ping, \_from, state) do
{:reply, :pong, state}
end
end
{:ok, \_} = MyServer.start_link(nil)
GenServer.call(:my_server, :ping) # => :pong
-
GenServer.call/2 — синхронный вызов, ожидает ответ.
-
GenServer.cast/2 — асинхронный, не ждёт результата.
9. Использование Task и Task.async
Ещё один способ запуска процесса и ожидания результата:
task = Task.async(fn -> 1 + 1 end)
result = Task.await(task)
IO.puts("Результат: #{result}")
- Под капотом Task — это процессы с spawn_link.
10. Пример очереди обработки сообщений
defmodule Worker do
def start do
spawn(fn -> loop() end)
end
defp loop do
receive do
{:process, data, sender} ->
result = String.upcase(data)
send(sender, {:result, result})
end
loop()
end
end
pid = Worker.start()
send(pid, {:process, "текст", self()})
receive do
{:result, processed} -> IO.puts("Результат: #{processed}")
end
11. Обработка таймаутов
В receive можно указать after, чтобы не ждать вечно:
receive do
{:ok, data} -> IO.inspect(data)
after
2000 -> IO.puts("Таймаут ожидания ответа")
end
12. Потеря сообщений
Если receive не содержит шаблона для сообщения — оно останется в mailbox. Это может привести к накоплению "мусора". Такие сообщения следует обрабатывать или очищать почтовый ящик.
13. Модель "обратного вызова"
Распространённый паттерн:
-
Запрос отправляется вместе с from_pid
-
Получатель формирует ответ и возвращает его на указанный PID
-
Аналогично работе GenServer.call/2
14. Использование send_after и Process.send_after
Можно отложить отправку сообщения:
Process.send_after(self(), :wake_up, 5000)
receive do
:wake_up -> IO.puts("Просыпаемся!")
end
15. Применение Registry в сложных системах
Elixir позволяет использовать Registry — процесс, хранящий именованные PID:
{:ok, \_} = Registry.start_link(keys: :unique, name: MyRegistry)
Registry.register(MyRegistry, "user:123", \[\])
Registry.lookup(MyRegistry, "user:123")
Позволяет гибко управлять множеством процессов и адресовать сообщения по имени.