並行処理導入 MVar(Parallel and Concurrent Programming in Haskell Chapter 7)

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch07.html

この章以降は並行処理について
第7章はMVarを使った並行処理の方法について

別スレッドで処理を開始するにはforkIOを使う

forkIO :: IO () -> IO ThreadId

型からわかるようにforkIOは実行結果を返さない
よって別スレッドで行った処理の結果を取得するには
データを橋渡しするものが必要であり、それがMVar

ここではforkIOを使った簡単な例として
A,Bを出力するというのが挙げられている

main = do
  hSetBuffering stdout NoBuffering
  forkIO (replicateM_ 1000 (putChar 'A'))
  replicateM_ 1000 (putChar 'B')

出力をバッファリングしない設定をし、
forkしたスレッドではA,親スレッドではBを出力する
結果は以下のようにAとBが連続した状態になる

BABABABABABABABABABABABABABABABABABAB...

ここでABが綺麗に交互に現れているのはランタイムがそうなるように調整してくれているため
この出力はstdoutのハンドルを取り合うスレッド競合の例として考えることができる
詳しくは章の最後の公平性のところにも書いてある

簡単な例(リマインダー)

forkしたスレッドで指定した秒数後にメッセージを出力するという例
exitを入力すると抜けるようにする場合以下のように書ける
hSetBufferingを入れないと指定した秒数後に正しく出力されないので注意

import Control.Concurrent
import Control.Monad
import System.IO
import Text.Printf

main :: IO ()
main = do
  hSetBuffering stdout NoBuffering
  loop
  where
    loop = do
      s <- getLine
      if s == "exit" 
         then return () 
         else do forkIO $ setReminder s
                 loop

setReminder :: String -> IO ()
setReminder s = do
  let t = read s :: Int
  printf "Remind you %d seconds\n" t
  threadDelay $ t * 10^6
  printf "%d seconds is up!\n" t

これを実行してみるとこうなる

$ ./reminder
2
Remind you 2 seconds
5
Remind you 5 seconds
2 seconds is up!
exit
$ 

exitを入力したため5秒後のリマインドは行われなかった
つまりメインスレッドが終了した場合、forkしたスレッドも終了するということ
今回の例だとforkしたスレッドの実行を待ってくれた方が都合がよいと言えるが
実行終了を待たずにすべてのスレッドが終了する方がシンプルであるためこのような設計にしてあるらしい

後述の節でforkしたスレッドの終了を待つような実装が紹介されている

MVarによるスレッド間のやりとり

この節ではスレッド間の情報のやりとりを簡単に行うためMVarを導入する
MVarは第4章で出てきたIVarと似ていて箱と考えるのがしっくりくる
ただし、IVarと違って中身を書き換えることができる

APIは以下のよう

data MVar a
newEmptyMVar :: IO (MVar a)
newMVar      :: a -> IO (MVar a)
takeMVar     :: MVar a -> IO a
putMVar      :: MVar a -> a -> IO ()

takeMVarは中身が入っていればそれを取得して箱を空にする
中身が空の場合はブロックして中身が入るのを待つ
putMVarはtakeMVarの逆で中身が空であれば指定した値を箱に入れる
中身が既に入っていればブロックして中身が空になるのを待つ

こんな感じで使える

main = do
  m <- newEmptyMVar
  forkIO $ do
    putMVar m 'x'
    putMVar m 'y'
  r <- takeMVar m
  print r
  r <- takeMVar m
  print r

実行してみると以下のよう

'x'
'y'

空のMVarを作成したら別スレッドで値x, yを入れる
メインスレッドでは値を読み取って出力という動作を2回行う
putMVarとtakeMVarの実行順序は以下のようになる
1. forkしたスレッドでputMVarが実行されxが入る(メインスレッドのtakeMVarはそれまでブロック)
2. メインスレッドのtakeMVarでxが取得されてMVarは空になる(forkしたスレッドのputMVar m 'y'はそれまでブロック)
3. forkしたスレッドでputMVarが実行されyが入る(メインスレッドのtakeMVarはそれまでブロック)
4. メインスレッドのtakeMVarでyが取得されてMVarは空になる

もし、takeMVarに対応するputMVarがなかった場合はランタイムがそれを検知して例外を投げてくれる
BlockedIndefinitelyOnMVarという例外らしい

以上よりMVarについてまとめ

  • スレッド間での情報伝達のために一つだけメッセージを保持できる箱
  • 書き換え可能な状態を表す箱として使え、リソースのロックやより大きなデータ構造を表す部品として使うこともできる

MVarによるチャネル(ロギング)

ここでは別スレッドでの処理が必要な例としてシンプルなロガーを実装してみる

トップレベルに一つ全体で共有するロガーを定義するのではなく、生成して使いまわすようにする
こうすることで複数のロガーを別々に使用するなどのメリットがある

実装の全体はこちら

mport Control.Concurrent

data Logger = Logger (MVar LogCommand)

data LogCommand = Message String | Stop (MVar ())

initLogger :: IO Logger
initLogger = do
  m <- newEmptyMVar
  let l = Logger m
  forkIO (logger l)
  return l

logger :: Logger -> IO () 
logger (Logger m) = loop
  where
    loop = do
      cmd <- takeMVar m
      case cmd of
        Message s -> do 
          print s
          loop
        Stop s -> do 
          print "stop logger"
          putMVar s ()

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) msg = putMVar m $ Message msg

logStop :: Logger -> IO ()
logStop (Logger m) = do
  m' <- newEmptyMVar
  putMVar m $ Stop m'
  takeMVar m'

initLoggerでは空のMVarを生成してそれをLoggerとして組み立てる
作ったLoggerはforkした別スレッドで処理

ログの処理自体はloggerにて実装
takeMVarでMVarを見てあれば値を取り出す
取り出した値をパターンマッチしてメッセージの表示やロガーのストップを処理

Stopの際にユニットをMVarに入れているのはメインスレッド側にロガーを止めたことを知らせるためである
ここでユニットがputされることでlogStopでtakeMVarが成功して処理が終わる
つまり、forkしたスレッドでロガーの処理が止まるまでメインスレッド側のlogStopは終了しないようになっている

logMessageでは単にputMVarにより出力したいメッセージをMVarに入れるのみ
これによりMVarが空の場合はすぐにlogMessageは返ることになる

この実装の問題点は複数のスレッドから一つのロガーに対してメッセージを送っていた場合に応答に時間がかかるということである
これはログを保持するためのチャネルが一つしかないことが原因
これを解決するにはチャネルを増やして同時に処理できる量を増やす必要があるが
それには後述の節で説明する方法が使える(読み書きが別のチャネル)

状態共有のためのコンテナ

ここではロックを使った処理について電話帳を例にして実装
複数のスレッドから読み書きされうるリソースの場合、
データを書き込んでいる途中の状態で読み出したりできると不正な結果を返すことになりかねない
その際、ロックを取ることによりリソースを操作できるスレッドを制限しデータの不整合を防ぐ

実装の全体はこちら
PhoneBook.hs

module Phonebook where                                             
                                                                   
import Control.Concurrent                                          
import qualified Data.Map as M                                     
                                                                   
type Name = String                                                 
type PhoneNumber = String                                          
type PhoneBook = M.Map Name PhoneNumber                            
                                                                   
newtype PhoneBookState = PhoneBookState (MVar PhoneBook)           
                                                                   
new :: IO PhoneBookState                                           
new = do                                                           
  m <- newMVar M.empty                                             
  return $ PhoneBookState m                                        
                                                                   
insert :: PhoneBookState -> Name -> PhoneNumber -> IO ()           
insert (PhoneBookState m) name number = do                         
  book <- takeMVar m                                               
  putMVar m $ M.insert name number book                            
                                                                   
lookup :: PhoneBookState -> Name -> IO (Maybe PhoneNumber)         
lookup (PhoneBookState m) name = do                                
  book <- takeMVar m                                               
  putMVar m book                                                   
  return $ M.lookup name book                                      

Main.hs

import qualified Phonebook as P   

main :: IO ()                                                            
main = do                                                                
  s <- P.new                                                              
  sequence_ [ P.insert s ("name" ++ show n) (show n) | n <- [1..1000] ]   
  P.lookup s "name1" >>= print                                            
  P.lookup s "dummy" >>= print                                            

PhoneBookは電話帳を表し、名前と番号のMapとして定義
そのデータに対して複数スレッドから読み書きを行うため
PhoneBookをMVarでwrapしたものをPhoneBookStateとして定義する

このように操作したいデータとそれをwrapしたデータのように分けることは良い習慣である
もしこれをライブラリとして公開することにした場合、操作したいデータについては変更することなく
PhoneBookStateだけを抽象的に表現するよう変更するだけでよい

newについては空のMapを入れたMVarを生成して返すのみ
insertはPhoneBookStateから取り出したMVar内にあるPhoneBookに新たなエントリを足して再度MVarに入れなおす
lookupもPhoneBookStateからMVarを取り出し、その中のPhoneBookから指定したnameのデータを取り出して返す
それぞれtakeMVar, putMVarがロックの取得・解放に対応している
lookupの際、putMVarをしておかないとロックが解放されず他の誰も操作できなくなってしまうことに注意

この例でわかる有益なことは、
変更不可能なhaskellのどんなデータでも、MVarに包むことで複数スレッドで共有可能なmutableなデータとして扱えるということである

immutableなデータをmutableな構造で包むことによるさらなるメリットは
lookupの操作自体はロックを解放した状態で実行できるという点である
これによりロックのかかる時間は短く済む
これは包まれているデータがimmutableであることによって可能になる

遅延評価によって発生するメリット・デメリットもある
insertの際にputMVarをしている部分について
メリット:M.insertの実行はロックを解放してから行われるためロック時間が短い
デメリット:多くのスレッドからinsertが使われている場合、評価前の状態で各表現が保持されるためメモリを余計に食う

もし、デメリットを考慮してメモリをあまり食わないようにしたければ以下のように正格評価されるようにすればよい

putMVar m $! M.insert name number book

ただし、これだとメリットも消えてしまう
いいとこ取りをしたいならこちらの書き方がある

let book ' = M.insert name number book
putMVar m book'
seq book' $ return ()

book'として評価前の表現を保持しておき、putMVarによってロックを解放した直後にseqで評価を行う
ただし、少しコードがわかりづらい

読み書きが別のチャネル

ここでは複数の値を保持できるチャネルをMVarを使って実装する
FIFOで読み書きを行う簡単なqueueであり、Control.Concurrent.Chanで実装されている

データ構造を表す図はこちら
(原文より引用)
f:id:y-kamiya:20150215204936p:plain

データの定義

data Chan a = Chan (MVar (Stream a)) (MVar (Stream a))          

type Stream a = MVar (Item a)                                   
data Item a = Item a (Stream a)                                 

Stream aはItemをMVarで包んだもの
Itemは自身の値と次のItemを含むStreamを持つ
Chanは2つのStreamを指していて、一つがread用(先頭)一つがwrite用(末尾)となる
write用のポインタが指しているのは次に値があいる場所なので常に空のMVar(hole)となっている

Chanを操作するための関数はこちら

newChan :: IO (Chan a)                  
newChan = do                            
  hole <- newEmptyMVar                  
  readVar <- newMVar hole               
  writeVar <- newMVar hole              
  return $ Chan readVar writeVar        
                                        
readChan :: Chan a -> IO a              
readChan (Chan readVar _) = do          
  stream <- takeMVar readVar            
  -- Item val tail <- takeMVar stream   
  Item val tail <- readMVar stream      
  putMVar readVar tail                  
  return val                            
                                        
writeChan :: Chan a -> a -> IO ()       
writeChan (Chan _ writeVar) val = do    
  newHole <- newEmptyMVar               
  oldHole <- takeMVar writeVar          
  putMVar oldHole $ Item val newHole    
  putMVar writeVar newHole              

newChanでは空のMVarを2つ作ってそれらをread用、write用とする
それぞれが指すStreamは同じものになっている

writeChanはwrite用のMVarを取り出してそれに指定した値をItemとして入れる
その際、空のMVarを一つ作ってそれを新しいwrite用のMVarとする

readChanはread用のMVarを取り出してその値を返す
その際、後続のStreamをread用のMVarに入れておく

このあと、チャネルを複製したり頭の方に値を追加したりという関数を定義する部分が続く
関数そのものは重要ではないためここでは書かない

この節で重要なことは、MVarで大きな構造を作ろうとするとややこしくなりがちであるということ
さらにここでは出なかったが例外などを考え始めるとさらに複雑になってしまう
それを解決する方法は別の章で

MVarの公平性

MVarを使った際にブロックされたスレッドがたくさんあったなら永遠に実行されないスレッドが発生してしまうのでは?という話

ghcではどのスレッドがCPUを使うかをラウンドロビンで決定しているため
それぞれのスレッドのCPU時間は等しくならないものの、一つのスレッドが専有し続けるということも起こらない

また、takeMVarでブロックされたスレッドはそのMVarに紐づくFIFOのqueueに入って順番待ちをする
なので対応するputMVarがありさえすればいつかは処理される
さらにスレッドを起こす操作とブロックされていた操作の実行はアトミックに実行されるよう実装されているため
起こされたけれども他のスレッドが実行されたため再度ブロックキューに戻るということはない

この章の最初の例で上げたABAB...と表示するプログラムでは交互に出力が行われたのはこれらのためである
ただし、あるスレッドが実行される前にOSで割り込みの処理が入るなどした場合は処理が飛ばされることもある

公平性を保つための実装についてまとめると、
ブロックされたスレッドのうち一つだけが起こされるということが鍵になっている
これはパフォーマンス上でも重要なことである

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング