読者です 読者をやめる 読者になる 読者になる

distributed-process - 分散処理(Parallel and Concurrent Programming in Haskell Chapter 14)

distributed-process - 分散処理(Parallel and Concurrent Programming in Haskell Chapter 14)

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch14.html

前の章についてまとめたものはこちら
http://jsapachehtml.hatenablog.com/entry/2015/03/17/072031

第14章では分散プログラミングについて。
haskellではCloud Haskellというプロジェクトで分散プログラミング用のライブラリを開発している。パッケージ名はdistributed-process。

並行処理と分散処理の違いはプロセスが同じマシンで動くか違うマシンで動くかという点。分散処理は各プロセスが異なるマシン上で動いている状態を扱うため、並行処理とは違った以下のような問題点が出てくる。

  • どれか一つのプロセスだけがマシンのトラブルなどで止まってしまう可能性があること。それを当然あり得るとして考慮する必要が出てくる。
  • 各プロセスがやりとりする方法を考える必要がある。同じマシン上ならば共有メモリの値を読むことで実現できたが違うマシン上ではそうはいかず、ネットワーク越しにやりとりすることになる。なので情報の共有にとても時間がかかる。
  • システム全体でグローバルな状態を共有するのが難しい。どのようにしてコンセンサスを取るのかについてアルゴリズムを考える必要がある。

haskellの分散処理ではメッセージパッシングを基本的なモデルとし、TChanをプリミティブとして使っている。

haskellでは言語そのものでは分散処理をサポートいない。代わりにdistributed-processというパッケージを使う。また、それとともにネットワークの処理を扱うパッケージを利用する必要がある。

トランスポートレイヤーの処理はdistributed-processとうまく分離されており、使いたい機能に合わせて付け替えることができる。この章ではトランスポートレイヤーとしてdistributed-process-simplelocalnetを使う。

ping/pong

masterプロセスからslaveプロセスへpingを投げるとpongを返すという例。

今回使うdistributed-processのAPIはこれら。

data Process   -- instance Monad, MonadIO

data NodeId    -- instance Eq, Ord, Show, Typeable, Binary
data ProcessId -- instance Eq, Ord, Show, Typeable, Binary

-- 自身のプロセスIDやノードIDを取得
getSelfPid  :: Process ProcessId
getSelfNode :: Process NodeId

-- 他のマシン上でプロセスを起動
spawn  :: NodeId -> Closure (Process ()) -> Process ProcessId

-- メッセージの送受信
send   :: Serializable a => ProcessId -> a -> Process ()
expect :: Serializable a => Process a

-- プロセスの終了
terminate :: Process a

-- Processモナド内での出力
say :: String -> Process ()

プロセスというのはここで新しく出てきた概念で以下のように説明されている。

  • スレッドは同じマシン上で起動するが、プロセスは他のマシン上で起動する可能性がある
  • プロセスはIOではなくProcessモナド内で動作し、すべてのメッセージパッシングはProcessモナド内で行う

メッセージの型

master, slave間でやりとりするメッセージは以下の形で定義する。

{-# Language DeriveDataTypeable #-}
{-# Language DeriveGeneric #-}     

data Message = Ping ProcessId
             | Pong ProcessId
    deriving (Typeable, Generic)       

instance Binary Message  

Ping, PongにProcessIdを含めたのはそれぞれ誰から来たものかを受け取り側が知るため。

Messageがネットワークワーク越しに送信される際は一度バイトデータに変換され、受け取り側で再度haskellのデータに組み直される。(serialize, deserialize)

そのためプロセス間でやりとりされるデータはすべてBinaryのインスタンスにする必要がある。変換の方法はBinaryのインスタンスを自分で導出することで定義できるが基本的には特殊な形にする必要がないため自動導出してほしい。DeriveDataTypeable、DeriveGenericはそのために必要なプラグマ。

BinaryとTypableはSerializableとしてパッケージングされておりControl.Distributed.Process.Serializableにて以下のように定義されている。

class (Binary a, Typeable a) => Serializable a
instance (Binary a, Typeable a) => Serializable a

slave側の処理

slave側でpingが届くのを待ち、それに対してpongを応答する処理。

pingServer :: Process ()
pingServer = do
    Ping from <- expect                              
    say $ printf "ping received from %s" (show from) 
    mypid <- getSelfPid                              
    send from (Pong mypid)                          

$(remotable ['pingServer])

まず、関数はProcessモナド内で処理していることに注意。expectによって届いた値をPing fromとしてパターンマッチで取りだす。

各プロセスはチャネルを一つずつ持っていてどんな型のメッセージでも受け取る。expectは型を指定してチャネルから値を取り出す関数である。

expectはreadと似ているが、文脈通りの型にparseできない場合に失敗するreadと違い、expectは型が異なった場合に処理をせずに飛ばす。そしてチャネル内で最初に見つかった指定の型の値を取り出す。型に合わないメッセージはチャネルに残りつづける。また、チャネルに指定の型がなかった場合、expectはブロックすることになるためデッドロックしないよう注意が必要である。

sendでProcessIdを指定してメッセージを送っている。型は以下。

send :: (Serializable a) => ProcessId -> a -> Process ()

最後のremotableは他のマシン上で指定した関数を実行するために必要な設定を提供している。

master側の処理

次にmaster側の処理について。

master :: Process ()
master = do
  node <- getSelfNode                               

  say $ printf "spawning on %s" (show node)
  pid <- spawn node $(mkStaticClosure 'pingServer)  

  mypid <- getSelfPid                               
  say $ printf "sending ping to %s" (show pid)
  send pid (Ping mypid)                             

  Pong _ <- expect                                  
  say "pong."

  terminate        

masterではslaveノードの起動、pingの送信、pongの受信、プロセス終了という順で処理を行う。

spawnの型は以下。

spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

NodeIdはプロセスを立ちあげたい対象のノードのID。今回はgetSelfNodeで取得した値を使うことで自身のノードとしている。

2つ目の引数Closure (Process ())で行いたい処理を指定。本質的にはProcess ()を相手のノードに伝えればよいが簡単にはシリアライズすることができない。大量のデータへの参照があったりローカルノード内で共有するような値を持っていたり。そこでClosureでシリアライズの処理を行っている。ちなみにmkStaticClosureは引数なしの関数をClosureにしてくれる。

slaveノードを起動したあと、masterからpingをsendし、expectで返答の受信を待つ。

main関数

mainは以下のようになる。

main :: IO ()
main = distribMain (\_ -> master) Main.__remoteTable

distribMainはdistributed-process-simplelocalnetの機能を使ったutility関数でmasterやslaveの起動を行っている。
https://github.com/simonmar/parconc-examples/blob/master/DistribUtils.hs

Main.__remoteTableはremotableによって生成された関数で他のマシンで関数を実行するためのメタデータを持っている。

実行結果は以下のようになる。

$ ./ping
pid://localhost:44444:0:3: spawning on nid://localhost:44444:0
pid://localhost:44444:0:3: sending ping to pid://localhost:44444:0:4
pid://localhost:44444:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: pong.

複数ノードでping/pong

先ほどの例を拡張して複数ノードとping/pongのやりとりをする例。

存在するすべてのノード上でプロセスを生成してping/pongする。

master :: [NodeId] -> Process ()                    
master peers = do

  ps <- forM peers $ \nid -> do                     
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  mypid <- getSelfPid

  forM_ ps $ \pid -> do                              
    say $ printf "pinging %s" (show pid)
    send pid (Ping mypid)

  waitForPongs ps                                   

  say "All pongs successfully received"
  terminate

waitForPongs :: [ProcessId] -> Process ()           
waitForPongs [] = return ()
waitForPongs ps = do
  m <- expect
  case m of
    Pong p -> waitForPongs (filter (/= p) ps)
    _  -> say "MASTER received ping" >> terminate

まず存在するノードの発見はdistributed-processのフレームワーク上で行ってくれる。masterはそこで見つかったノードを[NodeId]として引数にとる。

spawnやsendの先ほどとの違いはforMですべてのノードに対して行うという点だけ。

waitForPongsではメッセージの受信を立ちあげたすべてのプロセスから行うためのヘルパー。受け取ったメッセージの送り元のプロセスを除いていき、[ProcessId]のリストが空になれば全て受信し終えたので終了。

main関数は以下のようになる。

main :: IO ()
main = distribMain master Main.__remoteTable

[NodeId]をフレームワーク側から受け取るようにしたため_ -> masterの部分だけ修正した。

実行

複数ノードを立ち上げるには以下のようにする。

$ ./ping-multi slave 44445 &
$ ./ping-multi slave 44446 &

distribMain関数内では引数の形に合わせてdistributed-process-simplelocalnetの関数(startMaster, startSlave)が呼ばれており、slaveの場合ポート番号を指定することでそこで新しいノードを立ち上げることができる。バックグラウンドプロセスにしないと応答が返ってこないので注意。詳しくはこちらで。
https://github.com/simonmar/parconc-examples/blob/master/DistribUtils.hs

実行結果はこんな感じ。

$ ./ping-multi
pid://localhost:44444:0:3: spawning on nid://localhost:44445:0
pid://localhost:44444:0:3: spawning on nid://localhost:44446:0
pid://localhost:44444:0:3: pinging pid://localhost:44445:0:4
pid://localhost:44444:0:3: pinging pid://localhost:44446:0:4
pid://localhost:44446:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44445:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: All pongs successfully received

今回は同じホスト上で別のノードを立ちあげたが、別ホストにノードを立ち上げることもできる。その際はノード立ちあげ時にホストも指定するだけ。

$ ./ping-multi slave 192.168.100.2 44445 &

その他は何も変更なく指定したホスト上で実行できる。

typed channel

ping/pongの例では一つのチャネルに他の全てのプロセスからのメッセージを受信することになっていた。しかし、それだと以下のようなデメリットがある。

  • expectでチャネルにあるメッセージの中で型が一致するものを毎回探すことになるため動作が遅い
  • メッセージの中に明示的にどのプロセスから来たかの情報を含める必要がある

これらの問題を解決できる方法としてtyped channelがある。typed channelで用いるAPIはこちらのようになっている。

data SendPort a     -- instance of Typeable, Binary
data ReceivePort a

newChan :: Serializable a => Process (SendPort a, ReceivePort a)

sendChan :: Serializable a => SendPort a -> a -> Process ()

receiveChan :: Serializable a => ReceivePort a -> Process a

まず、送信用・受信用のポートがデータ型として用意されている。SendPort aはSerializableであることに注意。

newChanで新しい一組の送信受信用ポートを取得でき、SendPortから送信したメッセージは対になるReceivePortから受信することができる。sendChan, receiveChanでそれぞれのポートを指定してメッセージの送受信を実行する。

これを使ってやりたいことは以下。

  • クライアント側(メッセージを送りたい側)でnewChanによりチャネルを生成
  • サーバ側(メッセージを受け取りたい側)へSendPortと共にメッセージを送る
  • サーバ側では送られてきたSendPortを使ってメッセージに返答する
  • クライアント側ではReceivePortを見ることでメッセージを受信する

typed channelのよいところはこのチャネルに送られてくるメッセージが誰からのものかというが明らかな点である。ProcessIdなどをメッセージに含める必要はない。また、やりとりされるメッセージの型も指定されている。

ping/pongの例をtyped channelを使って書き直すと以下のようになる。
(もともと本にあるコードとは少し変えている)

data Message = Ping (SendPort String) deriving (Typeable, Generic)                         
                                                                                           
instance Binary Message                                                                    
                                                                                           
pingServer :: Process ()                                                                   
pingServer = do                                                                            
  Ping chan <- expect                                                                      
  mypid <- getSelfPid                                                                      
  sendChan chan $ "pong from " ++ show mypid                                               
                                                                                           
$(remotable ['pingServer])                                                                 
                                                                                           
master :: [NodeId] -> Process ()                                                           
master peers = do                                                                          
  ps <- forM peers $ \nid -> do                                                            
    say $ printf "spawning on %s" (show nid)                                               
    spawn nid $(mkStaticClosure 'pingServer)                                               
                                                                                           
  ports <- forM ps $ \pid -> do                                                            
    say $ printf "pinging %s" (show pid)                                                   
    (sp, rp) <- newChan                                                                    
    send pid $ Ping sp                                                                     
    return rp                                                                              
                                                                                           
  forM_ ports $ \port -> do                                                                
    m <- receiveChan port                                                                  
    say $ show m                                                                           
    return ()                                                                              
                                                                                           
  say "All pongs received"                                                                 
  terminate                                                                                
                                                                                           
main :: IO ()                                                                              
main = distribMain master Main.__remoteTable                    

MessageにはProcessIdを含める必要がない。今回はStringを含めるようにした。

pingServerでは送られてくるSendPortをexpectで受け取り、それを使ってmasterプロセスに対して返答する。

masterの処理はspawnするまでは変更なし。各プロセスに対してメッセージを送る際にそれぞれチャネルを生成してSendPortを送る。相手からのメッセージを受信はreceiveChanでReceivePortに来る値を読むだけ。

ここで一つ気になるのはmasterからslaveへのメッセージ送信時はtyped channelを使わないのかという点。注意しなければいけないのはReceivePort aはSerializableになっていないということ。ReceivePortを他のプロセスに送れてしまうと送信したメッセージを複数の相手にルーティングする必要が出てくるため、実装が複雑化してしまうと考えられるためそれを防ぐためにできないようになっている。

なので、typed channelを使って双方向に通信したい場合、spawn実行時にslave側で新たにtyped channelを生成してそのSendPortを送り返してもらう必要がある。ただ、今回の例ではping/pongを行うために無駄に複雑な処理をする必要はないためmasterからslaveへの送信はsendを使ってダイレクトに行うようにしている。

エラーハンドリング

プロセス内でエラーが発生した場合の対処について。

ping/pongの例の中でexpectの際に以下のようにパターンマッチして値を取り出している。

Ping chan <- expect                                                                      

ここで送られてきた値がPongだとするとパターンマッチに失敗してプロセスが死ぬ。その際、それをmaster側で検知したい。

それにはwithMonitorを使う。

withMonitor pid $ do                                                    
  send pid $ Pong sp                                                    
  receiveWait                                                           
    [ match $ \(ProcessMonitorNotification _ deadpid reason) -> do      
        say $ printf "process %s died: %s" (show deadpid) (show reason) 
        terminate                                                       
    ]                                                                   

withMonitorはpidと監視のためのアクション(Process ())をとる。アクション内ではreceiveWaitとmatchによって検知したい状態を指定する。

receiveWaitやmatchの型は以下のようになっている。また、実はexpectはreceiveWaitを使って実装されている。

receiveWait :: [Match b] -> Process b
match :: forall a b. Serializable a => (a -> Process b) -> Match b

expect :: forall a. Serializable a => Process a
expect = receiveWait [match return]

なのでreceiveWait内でexpectの代わりに正規の値を待つことも可能。

ProcessMonitorNotificationはエラーを起こしたプロセスのIDやその原因を含むデータ型。

data ProcessMonitorNotification
  = ProcessMonitorNotification MonitorRef ProcessId DiedReason

data DiedReason
  = DiedNormal             -- Normal termination
  | DiedException !String  -- The process exited with an exception
  | DiedDisconnect         -- We got disconnected from the process node
  | DiedNodeDown           -- The process node died
  | DiedUnknownId          -- Invalid (process/node/channel) identifier

DiedReasonとして原因が返ってくるのでなぜ死んだのかを判別する助けになる。ただ、これだけだとコードのどこを修正すればよいかわからないことも多いが。

typed channelを使った実装をやってみた際、なかなかうまくいかずにデバッグする方法を調べたのでそれについては別の記事にまとめた。
http://jsapachehtml.hatenablog.com/entry/2015/03/22/133345

分散チャットサーバ

第12章で作成したチャットサーバを分散環境で使えるように拡張する。具体的には複数のマシンでチャットサーバを起動させてクライアントはどれにつなげても互いにチャットができるという形。

まずはping/pongの例でやったようにmaster/slaveの役割を持つ形で実装する。その後、各サーバが対等な関係になるよう手を加える。

第12章での実装と大きく違ってくるのは以下の2点。

  • ClientがRemoteClient, LocalClientに分離される
  • サーバ間でのメッセージやりとりが存在する

修正点はすべて上記2点に関わる部分であり、そのためのデータ型定義とLocal, Remoteへのメッセージの投げ分けとなる。

サーバの定義についてはわかりづらい点があったのでそれだけ書いておく。

data Server = Server
  { clients   :: TVar (Map ClientName Client)
  , proxychan :: TChan (Process ())
  , servers   :: TVar [ProcessId]
  , spid      :: ProcessId
  }

第12章では単にclientのMapを保持しているのみだったが今回は他にもいくつかのデータをもつ。わかりづらいのがproxychanであり、Process ()を持ったTChanである。これは他のサーバへのメッセージを一旦持っておくためのチャネル。

proxychanにデータを入れる関数はsendRemote。入ったデータを処理していくのはproxy。

sendRemote :: Server -> ProcessId -> PMessage -> STM ()
sendRemote Server{..} pid pmsg = writeTChan proxychan (send pid pmsg)

proxy :: Server -> Process ()
proxy Server{..} = forever $ join $ liftIO $ atomically $ readTChan proxychan

sendRemoteは名前の通りではなく、他のサーバへメッセージを投げるわけではない。ProcessIdとPMessageを指定してsendするアクションを自身のproxychanに入れておく。すると、proxyによって読み取られてProcess ()が実行され、ここで実際に他のサーバに対してメッセージが送られる。

サーバ間のメッセージ送信はdistributed-processの機能を用いて行うのでProcessモナド内の処理となる。しかし、サーバの処理自体は通常のforkIOしたスレッドで行われている。つまりIOモナド内で行われているためProcessモナドを実行できない。

そのため、一旦proxychanに溜めておいてProcessモナドを実行できるスレッドから読み取ることでそれを実行している。各スレッドはchatServer関数で作られる。

chatServer :: Int -> Process ()
chatServer port = do
  server <- newServer []
  liftIO $ forkIO (socketListener server port)           
  spawnLocal (proxy server)                              
  forever $ do m <- expect; handleRemoteMessage server m

socketListenerはクライアントからの接続を監視してaccept、talkという流れ処理をする(12章での処理とほぼ同じで通常のIO処理)。spawnLocalで作成したスレッドがproxyを実行している。そして、メインスレッドはforeverで他のサーバから来たメッセージを受け取って処理していく。

コードのその他の部分については本 or こちらを参照。
https://github.com/simonmar/parconc-examples/blob/master/distrib-chat/chat.hs

ちなみに本の通りに実装しただけだとまだ動かないので注意。Clientが2つ分離されたりしたことによりtalkやcheckAddClientなどいくつかの関数を修正する必要があった。ただ、コンパイラの出すエラーのおかげで修正すべき部分はわかるので↑のコードを見つつやれば問題ない。

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング