distributed-proccessのexecise(Parallel and Concurrent Programming in Haskell Chapter 14)

第14章の最後の部分はexeciseという位置づけになっている。ヒントが示されているのでそれに従って実装してみた。

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch14.html

章の前半についてまとめたものはこちら
distributed-process - 分散処理(Parallel and Concurrent Programming in Haskell Chapter 14) - MEMOcho-

distributed chat serverのアーキテクチャ変更

chatサーバをmaster-slaveの形で実装したがこれだとserverノードの追加や削除はできない。ここではそれを可能にするためmaster-slaveという役割をなくしすべてのサーバが平等な役割を担うようにする。

まずDistribeUtilを使うのをやめて以下のような形にmainを修正する。

main = do
 [port, chat_port] <- getArgs
 backend <- initializeBackend "localhost" port
                              (Main.__remoteTable initRemoteTable)
 node <- newLocalNode backend
 Node.runProcess node (master backend chat_port)

サーバ同士が通信するためのポートとクライアントが接続するためのポートをそれぞれ引数で渡す。newLocalNodeでノードを生成してクライアント用のポートと共にmasterの処理へ渡す。runProcessでProcessモナドを実行できる。

定義はこちら
http://hackage.haskell.org/package/distributed-process-0.5.3/docs/Control-Distributed-Process-Node.html

次にmasterの処理。

master :: Backend -> Int -> Process ()
master backend port = do
  peers <- liftIO $ findPeers backend 1000
  mypid <- getSelfPid
  register "chatServer" mypid
  forM_ peers $ \nid -> do
    whereisRemoteAsync nid "chatServer"
  
  chatServer port 

まずは存在する他のノードを探すためfindPeers。これは同期処理なのでタイムアウトを指定する。上記だと1ms。

次に自分のProcessIdをchatServerという名前で登録する。(register)

whereisRemoteAsyncは名前の通り非同期処理で存在する他のProcessを知るため、各ノードに対してリクエストを投げる。これに対する返答はchatServer内のメインループで受け取る。

chatServer :: Int -> Process ()
chatServer port = do
  server <- newServer []
  liftIO $ forkIO (socketListener server port)
  spawnLocal $ proxy server
  forever $ do
    receiveWait
      [ match $ handleRemoteMessage server
      , match $ handleMonitorNotification server
      , match $ handleWhereIsReply server
      , matchAny $ \_ -> return ()
      ]

expectで処理していたところをreceiveWaitを用いてメッセージ毎に処理分けする。whereisRemoteAsyncに対する返答はhandleWhereIsReplyで受ける。

handleWhereIsReply :: Server -> WhereIsReply -> Process ()
handleWhereIsReply _ (WhereIsReply _ Nothing) = return ()
handleWhereIsReply server@Server{..} (WhereIsReply _ (Just pid)) 
  | pid == spid = return ()
  | otherwise = liftIO $ atomically $ do
      addServer server pid
      startMonitor server pid
      sendRemote server pid $ MsgServerInfo spid
      teachMyClient server pid 

ちなみにWhereIsReplyの型はこちら。

data WhereIsReply = WhereIsReply String (Maybe ProcessId)

各ノードから返ってくる返答を見てNothingであればchatServerという名前のProcessは存在しないので無視。JustならそのProcessIdを確認する。

注意点は自分のノードからも返答が返ってくること。なので自分のProcessIdだった場合も無視。他のノードのProcessIdだった場合は以下の処理。
1. 自分の持っているServerデータに相手を登録
2. モニタリング開始(相手のProcessが死んだことの検知)
3. 相手に自分のProcessIdを通知
4. 相手に自分のlocal clientを通知

MsgServerInfoは今回新しく定義したPMessage。自分のProcessIdを教えるためのサーバ間メッセージ。

addServer, startMonitor, teachMyClientはそれぞれ名前の通りの処理。clientの通知には自分クライアントの数だけMsgNewClientを投げるようにすることで新たに定義する手間を省いた。

handleRemoteMessageはほぼmaster-slave時の実装にMsgServerInfoへの処理を加えるだけ。

handleRemoteMessage :: Server -> PMessage -> Process ()           
handleRemoteMessage server@Server{..} msg = liftIO $ atomically $ 
  ...(省略)
  MsgServerInfo pid -> do    
    addServer server pid     
    teachMyClient server pid 

自分のクライアントを相手に教える必要があるためこちらからもteachMyClientを実行する。

追加で実装したのはこれくらい。あと、debugのために登録されているクライアント名など表示するようなコマンドを追加したりしたがそれは本筋とは外れるので省略

コード全体はこちら。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Server/DistribChatNoSlave.hs

簡易的なKVSの実装

とても簡易的な実装ではあるが、4段階に分けてKVSを実装していく。最終的にはノードが一つ落ちても動く状態にする。

こちらに本の著者が実装したクライアントがあるのでそれが動作するような形にしていく。
https://github.com/simonmar/parconc-examples/blob/master/distrib-db/db.hs

(1) シングルノードでdb.hsを実行

今回実装するKVSはget, setのみができるもの。各インターフェースはこれ。

type Database
type Key   = String
type Value = String

createDB :: Process Database
set      :: Database -> Key -> Value -> Process Bool
get      :: Database -> Key -> Process (Maybe Value)

やることは2つ。

  • createDBの実行でローカルにDBのプロセスを立ち上げる
  • get, setは上記で立ちあげたプロセスとメッセージを通じてやりとりするよう実装

db.hsを見るとcreateDBにpeersを渡してDatabase型のデータ(db)を得ている。そこで得たdbをgetやsetに渡すことで通信するということらしい。なのでDatabaseは単純にProcessIdとして、get/setではそのProcessへのsendという形にする。

type Database = ProcessId

メッセージを見てDBプロセスが知るべきことは3つ。

  • 実行するコマンド
  • 操作するキーと値
  • レスポンスを返す相手

なのでRequestとして以下のように定義。

data Request = ReqOp Command (SendPort Response)
  deriving (Typeable, Generic)

instance Binary Request

data Command = Get Key
             | Set Key Value
  deriving (Typeable, Generic)

instance Binary Command

レスポンスの相手判別はチャネルを使って解決。

また、レスポンスはインターフェースの定義から以下のように決める。

data Response = ResGetResult (Maybe Value)
              | ResSetResult Bool
  deriving (Typeable, Generic)

setはRequestのデータを生成してそれをsendするだけ。あとはreceiveChanで返答を待つ。

set :: Database -> Key -> Value -> Process Bool
set db k v = do
  (sp, rp) <- newChan
  send db $ ReqOp (Set k v) sp
  ResSetResult r <- receiveChan rp
  return r

getについてはRequest型のコンストラクタを変えるのみ。

createDBはクライアントから受け取ったリクエストに応じてget, setを実行しレスポンスを返す。DBの状態を更新していくのはMVarで表現した。今回はマルチスレッドにはしないが。

createDB :: [NodeId] -> Process Database
createDB _ = spawnLocal $ do
  mvar <- liftIO $ newMVar M.empty
  forever $ do
    m <- expect
    handleMessage m mvar

handleMessage :: Request -> MVar (M.Map Key Value) -> Process ()

handleMessage (ReqOp (Get k) port) mvar = do
  map <- liftIO $ readMVar mvar
  let r = M.lookup k map
  sendChan port (ResGetResult r)

handleMessage (ReqOp (Set k v) port) mvar = do
  map <- liftIO $ takeMVar mvar
  let map' = M.insert k v map
  liftIO $ putMVar mvar map'
  sendChan port (ResSetResult $ (M.size map + 1) == M.size map')

状態を更新していくだけのことを考えたらforeverでループするのではなく、handleMessageを再帰的に使って処理する方がよい。

最後にRemoteTableは特に必要ないのでrcdataをidとしておく。

rcdata :: RemoteTable -> RemoteTable
rcdata = id

これで第1段階の実装は終了。

(2) DBプロセスを複数のノードに配置

DBそのものはworkerとして別のノードで動かし、masterプロセスでそれらへリクエストを転送する形にする。

キーのシャーディングは実用では大変重要だが今回と特に気にせずキーの頭文字のmodで判別。

変更するのはほとんどcreateDBのみ。

createDB :: [NodeId] -> Process Database          
createDB peers = spawnLocal $ do                  
  pids <- forM peers $ \nid -> do                 
    say $ "spawn on: " ++ show nid                
    spawn nid $ $(mkStaticClosure 'worker)        
                                                  
  let forward = forever $ do                      
        req <- expect                             
        let index = findWorker req $ length pids  
        send (pids !! index) req                  
                                                  
  if length pids == 0                             
    then worker                                   
    else forward                                  

各ノード上でDBプロセスを立ち上げる。そしてキーによって転送すべきプロセスを決定してリクエストを転送する。ノードがない場合は第1段階と同じように動作するようにした。

本ではtemplate haskellの制約のためworkerの処理を別モジュールに分ける必要があると書いてある。これは関数を書く順番を気にする必要があるためと考えられる。例えば上記のコードの場合、workerという関数をcreateDBより前に書いておく必要がある。そうしないとコンパイルが通らない。そういったことを気にするよりもモジュールを分けてしまったほうがよいということかと。

ちなみにworkerの関数は第1段階のcreateDBの処理と一緒。

その他の処理はこちらを。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Server/Kvs2/KvsMaster.hs

(3) masterプロセスにてworkerのダウンを検知

createDB内の処理でメッセージを受け取っていた部分(expect)でreceiveWaitを使うことで処理を分ける。Request or ProcessMonitorNotificationか。その他に変更はない。

createDB :: [NodeId] -> Process Database
createDB peers = spawnLocal $ do
  pids <- forM peers $ \nid -> do
    say $ "spawn on: " ++ show nid
    spawn nid $ $(mkStaticClosure 'worker)

  _ <- mapM_ monitor pids

  let handle = forever $ do
        receiveWait 
          [ match handleNotification
          , match $ handleRequest pids
          , matchAny $ \msg -> do
              say $ printf "received unknownMessage: %s" (show msg)
          ]

  if null pids
    then worker
    else handle

handleNotification :: ProcessMonitorNotification -> Process ()
handleNotification (ProcessMonitorNotification ref pid reason) = do
  say $ printf "died worker %s by %s" (show pid) (show reason)
  unmonitor ref

handleRequest :: [ProcessId] -> Request -> Process ()
handleRequest pids req = do
  let index = findWorker req $ length pids
  send (pids !! index) req

(4) 各workerをグループ化して冗長性を確保

workerを2つ一組にして同じデータを持たせておく。それぞれに対してset, getを転送することで実現し、getのとき2つ同じ結果が返ってくることは気にしない。

いままではメインループをforeverで回していたが、Requestを処理する関数を再帰的に呼ぶ形に修正する。これは死んだプロセスを転送先から排除するために各リクエストの処理の結果を渡していくため。ここもMVarでやってもよいが無駄なパフォーマンスの劣化になってしまう。

また、2つのプロセスをグループ化する処理は前から2つずつ組にしていくだけ。転送先の判定は先ほどまでと同じでキーの頭文字。対象となったグループ内のプロセスすべて(2つ)に転送する。

createDB :: [NodeId] -> Process Database
createDB peers = spawnLocal $ do
  pids <- forM peers $ \nid -> do
    say $ "spawn on: " ++ show nid
    spawn nid $ $(mkStaticClosure 'worker)

  _ <- mapM_ monitor pids

  let groups = groupPair pids

  if null pids
    then worker
    else waitMessage groups

waitMessage :: [[ProcessId]] -> Process ()
waitMessage groups = receiveWait
  [ match $ handleNotification groups
  , match $ handleRequest groups
  , matchAny $ \msg -> do
      say $ printf "received unknownMessage: %s" (show msg)
  ]

その他の処理についてはこちらを。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Server/Kvs4/KvsMaster.hs

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング