英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch13.html
このページで紹介しているコードはほとんど上記ページからの引用。
これまで並行処理で用いてきたスレッドを使って並列処理を行うという話。
本の前半で扱ったEvalやParを使えばそれでよいのだが、以下のような場合はそれができない。
- 並列にしたい処理中に入出力が含まれている
- 並列にしたい処理が非決定的である
最初、この2つは同じことをいっているような感じを受けたが異なることを言っていた。こちらのブログにわかりやすくまとまっていた。STモナドとIOモナドについて。
http://uehaj.hatenablog.com/entry/2014/01/29/110222
STモナドを使えば非決定性のある計算も扱うことができるようだが、その中で並列処理を行うのは難しく、それならば並行処理としてそれを実現すべきというのが導入。
ただし、この章の最後まで行くとParのIO版であるParIOを使用することでパフォーマンス的にもコード的にもうまくいくということが書いてある。なのでやはり並列処理用のParなどが使える状況であればそれを使う方法を考えるのが最も良さそう。それが無理なら仕方ないのでスレッドを作ることで並列化を実現するという感じか。
ファイル探索
この章で使う例は簡略化したfindコマンド。指定した名前のファイルを指定したディレクトリ以下から再帰的に探して最初に見つかったものを返すプログラム。まずsequentialに書いてそれを並列化していく。
sequential version
まずはsequentialなプログラムを書く。
import System.Environment (getArgs) import System.Directory (getDirectoryContents, doesDirectoryExist) import System.FilePath ((</>)) import Data.List (sort) main :: IO () main = do (filename:dir:[]) <- getArgs print =<< find_seq filename dir find_seq :: String -> FilePath -> IO (Maybe FilePath) find_seq filename dir = do fs <- getDirectoryContents dir let fs' = sort $ filter (`notElem` [".", ".."]) fs if any (== filename) fs' then return $ Just $ dir </> filename else loop fs' where loop [] = return Nothing loop (f:fs) = do let dir' = dir </> f isdir <- doesDirectoryExist dir' if isdir then do r <- find_seq filename dir' case r of Just _ -> return r Nothing -> loop fs else loop fs
getDirectoryContentsで指定したディレクトリ内の要素をすべて取ってくる。sort, filterで現在のディレクトリと親を除いている。その中に名前の一致する要素があれば返して終了、なければloop。
loopではディレクトリ内の要素のリストからディレクトリだけを選別し、そのディレクトリに対して再度検索を行う。
parallel version
sequentialなバージョンを元に並列化する。
並列化の際気をつけることは以下。
- エラー発生時にすべてのスレッドが終了する
- ファイルを見つけた場合はその時点ですべてのスレッドが終了する
これらを満たすために10章で使ったwithAsyncを用いる。Asyncは指定したIOを別スレッドで開始するもの。withAsyncはそれをbracketで包むことで実行終了時のクリーンアップ処理を内包している。
Control.Concurrent.Asyncに含まれているので今回は自前の実装はしない。
https://hackage.haskell.org/package/async-2.0.1.4/docs/Control-Concurrent-Async.html#v:withAsync
Async関係の関数が以前出てきたのは11章なのでこちら参照。
http://jsapachehtml.hatenablog.com/entry/2015/03/08/140358
各ディレクトリを並列に処理するために新しいスレッドを順次立ち上げていくがその構造はツリー形になり、それはfoldを使って表せる。畳み込みに使う関数をsubfindとして以下のように定義する。
import Control.Concurrent.Async subfind :: String -> FilePath -> ([Async (Maybe FilePath)] -> IO (Maybe FilePath)) -> [Async (Maybe FilePath)] -> IO (Maybe FilePath) subfind filename dir inner asyncs = do isdir <- doesDirectoryExist dir if not isdir then inner asyncs else withAsync (find_par filename dir) $ \a -> inner (a:asyncs) find_par :: String -> FilePath -> IO (Maybe FilePath) find_par filename dir = do fs <- getDirectoryContents dir let fs' = sort $ filter (`notElem` [".", ".."]) fs if any (== filename) fs' then return $ Just $ dir </> filename else do let ps = map (dir </>) fs' foldr (subfind filename) dowait ps [] where dowait as = loop $ reverse as loop [] = return Nothing loop (a:as) = do r <- wait a case r of Nothing -> loop as Just p -> return $ Just p
現在対象としているディレクトリに指定したファイルが見つからない場合にはその中の各ディレクトリを再帰的に検索していく。検索を再帰的に行うのがsubfindだがその中でfindを呼んでいるのでわかりづらい。さらにfoldrしているところはとても難解で理解に苦労した。
別のページとしてまとめたので詳細はこちらで。
subfindの実装(Parallel and Concurrent Programming in Haskell Chapter 13の一部) - MEMOcho-
パフォーマンス
sequentialなバージョンとparallelなバージョンで存在しないファイルを検索した場合以下のような結果になった。
ちなみに検索対象は自分のマシンのホームディレクトリだが、全部で66000のエントリがあり、そのうち35000がディレクトリだった。
また、以下のコマンド実行時はfindというコマンドを実行してその第1引数としてfind_seqやfind_parのように呼び出すバージョンを指定するようにした。
詳細はgithubを。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Find/Main.hs#L13
まずsequentialバージョン。
$ ./dist/build/find/find find_seq aaaaa ~/ +RTS -s Nothing 509,897,416 bytes allocated in the heap 13,693,824 bytes copied during GC 177,128 bytes maximum residency (4 sample(s)) 25,928 bytes maximum slop 2 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 981 colls, 0 par 0.02s 0.02s 0.0000s 0.0002s Gen 1 4 colls, 0 par 0.00s 0.00s 0.0001s 0.0002s TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 0.31s ( 0.71s elapsed) GC time 0.02s ( 0.02s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 0.33s ( 0.73s elapsed)
次にparallelバージョン。
$ ./dist/build/find/find find_par aaaaa ~/ +RTS -s Nothing 546,395,680 bytes allocated in the heap 157,722,216 bytes copied during GC 10,989,232 bytes maximum residency (21 sample(s)) 621,552 bytes maximum slop 28 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 1016 colls, 0 par 0.19s 0.20s 0.0002s 0.0005s Gen 1 21 colls, 0 par 0.04s 0.05s 0.0025s 0.0053s TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 0.38s ( 0.75s elapsed) GC time 0.24s ( 0.25s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 0.62s ( 1.01s elapsed)
2つの結果で異なるのは使用メモリと経過時間。使用メモリについてはparallelバージョンの方が14倍も多くなっている。これはスレッドを生成するなどparallelに処理するためのオーバヘッドがかかるため。経過時間についてもオーバヘッドの分parallelバージョンの方が大きくなっている。
parallelバージョンをコア2つ検索した場合。
./dist/build/find/find find_par aaaaa ~ +RTS -s -N2 Nothing 546,511,664 bytes allocated in the heap 139,327,048 bytes copied during GC 8,853,000 bytes maximum residency (21 sample(s)) 514,080 bytes maximum slop 23 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 574 colls, 574 par 0.14s 0.09s 0.0002s 0.0004s Gen 1 21 colls, 20 par 0.05s 0.03s 0.0013s 0.0032s Parallel GC work balance: 73.39% (serial 0%, perfect 100%) TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 0.42s ( 0.43s elapsed) GC time 0.18s ( 0.12s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 0.61s ( 0.55s elapsed)
parallelバージョンをコア4つ検索した場合。
./dist/build/find/find find_par aaaaa ~ +RTS -s -N4 Nothing 546,697,408 bytes allocated in the heap 127,976,816 bytes copied during GC 9,595,944 bytes maximum residency (18 sample(s)) 592,392 bytes maximum slop 25 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 322 colls, 322 par 0.17s 0.05s 0.0002s 0.0005s Gen 1 18 colls, 17 par 0.06s 0.02s 0.0011s 0.0024s Parallel GC work balance: 63.98% (serial 0%, perfect 100%) TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 0.44s ( 0.22s elapsed) GC time 0.23s ( 0.07s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 0.67s ( 0.29s elapsed)
コアを1つでfind_parを実行したときに比べてほぼ2倍、4倍のスピードアップを実現した。
本だと1コアの場合と比べて2コア場合以下のような結果になっている。
- 経過時間は2倍以上早くなった
- GCにかかる時間が減少した
- 使用メモリが減少した
私の環境だとそこまでのパフォーマンスアップにはならなかったが、これはファイル数が少ないためその影響が出づらい、またはghcのバージョンの違いなどが影響しているのではと思われる。
セマフォによる粒度の調整
ここではsequentialバージョンと比べたときに2倍、4倍とはなっていないことに着目してそのオーバヘッド分がどこから来ているのかを考える。
sequentialな場合のthreadscope画像
4コアの場合のthreadscope画像
4コアの画像では細かくてわかりづらいものの、水色や黄緑のラインが細かく入っているのがわかる。水色がスレッド生成、黄緑がスレッドの起動を表している。4コアの場合はスレッドの生成や起動が頻繁に行われていることが画像からわかる。
現在のparallelバージョンのプログラムだとスレッドの生成(=Asyncの生成)はディレクトリの数だけ起こる。これは粒度が細かすぎると考えられる。
第1部でやったように任意の大きさのchunkに分けることで粒度を調整するというのが思い浮かぶが、今回扱っているのはディレクトリ構成であり各ディレクトリ内に含まれるエントリも均一ではないためうまく粒度を調整できない。
よってここでは別の方法を使う。今回問題なのはスレッドが生成されすぎてしまうということなので、スレッドの生成数に上限を設けることで対応する。スレッド数が上限値に達しているときは新しいスレッドを作らず、そのスレッドでsequentialに処理を続ける。現在のスレッド数を数えて判定するためにセマフォを用いる。
よく使われるセマフォの機能としては、リソースを使う際にカウントを-1し、使い終わったらカウントを+1する。カウントが0の際は1以上になるのを待つ。
今回の場合、カウントが0の場合はリソースの解放を待つのでなくそのまま処理を続けるということなので同期を取るのに比べると簡単に実装できる。ここではそれをNon Blocking Semaphore(NBSem)として実装する。
newtype NBSem = NBSem (MVar Int) newNBSem :: Int -> IO NBSem newNBSem i = do m <- newMVar i return $ NBSem m tryAcquireNBSem :: NBSem -> IO Bool tryAcquireNBSem (NBSem m) = do modifyMVar m $ \i -> if i == 0 then return (i, False) else let !z = i-1 in return (z, True) releaseNBSem :: NBSem -> IO () releaseNBSem (NBSem m) = do modifyMVar m $ \i -> let !z = i+1 in return (z,())
MVarの中にカウントを持つ形の実装。リソースの取得・解放においてはMVarの中の値を+1, -1するだけ。
NBSemを使うとsubfindは以下のように書ける。
subfind :: NBSem -> String -> FilePath -> ([Async (Maybe FilePath)] -> IO (Maybe FilePath)) -> [Async (Maybe FilePath)] -> IO (Maybe FilePath) subfind sem filename dir inner asyncs = do isdir <- doesDirectoryExist dir if not isdir then inner asyncs else do q <- tryAcquireNBSem sem if q then do let dofind = find sem filename dir `finally` releaseNBSem sem withAsync dofind $ \a -> inner (a:asyncs) else do r <- find sem filename dir case r of Nothing -> inner asyncs Just _ -> return r
ディレクトリ内に指定のファイルが見つからなかったときはまずセマフォのカウントを確認。カウントが0の状態であればwithAsyncによって新しいスレッドを生成する。そうでなければそのスレッドにてfindの処理を続ける。
findにはセマフォの値を渡すようにする
main = do [n,s,d] <- getArgs sem <- newNBSem (read n) find sem s d >>= print
これを用いた検索の結果は以下のようになった。
4コアでセマフォが8のとき。
./dist/build/find/find find_par_sem 8 aaaaa ~ +RTS -s -N4 -l Nothing 531,868,400 bytes allocated in the heap 24,751,824 bytes copied during GC 1,163,720 bytes maximum residency (21 sample(s)) 78,832 bytes maximum slop 5 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 331 colls, 331 par 0.05s 0.02s 0.0001s 0.0003s Gen 1 21 colls, 20 par 0.01s 0.00s 0.0001s 0.0004s Parallel GC work balance: 30.21% (serial 0%, perfect 100%) TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 0.41s ( 0.27s elapsed) GC time 0.06s ( 0.02s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 0.47s ( 0.29s elapsed)
私の環境ではfind_parとくらべてそれほど差が出なかった。載せた結果の経過時間は一致している。何度も実行してだいたいの平均を比べてもほぼ同じくらいの経過時間だった。こちらもファイル数が少なすぎたのかもしれない。
threadscopeの結果は以下。
水色の線(スレッドの生成)が減っているのがわかる。ただ、問題はGC以外のタイミングでもスレッドの実行が止まっているときがある点である。これはセマフォのread, writeで競合してブロックされるスレッドが存在するためと考えられる。
よってNBSemを改善したい。ブロックしてしまうのが原因であればSTMを使えば回避できるが今回はIORefを使う。
atomicModifyIORef :: IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefはIORef内の値を取り出して指定した関数を適用する。関数は元々入っていた値と新しく入れた値のtupleを返すもの。最終的には新しく入れた値がIOとして返ってくる。
atomicModifyIORefは処理できるトランザクションが一つのSTMと考えることができ、今回の場合はカウントの値を増減するのみの処理であるためSTMを使うよりもオーバヘッドが小さくできる。
{-# LANGUAGE BangPatterns #-} import Data.IORef newtype NBSem = NBSem (IORef Int) newNBSem :: Int -> IO NBSem newNBSem i = do m <- newIORef i return $ NBSem m tryWaitNBSem :: NBSem -> IO Bool tryWaitNBSem (NBSem m) = do atomicModifyIORef m $ \i -> if i == 0 then (i, False) else let !z = i-1 in (z, True) signalNBSem :: NBSem -> IO () signalNBSem (NBSem m) = do atomicModifyIORef m $ \i -> let !z = i+1 in (z,())
ポイントはBangPatternを使って!z = i-1とすることで計算されずに保持されるサンクをなくすようにしている点。
main関数は先ほどと同じでも問題ないが、コア数と取得してその4倍の値をセマフォに設定している。
main = do [s,d] <- getArgs n <- getNumCapabilities sem <- newNBSem (if n == 1 then 0 else n * 4) find sem s d >>= print
実行結果は以下。
./dist/build/find/find find_par_sem_ioref aaaaa ~ +RTS -s -N4 -l Nothing 535,757,128 bytes allocated in the heap 30,604,776 bytes copied during GC 1,491,200 bytes maximum residency (23 sample(s)) 95,552 bytes maximum slop 6 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 305 colls, 305 par 0.05s 0.02s 0.0001s 0.0003s Gen 1 23 colls, 22 par 0.01s 0.00s 0.0002s 0.0003s Parallel GC work balance: 43.12% (serial 0%, perfect 100%) TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 0.42s ( 0.23s elapsed) GC time 0.07s ( 0.02s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 0.48s ( 0.25s elapsed)
find_parと比べて若干高速化できた。
ParIOを使ったバージョン
第4章ではParモナドを使って並列処理を行った。今回やっている内容だと並列にしたい処理中にIOが入っている。この場合、ParIOを使うことでParと同じ操作ができる。操作として異なる点はParIOの実行にはrunParIOを用いるということだけ。
処理の性質としては中でIOを実行しているので非決定性を持つことになる。(実行毎に結果が違う可能性あり)
ParIOを用いることでsubfind, find, mainは以下のように書ける。
subfind :: String -> FilePath -> ([IVar (Maybe FilePath)] -> ParIO (Maybe FilePath)) -> [IVar (Maybe FilePath)] -> ParIO (Maybe FilePath) subfind filename dir inner ivars = do isdir <- liftIO $ doesDirectoryExist dir if not isdir then inner ivars else do v <- new fork $ find filename dir >>= put v inner $ v : ivars find :: String -> FilePath -> ParIO (Maybe FilePath) find filename dir = do fs <- liftIO $ getDirectoryContents dir let fs' = sort $ filter (`notElem` [".", ".."]) fs if any (== filename) fs' then return $ Just $ dir </> filename else do let ps = map (dir </>) fs' foldr (subfind filename) dowait ps [] where dowait vs = loop $ reverse vs loop [] = return Nothing loop (v:vs) = do r <- get v case r of Nothing -> loop vs Just p -> return $ Just p main = do [s,d] <- getArgs runParIO (find s d) >>= print
findの変更点は値を取り出す際にwaitではなくgetを使うようにしたことだけ。
subfindではまず型を以下のように変更。
- MVar => IVar
- IO => ParIO
関数内の処理としては、newで新たにIVarを生成してforkした上でfindを実行する(forkIOとは違ってスレッドが立ち上がるわけではない)。findの結果は生成したIVarに格納される。
mainではrunParIOによってParIOの処理を実行する。
実行結果。
./dist/build/find/find find_par_io aaaaa ~ +RTS -s -N4 -l Nothing 543,403,528 bytes allocated in the heap 34,353,688 bytes copied during GC 1,120,752 bytes maximum residency (14 sample(s)) 103,512 bytes maximum slop 6 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 348 colls, 348 par 0.07s 0.03s 0.0001s 0.0008s Gen 1 14 colls, 13 par 0.02s 0.01s 0.0004s 0.0010s Parallel GC work balance: 23.24% (serial 0%, perfect 100%) TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 0.40s ( 0.21s elapsed) GC time 0.09s ( 0.03s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 0.49s ( 0.25s elapsed)
私の環境ではIORefを用いたバージョンとほぼ同じ結果となった。平均するとこちらの方が若干パフォーマンスがよかったかもという感じ。
パフォーマンスがよい理由としては、まず共有状態のロックを取ったり確認したりという処理がなくなったこと。また、Parの並列処理の実行は効率よくなるようにうまくスケジューリングされていることが挙げられる。
ただし、上のプログラムはエラー時に処理が止まるようになっていない。これについては読者への宿題ということらしい。
ちなみにParIOを用いたバージョンのthreadscopeはこちらのよう。
スレッドの生成を示す水色のバーはなくなっている。

- 作者: Simon Marlow
- 出版社/メーカー: O'Reilly Media
- 発売日: 2013/07/12
- メディア: Kindle版
- この商品を含むブログ (2件) を見る

- 作者: Simon Marlow,山下伸夫,山本和彦,田中英行
- 出版社/メーカー: オライリージャパン
- 発売日: 2014/08/21
- メディア: 大型本
- この商品を含むブログ (2件) を見る