第14章の最後の部分はexeciseという位置づけになっている。ヒントが示されているのでそれに従って実装してみた。
英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch14.html
章の前半についてまとめたものはこちら
distributed-process - 分散処理(Parallel and Concurrent Programming in Haskell Chapter 14) - MEMOcho-
distributed chat serverのアーキテクチャ変更
chatサーバをmaster-slaveの形で実装したがこれだとserverノードの追加や削除はできない。ここではそれを可能にするためmaster-slaveという役割をなくしすべてのサーバが平等な役割を担うようにする。
まずDistribeUtilを使うのをやめて以下のような形にmainを修正する。
main = do [port, chat_port] <- getArgs backend <- initializeBackend "localhost" port (Main.__remoteTable initRemoteTable) node <- newLocalNode backend Node.runProcess node (master backend chat_port)
サーバ同士が通信するためのポートとクライアントが接続するためのポートをそれぞれ引数で渡す。newLocalNodeでノードを生成してクライアント用のポートと共にmasterの処理へ渡す。runProcessでProcessモナドを実行できる。
次にmasterの処理。
master :: Backend -> Int -> Process () master backend port = do peers <- liftIO $ findPeers backend 1000 mypid <- getSelfPid register "chatServer" mypid forM_ peers $ \nid -> do whereisRemoteAsync nid "chatServer" chatServer port
まずは存在する他のノードを探すためfindPeers。これは同期処理なのでタイムアウトを指定する。上記だと1ms。
次に自分のProcessIdをchatServerという名前で登録する。(register)
whereisRemoteAsyncは名前の通り非同期処理で存在する他のProcessを知るため、各ノードに対してリクエストを投げる。これに対する返答はchatServer内のメインループで受け取る。
chatServer :: Int -> Process () chatServer port = do server <- newServer [] liftIO $ forkIO (socketListener server port) spawnLocal $ proxy server forever $ do receiveWait [ match $ handleRemoteMessage server , match $ handleMonitorNotification server , match $ handleWhereIsReply server , matchAny $ \_ -> return () ]
expectで処理していたところをreceiveWaitを用いてメッセージ毎に処理分けする。whereisRemoteAsyncに対する返答はhandleWhereIsReplyで受ける。
handleWhereIsReply :: Server -> WhereIsReply -> Process () handleWhereIsReply _ (WhereIsReply _ Nothing) = return () handleWhereIsReply server@Server{..} (WhereIsReply _ (Just pid)) | pid == spid = return () | otherwise = liftIO $ atomically $ do addServer server pid startMonitor server pid sendRemote server pid $ MsgServerInfo spid teachMyClient server pid
ちなみにWhereIsReplyの型はこちら。
data WhereIsReply = WhereIsReply String (Maybe ProcessId)
各ノードから返ってくる返答を見てNothingであればchatServerという名前のProcessは存在しないので無視。JustならそのProcessIdを確認する。
注意点は自分のノードからも返答が返ってくること。なので自分のProcessIdだった場合も無視。他のノードのProcessIdだった場合は以下の処理。
1. 自分の持っているServerデータに相手を登録
2. モニタリング開始(相手のProcessが死んだことの検知)
3. 相手に自分のProcessIdを通知
4. 相手に自分のlocal clientを通知
MsgServerInfoは今回新しく定義したPMessage。自分のProcessIdを教えるためのサーバ間メッセージ。
addServer, startMonitor, teachMyClientはそれぞれ名前の通りの処理。clientの通知には自分クライアントの数だけMsgNewClientを投げるようにすることで新たに定義する手間を省いた。
handleRemoteMessageはほぼmaster-slave時の実装にMsgServerInfoへの処理を加えるだけ。
handleRemoteMessage :: Server -> PMessage -> Process () handleRemoteMessage server@Server{..} msg = liftIO $ atomically $ ...(省略) MsgServerInfo pid -> do addServer server pid teachMyClient server pid
自分のクライアントを相手に教える必要があるためこちらからもteachMyClientを実行する。
追加で実装したのはこれくらい。あと、debugのために登録されているクライアント名など表示するようなコマンドを追加したりしたがそれは本筋とは外れるので省略
コード全体はこちら。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Server/DistribChatNoSlave.hs
簡易的なKVSの実装
とても簡易的な実装ではあるが、4段階に分けてKVSを実装していく。最終的にはノードが一つ落ちても動く状態にする。
こちらに本の著者が実装したクライアントがあるのでそれが動作するような形にしていく。
https://github.com/simonmar/parconc-examples/blob/master/distrib-db/db.hs
(1) シングルノードでdb.hsを実行
今回実装するKVSはget, setのみができるもの。各インターフェースはこれ。
type Database type Key = String type Value = String createDB :: Process Database set :: Database -> Key -> Value -> Process Bool get :: Database -> Key -> Process (Maybe Value)
やることは2つ。
- createDBの実行でローカルにDBのプロセスを立ち上げる
- get, setは上記で立ちあげたプロセスとメッセージを通じてやりとりするよう実装
db.hsを見るとcreateDBにpeersを渡してDatabase型のデータ(db)を得ている。そこで得たdbをgetやsetに渡すことで通信するということらしい。なのでDatabaseは単純にProcessIdとして、get/setではそのProcessへのsendという形にする。
type Database = ProcessId
メッセージを見てDBプロセスが知るべきことは3つ。
- 実行するコマンド
- 操作するキーと値
- レスポンスを返す相手
なのでRequestとして以下のように定義。
data Request = ReqOp Command (SendPort Response) deriving (Typeable, Generic) instance Binary Request data Command = Get Key | Set Key Value deriving (Typeable, Generic) instance Binary Command
レスポンスの相手判別はチャネルを使って解決。
また、レスポンスはインターフェースの定義から以下のように決める。
data Response = ResGetResult (Maybe Value) | ResSetResult Bool deriving (Typeable, Generic)
setはRequestのデータを生成してそれをsendするだけ。あとはreceiveChanで返答を待つ。
set :: Database -> Key -> Value -> Process Bool set db k v = do (sp, rp) <- newChan send db $ ReqOp (Set k v) sp ResSetResult r <- receiveChan rp return r
getについてはRequest型のコンストラクタを変えるのみ。
createDBはクライアントから受け取ったリクエストに応じてget, setを実行しレスポンスを返す。DBの状態を更新していくのはMVarで表現した。今回はマルチスレッドにはしないが。
createDB :: [NodeId] -> Process Database createDB _ = spawnLocal $ do mvar <- liftIO $ newMVar M.empty forever $ do m <- expect handleMessage m mvar handleMessage :: Request -> MVar (M.Map Key Value) -> Process () handleMessage (ReqOp (Get k) port) mvar = do map <- liftIO $ readMVar mvar let r = M.lookup k map sendChan port (ResGetResult r) handleMessage (ReqOp (Set k v) port) mvar = do map <- liftIO $ takeMVar mvar let map' = M.insert k v map liftIO $ putMVar mvar map' sendChan port (ResSetResult $ (M.size map + 1) == M.size map')
状態を更新していくだけのことを考えたらforeverでループするのではなく、handleMessageを再帰的に使って処理する方がよい。
最後にRemoteTableは特に必要ないのでrcdataをidとしておく。
rcdata :: RemoteTable -> RemoteTable rcdata = id
これで第1段階の実装は終了。
(2) DBプロセスを複数のノードに配置
DBそのものはworkerとして別のノードで動かし、masterプロセスでそれらへリクエストを転送する形にする。
キーのシャーディングは実用では大変重要だが今回と特に気にせずキーの頭文字のmodで判別。
変更するのはほとんどcreateDBのみ。
createDB :: [NodeId] -> Process Database createDB peers = spawnLocal $ do pids <- forM peers $ \nid -> do say $ "spawn on: " ++ show nid spawn nid $ $(mkStaticClosure 'worker) let forward = forever $ do req <- expect let index = findWorker req $ length pids send (pids !! index) req if length pids == 0 then worker else forward
各ノード上でDBプロセスを立ち上げる。そしてキーによって転送すべきプロセスを決定してリクエストを転送する。ノードがない場合は第1段階と同じように動作するようにした。
本ではtemplate haskellの制約のためworkerの処理を別モジュールに分ける必要があると書いてある。これは関数を書く順番を気にする必要があるためと考えられる。例えば上記のコードの場合、workerという関数をcreateDBより前に書いておく必要がある。そうしないとコンパイルが通らない。そういったことを気にするよりもモジュールを分けてしまったほうがよいということかと。
ちなみにworkerの関数は第1段階のcreateDBの処理と一緒。
その他の処理はこちらを。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Server/Kvs2/KvsMaster.hs
(3) masterプロセスにてworkerのダウンを検知
createDB内の処理でメッセージを受け取っていた部分(expect)でreceiveWaitを使うことで処理を分ける。Request or ProcessMonitorNotificationか。その他に変更はない。
createDB :: [NodeId] -> Process Database createDB peers = spawnLocal $ do pids <- forM peers $ \nid -> do say $ "spawn on: " ++ show nid spawn nid $ $(mkStaticClosure 'worker) _ <- mapM_ monitor pids let handle = forever $ do receiveWait [ match handleNotification , match $ handleRequest pids , matchAny $ \msg -> do say $ printf "received unknownMessage: %s" (show msg) ] if null pids then worker else handle handleNotification :: ProcessMonitorNotification -> Process () handleNotification (ProcessMonitorNotification ref pid reason) = do say $ printf "died worker %s by %s" (show pid) (show reason) unmonitor ref handleRequest :: [ProcessId] -> Request -> Process () handleRequest pids req = do let index = findWorker req $ length pids send (pids !! index) req
(4) 各workerをグループ化して冗長性を確保
workerを2つ一組にして同じデータを持たせておく。それぞれに対してset, getを転送することで実現し、getのとき2つ同じ結果が返ってくることは気にしない。
いままではメインループをforeverで回していたが、Requestを処理する関数を再帰的に呼ぶ形に修正する。これは死んだプロセスを転送先から排除するために各リクエストの処理の結果を渡していくため。ここもMVarでやってもよいが無駄なパフォーマンスの劣化になってしまう。
また、2つのプロセスをグループ化する処理は前から2つずつ組にしていくだけ。転送先の判定は先ほどまでと同じでキーの頭文字。対象となったグループ内のプロセスすべて(2つ)に転送する。
createDB :: [NodeId] -> Process Database createDB peers = spawnLocal $ do pids <- forM peers $ \nid -> do say $ "spawn on: " ++ show nid spawn nid $ $(mkStaticClosure 'worker) _ <- mapM_ monitor pids let groups = groupPair pids if null pids then worker else waitMessage groups waitMessage :: [[ProcessId]] -> Process () waitMessage groups = receiveWait [ match $ handleNotification groups , match $ handleRequest groups , matchAny $ \msg -> do say $ printf "received unknownMessage: %s" (show msg) ]
その他の処理についてはこちらを。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Server/Kvs4/KvsMaster.hs
- 作者: Simon Marlow
- 出版社/メーカー: O'Reilly Media
- 発売日: 2013/07/12
- メディア: Kindle版
- この商品を含むブログ (2件) を見る
- 作者: Simon Marlow,山下伸夫,山本和彦,田中英行
- 出版社/メーカー: オライリージャパン
- 発売日: 2014/08/21
- メディア: 大型本
- この商品を含むブログ (2件) を見る