並行処理導入 MVar(Parallel and Concurrent Programming in Haskell Chapter 7)
英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch07.html
この章以降は並行処理について
第7章はMVarを使った並行処理の方法について
別スレッドで処理を開始するにはforkIOを使う
forkIO :: IO () -> IO ThreadId
型からわかるようにforkIOは実行結果を返さない
よって別スレッドで行った処理の結果を取得するには
データを橋渡しするものが必要であり、それがMVar
ここではforkIOを使った簡単な例として
A,Bを出力するというのが挙げられている
main = do hSetBuffering stdout NoBuffering forkIO (replicateM_ 1000 (putChar 'A')) replicateM_ 1000 (putChar 'B')
出力をバッファリングしない設定をし、
forkしたスレッドではA,親スレッドではBを出力する
結果は以下のようにAとBが連続した状態になる
BABABABABABABABABABABABABABABABABABAB...
ここでABが綺麗に交互に現れているのはランタイムがそうなるように調整してくれているため
この出力はstdoutのハンドルを取り合うスレッド競合の例として考えることができる
詳しくは章の最後の公平性のところにも書いてある
簡単な例(リマインダー)
forkしたスレッドで指定した秒数後にメッセージを出力するという例
exitを入力すると抜けるようにする場合以下のように書ける
hSetBufferingを入れないと指定した秒数後に正しく出力されないので注意
import Control.Concurrent import Control.Monad import System.IO import Text.Printf main :: IO () main = do hSetBuffering stdout NoBuffering loop where loop = do s <- getLine if s == "exit" then return () else do forkIO $ setReminder s loop setReminder :: String -> IO () setReminder s = do let t = read s :: Int printf "Remind you %d seconds\n" t threadDelay $ t * 10^6 printf "%d seconds is up!\n" t
これを実行してみるとこうなる
$ ./reminder 2 Remind you 2 seconds 5 Remind you 5 seconds 2 seconds is up! exit $
exitを入力したため5秒後のリマインドは行われなかった
つまりメインスレッドが終了した場合、forkしたスレッドも終了するということ
今回の例だとforkしたスレッドの実行を待ってくれた方が都合がよいと言えるが
実行終了を待たずにすべてのスレッドが終了する方がシンプルであるためこのような設計にしてあるらしい
後述の節でforkしたスレッドの終了を待つような実装が紹介されている
MVarによるスレッド間のやりとり
この節ではスレッド間の情報のやりとりを簡単に行うためMVarを導入する
MVarは第4章で出てきたIVarと似ていて箱と考えるのがしっくりくる
ただし、IVarと違って中身を書き換えることができる
APIは以下のよう
data MVar a newEmptyMVar :: IO (MVar a) newMVar :: a -> IO (MVar a) takeMVar :: MVar a -> IO a putMVar :: MVar a -> a -> IO ()
takeMVarは中身が入っていればそれを取得して箱を空にする
中身が空の場合はブロックして中身が入るのを待つ
putMVarはtakeMVarの逆で中身が空であれば指定した値を箱に入れる
中身が既に入っていればブロックして中身が空になるのを待つ
こんな感じで使える
main = do m <- newEmptyMVar forkIO $ do putMVar m 'x' putMVar m 'y' r <- takeMVar m print r r <- takeMVar m print r
実行してみると以下のよう
'x' 'y'
空のMVarを作成したら別スレッドで値x, yを入れる
メインスレッドでは値を読み取って出力という動作を2回行う
putMVarとtakeMVarの実行順序は以下のようになる
1. forkしたスレッドでputMVarが実行されxが入る(メインスレッドのtakeMVarはそれまでブロック)
2. メインスレッドのtakeMVarでxが取得されてMVarは空になる(forkしたスレッドのputMVar m 'y'はそれまでブロック)
3. forkしたスレッドでputMVarが実行されyが入る(メインスレッドのtakeMVarはそれまでブロック)
4. メインスレッドのtakeMVarでyが取得されてMVarは空になる
もし、takeMVarに対応するputMVarがなかった場合はランタイムがそれを検知して例外を投げてくれる
BlockedIndefinitelyOnMVarという例外らしい
以上よりMVarについてまとめ
- スレッド間での情報伝達のために一つだけメッセージを保持できる箱
- 書き換え可能な状態を表す箱として使え、リソースのロックやより大きなデータ構造を表す部品として使うこともできる
MVarによるチャネル(ロギング)
ここでは別スレッドでの処理が必要な例としてシンプルなロガーを実装してみる
トップレベルに一つ全体で共有するロガーを定義するのではなく、生成して使いまわすようにする
こうすることで複数のロガーを別々に使用するなどのメリットがある
実装の全体はこちら
mport Control.Concurrent data Logger = Logger (MVar LogCommand) data LogCommand = Message String | Stop (MVar ()) initLogger :: IO Logger initLogger = do m <- newEmptyMVar let l = Logger m forkIO (logger l) return l logger :: Logger -> IO () logger (Logger m) = loop where loop = do cmd <- takeMVar m case cmd of Message s -> do print s loop Stop s -> do print "stop logger" putMVar s () logMessage :: Logger -> String -> IO () logMessage (Logger m) msg = putMVar m $ Message msg logStop :: Logger -> IO () logStop (Logger m) = do m' <- newEmptyMVar putMVar m $ Stop m' takeMVar m'
initLoggerでは空のMVarを生成してそれをLoggerとして組み立てる
作ったLoggerはforkした別スレッドで処理
ログの処理自体はloggerにて実装
takeMVarでMVarを見てあれば値を取り出す
取り出した値をパターンマッチしてメッセージの表示やロガーのストップを処理
Stopの際にユニットをMVarに入れているのはメインスレッド側にロガーを止めたことを知らせるためである
ここでユニットがputされることでlogStopでtakeMVarが成功して処理が終わる
つまり、forkしたスレッドでロガーの処理が止まるまでメインスレッド側のlogStopは終了しないようになっている
logMessageでは単にputMVarにより出力したいメッセージをMVarに入れるのみ
これによりMVarが空の場合はすぐにlogMessageは返ることになる
この実装の問題点は複数のスレッドから一つのロガーに対してメッセージを送っていた場合に応答に時間がかかるということである
これはログを保持するためのチャネルが一つしかないことが原因
これを解決するにはチャネルを増やして同時に処理できる量を増やす必要があるが
それには後述の節で説明する方法が使える(読み書きが別のチャネル)
状態共有のためのコンテナ
ここではロックを使った処理について電話帳を例にして実装
複数のスレッドから読み書きされうるリソースの場合、
データを書き込んでいる途中の状態で読み出したりできると不正な結果を返すことになりかねない
その際、ロックを取ることによりリソースを操作できるスレッドを制限しデータの不整合を防ぐ
実装の全体はこちら
PhoneBook.hs
module Phonebook where import Control.Concurrent import qualified Data.Map as M type Name = String type PhoneNumber = String type PhoneBook = M.Map Name PhoneNumber newtype PhoneBookState = PhoneBookState (MVar PhoneBook) new :: IO PhoneBookState new = do m <- newMVar M.empty return $ PhoneBookState m insert :: PhoneBookState -> Name -> PhoneNumber -> IO () insert (PhoneBookState m) name number = do book <- takeMVar m putMVar m $ M.insert name number book lookup :: PhoneBookState -> Name -> IO (Maybe PhoneNumber) lookup (PhoneBookState m) name = do book <- takeMVar m putMVar m book return $ M.lookup name book
Main.hs
import qualified Phonebook as P main :: IO () main = do s <- P.new sequence_ [ P.insert s ("name" ++ show n) (show n) | n <- [1..1000] ] P.lookup s "name1" >>= print P.lookup s "dummy" >>= print
PhoneBookは電話帳を表し、名前と番号のMapとして定義
そのデータに対して複数スレッドから読み書きを行うため
PhoneBookをMVarでwrapしたものをPhoneBookStateとして定義する
このように操作したいデータとそれをwrapしたデータのように分けることは良い習慣である
もしこれをライブラリとして公開することにした場合、操作したいデータについては変更することなく
PhoneBookStateだけを抽象的に表現するよう変更するだけでよい
newについては空のMapを入れたMVarを生成して返すのみ
insertはPhoneBookStateから取り出したMVar内にあるPhoneBookに新たなエントリを足して再度MVarに入れなおす
lookupもPhoneBookStateからMVarを取り出し、その中のPhoneBookから指定したnameのデータを取り出して返す
それぞれtakeMVar, putMVarがロックの取得・解放に対応している
lookupの際、putMVarをしておかないとロックが解放されず他の誰も操作できなくなってしまうことに注意
この例でわかる有益なことは、
変更不可能なhaskellのどんなデータでも、MVarに包むことで複数スレッドで共有可能なmutableなデータとして扱えるということである
immutableなデータをmutableな構造で包むことによるさらなるメリットは
lookupの操作自体はロックを解放した状態で実行できるという点である
これによりロックのかかる時間は短く済む
これは包まれているデータがimmutableであることによって可能になる
遅延評価によって発生するメリット・デメリットもある
insertの際にputMVarをしている部分について
メリット:M.insertの実行はロックを解放してから行われるためロック時間が短い
デメリット:多くのスレッドからinsertが使われている場合、評価前の状態で各表現が保持されるためメモリを余計に食う
もし、デメリットを考慮してメモリをあまり食わないようにしたければ以下のように正格評価されるようにすればよい
putMVar m $! M.insert name number book
ただし、これだとメリットも消えてしまう
いいとこ取りをしたいならこちらの書き方がある
let book ' = M.insert name number book putMVar m book' seq book' $ return ()
book'として評価前の表現を保持しておき、putMVarによってロックを解放した直後にseqで評価を行う
ただし、少しコードがわかりづらい
読み書きが別のチャネル
ここでは複数の値を保持できるチャネルをMVarを使って実装する
FIFOで読み書きを行う簡単なqueueであり、Control.Concurrent.Chanで実装されている
データ構造を表す図はこちら
(原文より引用)
データの定義
data Chan a = Chan (MVar (Stream a)) (MVar (Stream a)) type Stream a = MVar (Item a) data Item a = Item a (Stream a)
Stream aはItemをMVarで包んだもの
Itemは自身の値と次のItemを含むStreamを持つ
Chanは2つのStreamを指していて、一つがread用(先頭)一つがwrite用(末尾)となる
write用のポインタが指しているのは次に値があいる場所なので常に空のMVar(hole)となっている
Chanを操作するための関数はこちら
newChan :: IO (Chan a) newChan = do hole <- newEmptyMVar readVar <- newMVar hole writeVar <- newMVar hole return $ Chan readVar writeVar readChan :: Chan a -> IO a readChan (Chan readVar _) = do stream <- takeMVar readVar -- Item val tail <- takeMVar stream Item val tail <- readMVar stream putMVar readVar tail return val writeChan :: Chan a -> a -> IO () writeChan (Chan _ writeVar) val = do newHole <- newEmptyMVar oldHole <- takeMVar writeVar putMVar oldHole $ Item val newHole putMVar writeVar newHole
newChanでは空のMVarを2つ作ってそれらをread用、write用とする
それぞれが指すStreamは同じものになっている
writeChanはwrite用のMVarを取り出してそれに指定した値をItemとして入れる
その際、空のMVarを一つ作ってそれを新しいwrite用のMVarとする
readChanはread用のMVarを取り出してその値を返す
その際、後続のStreamをread用のMVarに入れておく
このあと、チャネルを複製したり頭の方に値を追加したりという関数を定義する部分が続く
関数そのものは重要ではないためここでは書かない
この節で重要なことは、MVarで大きな構造を作ろうとするとややこしくなりがちであるということ
さらにここでは出なかったが例外などを考え始めるとさらに複雑になってしまう
それを解決する方法は別の章で
MVarの公平性
MVarを使った際にブロックされたスレッドがたくさんあったなら永遠に実行されないスレッドが発生してしまうのでは?という話
ghcではどのスレッドがCPUを使うかをラウンドロビンで決定しているため
それぞれのスレッドのCPU時間は等しくならないものの、一つのスレッドが専有し続けるということも起こらない
また、takeMVarでブロックされたスレッドはそのMVarに紐づくFIFOのqueueに入って順番待ちをする
なので対応するputMVarがありさえすればいつかは処理される
さらにスレッドを起こす操作とブロックされていた操作の実行はアトミックに実行されるよう実装されているため
起こされたけれども他のスレッドが実行されたため再度ブロックキューに戻るということはない
この章の最初の例で上げたABAB...と表示するプログラムでは交互に出力が行われたのはこれらのためである
ただし、あるスレッドが実行される前にOSで割り込みの処理が入るなどした場合は処理が飛ばされることもある
公平性を保つための実装についてまとめると、
ブロックされたスレッドのうち一つだけが起こされるということが鍵になっている
これはパフォーマンス上でも重要なことである

- 作者: Simon Marlow
- 出版社/メーカー: O'Reilly Media
- 発売日: 2013/07/12
- メディア: Kindle版
- この商品を含むブログ (2件) を見る

- 作者: Simon Marlow,山下伸夫,山本和彦,田中英行
- 出版社/メーカー: オライリージャパン
- 発売日: 2014/08/21
- メディア: 大型本
- この商品を含むブログ (2件) を見る