Elixir - Process

Getting StartedrのProcessについてサクッとまとめました。

Processes - Elixir

  • spawn ... 子プロセスを生成する
  • spawn_link ... 子プロセスを生成する(例外発生時に共倒れ)
  • send, receive ... プロセス間のメッセージ送受信
  • Task ... spawnのラップ、結果も返してくれる
  • State ... send, receiveの再帰呼び出しで実現

Processes

すべてのコードはプロセス中で実行されます。プロセスはお互いに隔離されており、並列で実行されメッセージパッシングを通じてやり取りします。並列処理のベースであるだけではなく、これにより分散型のフォールトトレラントプログラムを構築する役割があります。

ElixirのプロセスはOSのそれと混同すべきではありません。他のプログラミング言語のスレッドとは違い、メモリとCPUの観点からすると非常に軽量です。そのため、数十万のプロセスを同時に実行することも珍しくありません。

spawn

spawn/1は新しいプロセスを生成しPIDを返します。 生成されたプロセスは関数を実行し終了します。

iex> pid = spawn fn -> 1 + 2 end
#PID<0.58.0>
iex> Process.alive?(pid)
false

プロセス自身のIDはself/0で取得できます。

iex> self()
#PID<0.56.0>
iex> Process.alive?(self())
true

send と receive

プロセス間のメッセージのやり取りにはsend/2receive/1を利用できます。 以下は自分自身にメッセージを送信するサンプルです。

defmodule ProcessSample do
  def send_message(msg) do
    # 自分自身のメールボックスに保存
    send self(), msg
  end

  def receive_message do
    # 送信されたメッセージを受け取る
    receive do
      {:msg, msg} -> msg
    after
      # タイムアウトを設定
      # ※設定しない場合はパターンマッチするメッセージを受信するまで待ち続ける
      1_000 -> 'timeout'
    end
  end
end

# 送信したメッセージがパターンマッチする場合
ProcessSample.send_message({:msg, 'hello, world'})

IO.puts ProcessSample.receive_message

# マッチしない場合
ProcessSample.send_message('hello, world')

IO.puts ProcessSample.receive_message
$ elixir process.exs
hello, world
timeout

これをspawn/1を利用してプロセス間でやり取りするように書き換えてみます。

defmodule InterProcessSample do
  def run do
    parent = self()

    # PIDを送信する子プロセスを生成
    spawn fn -> send(parent, {:pid, inspect(self())}) end
  end

  def receive_message do
    receive do
      {:pid, pid} -> "I'm #{pid}."
    after
      1_000 -> 'timeout'
    end
  end
end

InterProcessSample.run

IO.puts InterProcessSample.receive_message
$ elixir process.exs
I'm #PID<0.52.0>.

Links

プロセスの生成で実際によく利用されるのはspawn_link/1です。 spawn/1は子プロセスが例外を起こしても親プロセスは実行され続けますが、 spawn_link/1は親プロセスも共に終了します。

# spawn.exs

# spawn/1は子プロセスで例外が起きてもプログラムは終了しない
# spawn fn -> raise "oops" end

# spawn_link/1はプログラムが終了する
spawn_link fn -> raise "oops" end

receive do
  :hello -> '子プロセスが例外を起こすまで待つ'
end

この仕組みはフォールトトレラントシステムを構築するのに重要な役割を果たします。 Elixirではプロセスの死活監視、生成を行うSupervisorにリンクすることが多いです。

他のプログラミング言語ではtry/catch等で例外処理をしますが、 このような仕組みがあるため、例外は放っておくのが実際のところです。

Tasks

Taskはspawnより詳細なエラーレポートを提供します。 Task.start/1, Task.start_link/1はPIDと併せて結果も返してくれるためSupervisorで利用されます。

iex> spawn fn -> raise "oops" end
#PID<0.58.0>
iex>
10:40:03.817 [error] Process #PID<0.58.0> raised an exception
** (RuntimeError) oops
    :erlang.apply/2

iex> Task.start fn -> raise "oops" end
{:ok, #PID<0.60.0>}
iex>
10:40:11.695 [error] Task #PID<0.60.0> started from #PID<0.56.0> terminating
** (RuntimeError) oops
    (elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2
    (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: #Function<20.54118792/0 in :erl_eval.expr/5>
    Args: []

State

アプリケーションの設定や読み込んだファイルの内容等の状態を保持する場合にもプロセスを利用します。
以下はキーバリューストアのサンプルです。

# kv.exs

defmodule KV do
  def start_link do
    # 空のMapとともに子プロセスを生成
    Task.start_link(fn -> loop(%{}) end)
  end

  defp loop(map) do
    receive do
      {:get, key, caller} ->
        # caller(呼び出し元)にメッセージを送信
        send caller, Map.get(map, key)
        loop(map)
      {:put, key, value} ->
        loop(Map.put(map, key, value))
    end
  end
end

これをiexで実行してみます。 flush/0はメールボックス内のメッセージを出力します。

$ iex kv.exs
iex> {:ok, pid} = KV.start_link
{:ok, #PID<0.62.0>}
iex> send pid, {:put, :hello, :world}
{:put, :hello, :world}
iex> send pid, {:get, :hello, self()}
{:get, :hello, #PID<0.60.0>}
iex> flush
:world
:ok

また、pidに名称を与えて実行することも可能です。

iex> {:ok, pid} = KV.start_link
{:ok, #PID<0.63.0>}
iex> Process.register(pid, :kv)
true
iex> send :kv, {:put, :hello, :world}
{:put, :hello, :world}

大抵の場合これらのパターンを自前で実装することはなく、 例えばAgent等の抽象化されたものを利用します。

iex> {:ok, pid} = Agent.start_link(fn -> %{} end)
{:ok, #PID<0.72.0>}
iex> Agent.update(pid, fn map -> Map.put(map, :hello, :world) end)
:ok
iex> Agent.get(pid, fn map -> Map.get(map, :hello) end)
:world