読者です 読者をやめる 読者になる 読者になる

簡易的な並行ネットワークサーバ(Parallel and Concurrent Programming in Haskell Chapter 12前半)

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch12.html

このページで紹介しているコードはほとんど上記ページからの引用である。


これまで章で出てきた要素を使ってサーバアプリケーションを作成する

サーバ型のアプリケーションは同時にたくさんのクライアントの要求に答える必要がある。スレッドというのはそれを抽象化するのにとても適している。一つのクライアントとの処理を書き、それをforkし複数スレッドにするだけで簡単に多くのクライアントを相手にすることができる。

簡単なサーバ

ポート44444で数値を待ち受けて2倍して返すサーバを書く。quitが入力されると終了するという仕様。
まず一つのクライアントとの処理を書く。

talk :: Handle -> IO ()
talk h = do
  hSetBuffering h LineBuffering
  loop
  where
    loop = do
      line <- hGetLine h
      if line == "quit"
        then hPutStrLn h "finished doubling service"
        else do
          hPutStrLn h $ show $ 2 * (read line :: Integer)
          loop

クライアントのハンドルを引数にとる関数talk。LineBufferingの行は出力を行ごとに行うためのもの。デフォルトだと一定量バッファに溜まったら出力するため、インタラクティブな処理をする場合はモードを変えておく必要がある。

talkのメインの処理はloop。ハンドルから入力を受け取りquitなら終了、それ以外なら2倍した値をハンドルに対して出力して再度loopで入力を待つ。hGetLineは入力があるまでブロックする。

mainは以下のようになる。
クライアントからの接続を待ち受けて、接続があれば別スレッドでtalkを実行してまた待ち受ける状態へ戻る。

trivial = withSocketsDo $ do
  sock <- listenOn $ PortNumber $ fromIntegral port
  printf "Listening on port %d\n" port
  forever $ do
    (handle, host, port) <- accept sock
    printf "Accepted connection from %s: %s\n" host (show port)
    forkFinally (talk handle) (\_ -> hClose handle)

forkFinallyを使うことでtalkの処理が終了した際は必ずハンドルのcloseが行われる。またacceptは接続があるまでブロックする処理。

とても簡単な例だが複雑な処理をするサーバでも基本的にはこの形になる。また、--threadedと+RTS -Nオプションを指定することでマルチコアなマシンでもコードを変えずにパフォーマンスアップできる。

これが可能なのはhaskellの以下の特徴のためである。

  • haskellのスレッドは軽いため一つのクライアントに対し一つのスレッドという対応でもパフォーマンスが発揮できる
  • haskellのepollなどを用いて多重化することで効率のよいIOを実現している

もし上記のような条件が満たせないかったならシングルイベントループの形を採ることになっただろうが、その場合はtalkの処理をforkして複数クライアントに対応するようなことはできなくなる。そうなると状態を持った処理を書く際に、複数のクライアントとのやりとりについて考える必要がでてきてすぐにコードは複雑化してしまう。

epollやIOの多重化については私もよく理解していなかったがこちらにわかりやすい説明をしてくれているページがあった。
http://kamiyasu2.blog.fc2.com/blog-entry-45.html

状態をもったサーバ

先ほどの例を元にして複数のクライアントで状態を共有するようなサーバを書く。具体的な仕様は以下。

  • 共有状態としてcurrent factorを持ち、その数をかけた値を返す
  • 各クライアントは*Nという入力によってcurrent factorを変更できる
  • current factorが変更された場合はすべてのクライアントに通知する

簡単なものから4つのデザインを考える

1. 一つの大きなロック

最も簡単に思いつく形。データ型は以下のように表せる。

data State = State {
  currentFactor :: Int
, clientHandles :: [Handle]
}
newtype StateVar = StateVar (MVar State)

現在のfactorとすべてのhandleをもったデータを生成してそれの操作の際にロックを取るという形。factorの変更の際は、*Nという入力を受けたサーバスレッド(最初のサーバの例でいうtalk)が他の全てスレッドが持っているhandleに直接出力する。この場合はfactorの変更だけでなく各クライアントへの通知の間もロックを持っている必要があるためパフォーマンスが落ちることは明らか。もしロックを取らなければfactorの変更と通知が前後する可能性があり、その場合クライアントが現在のfactorを勘違いしてしまう。

2. サーバスレッド毎にチャネル

並列性を高めるために各サーバスレッドは自身に接続されているクライアントとだけ通信できるようにする。サーバスレッド同士のやりとりは各スレッドが持つチャネルを通じて行う。

data State = State {
  clientChans :: [Chan Message]
}
data Message = FactorChange Int | ClientInput String

newtype StateVar = StateVar (MVar State)

チャネルに入るデータとしてMessageを定義。factorの変更またはクライアントからのinputを表す。ただ、クライアントから直接ClientInput型のデータは受け取れないため、receiveスレッドとして別スレッドを作成してクライアントからの入力を受け付けることにする。入力を受けたreceiveスレッドがClientInput型としてチャネルにデータを追加する。

先ほどの例と比べると改善された。しかしfactorを変更する際にすべてのチャネルをまわってMessageを追加する必要があり、その際はロックが必要がある。

3. ブロードキャストチャネル

すべてのチャネルをまわってMessageを追加するという手間を削減するためにbroadcastチャネルを導入する。第7章で出てきたdupChanによってbroadcastチャネルから各スレッド用のチャネルを作成することで、broadcastチャネルに入れたデータは全てのチャネルから見えるようになる。また、broadcastチャネルにMessageを追加するだけなのでもうMVarは必要ない。

newtype State = State { broadcastChan :: Chan Int }

一つ問題なのはクライアントからのinputとbroadcastチャネルの変化の両方を検知する必要があるということ。両方の入力をマージするために今までと同様にサーバスレッド毎のチャネルが一つ必要になる。なのでここでは3つのスレッドを作る必要がある。

  • Receive Thread: クライアントからのinputをサーバスレッドのチャネルに転送するスレッド
  • Monitor Thread: broadcastチャネルへのinputをサーバスレッドのチャネルに転送するスレッド
  • Server Thread: それぞれからの入力を見て処理を行うスレッド
4. STM

STMを使うことでbroadcastチャネルまわりの処理をを改良することができる。broadcastチャネルの代わりに以下のようにする。

newtype State = State { currentFactor :: TVar Int }

STM上の処理ではTVarの変更を検知して処理を再開することができるため、broadcastチャネルを監視していたMonitor Threadは要らなくなる。

ただし、STMではIOの処理を待つことはできないためreceiveスレッド必要である。

処理の概要は以下

  • receiveスレッドは*Nというコマンドをhandleから読み取ってTChanに転送する
  • serverスレッドはTChanから値を読んでcurrent factorを保持するTVarを更新する
  • TVarの変化はSTMの機構によって他のserverスレッドが検知することになり、結果をそれぞれのクライアントへ通知する

STMを使った実装

共有状態を持たない2倍した値を返すだけのサーバを元にして実装する。

クライアントからのリクエストを受け付けるmain関数はほとんど変更しなくてよい。

main = withSocketsDo $ do
  sock <- listenOn $ PortNumber $ fromIntegral port
  printf "Listening on port %d\n" port
  factor <- atomically $ newTVar 2
  forever $ do
    (handle, host, port) <- accept sock
    printf "Accepted connection from %s: %s\n" host (show port)
    forkFinally (talk handle factor) (\_ -> hClose handle)

変更点は現在のfactorを保持するTVarを生成してtalkに渡すようにするのみ。

talkではserverスレッドとreceiveスレッドをforkする。

talk :: Handle -> TVar Integer -> IO ()
talk h factor = do
  hSetBuffering h LineBuffering
  c <- atomically newTChan
  race (server h factor c) (receive h c)
  return ()

ポイントは2つのスレッド作成にraceを使っていること。こうすることでどちらかが終了した場合にもう片方も終了することが保証できる。エラーハンドリングやリソース解放がraceに隠蔽されているのでとても簡潔に書くことができる。

receive関数は以下

receive :: Handle -> TChan String -> IO ()
receive h c = forever $ do
  line <- hGetLine h
  atomically $ writeTChan c line

クライアントからの入力を受け取ってTChanへ追加するだけ。それをforeverで回しつつける。

server関数がこのサーバのメインロジック。

server :: Handle -> TVar Integer -> TChan String -> IO ()
server h factor c = do
  f <- atomically $ readTVar factor
  hPrintf h "Current factor: %d\n" f
  loop f
  where
    loop f = do
      action <- atomically $ do
        f' <- readTVar factor
        if f /= f'
           then return $ newfactor f'
           else do
             l <- readTChan c
             return $ command f l
      action
    newfactor f = do
      hPrintf h "new factor: %d\n" f
      loop f
    command f s
     = case s of
         "quit" -> hPutStrLn h "finished doubling service"
         '*':s -> do
           atomically $ writeTVar factor (read s :: Integer)
           loop f
         line -> do
           hPutStrLn h $ show $ f * (read line :: Integer)
           loop f

loopの処理がわかりづらいが、基本的な考え方はSTMを実行した結果としてIOアクションを返すということ。そしてそのIOを実行する。STMの中ではIOを実行することができないため、このやり方はSTMを使う上でよく出てくる。

loopのSTM内ではまずTVarの値を読む。現在知ってるfactorの値と異なる場合はそれを新しいfactorとしてクライアントに通知する。値に変化がない場合、TChanから値を読んで入力のパターンに応じて各コマンドの処理を行う。ここでの処理はorElseを使ったブロッキング処理(第10章でてきた)と同じであるが以下の関係があるため、orElseそのものは出てこない。

 (if A then retry else B) `orElse` C  ==>  if A then C else B

ちなみにorElseは1つ目の処理がretryされた場合に2つ目の処理が実行されるという関数。

commandの中では文字列のパターンに応じてクライアントへの返答がIOアクションとして記述されている。quitの場合のみloopが呼ばれないためserver関数自体の実行がそこで終了する。するとraceの働きによってreceiveスレッドも終了してクライアントとの接続が切れることになる。quit以外のパターンだった場合はloopが再度呼ばれることで処理が継続する。

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング