スレッドによる並列プログラミング(Parallel and Concurrent Programming in Haskell Chapter 13)
英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch13.html
このページで紹介しているコードはほとんど上記ページからの引用。
これまで並行処理で用いてきたスレッドを使って並列処理を行うという話。
本の前半で扱ったEvalやParを使えばそれでよいのだが、以下のような場合はそれができない。
- 並列にしたい処理中に入出力が含まれている
- 並列にしたい処理が非決定的である
最初、この2つは同じことをいっているような感じを受けたが異なることを言っていた。こちらのブログにわかりやすくまとまっていた。STモナドとIOモナドについて。
http://uehaj.hatenablog.com/entry/2014/01/29/110222
STモナドを使えば非決定性のある計算も扱うことができるようだが、その中で並列処理を行うのは難しく、それならば並行処理としてそれを実現すべきというのが導入。
ただし、この章の最後まで行くとParのIO版であるParIOを使用することでパフォーマンス的にもコード的にもうまくいくということが書いてある。なのでやはり並列処理用のParなどが使える状況であればそれを使う方法を考えるのが最も良さそう。それが無理なら仕方ないのでスレッドを作ることで並列化を実現するという感じか。
ファイル探索
この章で使う例は簡略化したfindコマンド。指定した名前のファイルを指定したディレクトリ以下から再帰的に探して最初に見つかったものを返すプログラム。まずsequentialに書いてそれを並列化していく。
sequential version
まずはsequentialなプログラムを書く。
import System.Environment (getArgs)
import System.Directory (getDirectoryContents, doesDirectoryExist)
import System.FilePath ((</>))
import Data.List (sort)
main :: IO ()
main = do
(filename:dir:[]) <- getArgs
print =<< find_seq filename dir
find_seq :: String -> FilePath -> IO (Maybe FilePath)
find_seq filename dir = do
fs <- getDirectoryContents dir
let fs' = sort $ filter (`notElem` [".", ".."]) fs
if any (== filename) fs'
then return $ Just $ dir </> filename
else loop fs'
where
loop [] = return Nothing
loop (f:fs) = do
let dir' = dir </> f
isdir <- doesDirectoryExist dir'
if isdir
then do
r <- find_seq filename dir'
case r of
Just _ -> return r
Nothing -> loop fs
else loop fs getDirectoryContentsで指定したディレクトリ内の要素をすべて取ってくる。sort, filterで現在のディレクトリと親を除いている。その中に名前の一致する要素があれば返して終了、なければloop。
loopではディレクトリ内の要素のリストからディレクトリだけを選別し、そのディレクトリに対して再度検索を行う。
parallel version
sequentialなバージョンを元に並列化する。
並列化の際気をつけることは以下。
- エラー発生時にすべてのスレッドが終了する
- ファイルを見つけた場合はその時点ですべてのスレッドが終了する
これらを満たすために10章で使ったwithAsyncを用いる。Asyncは指定したIOを別スレッドで開始するもの。withAsyncはそれをbracketで包むことで実行終了時のクリーンアップ処理を内包している。
Control.Concurrent.Asyncに含まれているので今回は自前の実装はしない。
https://hackage.haskell.org/package/async-2.0.1.4/docs/Control-Concurrent-Async.html#v:withAsync
Async関係の関数が以前出てきたのは11章なのでこちら参照。
http://jsapachehtml.hatenablog.com/entry/2015/03/08/140358
各ディレクトリを並列に処理するために新しいスレッドを順次立ち上げていくがその構造はツリー形になり、それはfoldを使って表せる。畳み込みに使う関数をsubfindとして以下のように定義する。
import Control.Concurrent.Async
subfind :: String -> FilePath
-> ([Async (Maybe FilePath)] -> IO (Maybe FilePath))
-> [Async (Maybe FilePath)] -> IO (Maybe FilePath)
subfind filename dir inner asyncs = do
isdir <- doesDirectoryExist dir
if not isdir
then inner asyncs
else withAsync (find_par filename dir) $ \a -> inner (a:asyncs)
find_par :: String -> FilePath -> IO (Maybe FilePath)
find_par filename dir = do
fs <- getDirectoryContents dir
let fs' = sort $ filter (`notElem` [".", ".."]) fs
if any (== filename) fs'
then return $ Just $ dir </> filename
else do
let ps = map (dir </>) fs'
foldr (subfind filename) dowait ps []
where
dowait as = loop $ reverse as
loop [] = return Nothing
loop (a:as) = do
r <- wait a
case r of
Nothing -> loop as
Just p -> return $ Just p現在対象としているディレクトリに指定したファイルが見つからない場合にはその中の各ディレクトリを再帰的に検索していく。検索を再帰的に行うのがsubfindだがその中でfindを呼んでいるのでわかりづらい。さらにfoldrしているところはとても難解で理解に苦労した。
別のページとしてまとめたので詳細はこちらで。
subfindの実装(Parallel and Concurrent Programming in Haskell Chapter 13の一部) - MEMOcho-
パフォーマンス
sequentialなバージョンとparallelなバージョンで存在しないファイルを検索した場合以下のような結果になった。
ちなみに検索対象は自分のマシンのホームディレクトリだが、全部で66000のエントリがあり、そのうち35000がディレクトリだった。
また、以下のコマンド実行時はfindというコマンドを実行してその第1引数としてfind_seqやfind_parのように呼び出すバージョンを指定するようにした。
詳細はgithubを。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Find/Main.hs#L13
まずsequentialバージョン。
$ ./dist/build/find/find find_seq aaaaa ~/ +RTS -s
Nothing
509,897,416 bytes allocated in the heap
13,693,824 bytes copied during GC
177,128 bytes maximum residency (4 sample(s))
25,928 bytes maximum slop
2 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 981 colls, 0 par 0.02s 0.02s 0.0000s 0.0002s
Gen 1 4 colls, 0 par 0.00s 0.00s 0.0001s 0.0002s
TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)
SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
INIT time 0.00s ( 0.00s elapsed)
MUT time 0.31s ( 0.71s elapsed)
GC time 0.02s ( 0.02s elapsed)
EXIT time 0.00s ( 0.00s elapsed)
Total time 0.33s ( 0.73s elapsed)次にparallelバージョン。
$ ./dist/build/find/find find_par aaaaa ~/ +RTS -s
Nothing
546,395,680 bytes allocated in the heap
157,722,216 bytes copied during GC
10,989,232 bytes maximum residency (21 sample(s))
621,552 bytes maximum slop
28 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 1016 colls, 0 par 0.19s 0.20s 0.0002s 0.0005s
Gen 1 21 colls, 0 par 0.04s 0.05s 0.0025s 0.0053s
TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)
SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
INIT time 0.00s ( 0.00s elapsed)
MUT time 0.38s ( 0.75s elapsed)
GC time 0.24s ( 0.25s elapsed)
EXIT time 0.00s ( 0.00s elapsed)
Total time 0.62s ( 1.01s elapsed)2つの結果で異なるのは使用メモリと経過時間。使用メモリについてはparallelバージョンの方が14倍も多くなっている。これはスレッドを生成するなどparallelに処理するためのオーバヘッドがかかるため。経過時間についてもオーバヘッドの分parallelバージョンの方が大きくなっている。
parallelバージョンをコア2つ検索した場合。
./dist/build/find/find find_par aaaaa ~ +RTS -s -N2
Nothing
546,511,664 bytes allocated in the heap
139,327,048 bytes copied during GC
8,853,000 bytes maximum residency (21 sample(s))
514,080 bytes maximum slop
23 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 574 colls, 574 par 0.14s 0.09s 0.0002s 0.0004s
Gen 1 21 colls, 20 par 0.05s 0.03s 0.0013s 0.0032s
Parallel GC work balance: 73.39% (serial 0%, perfect 100%)
TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)
SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
INIT time 0.00s ( 0.00s elapsed)
MUT time 0.42s ( 0.43s elapsed)
GC time 0.18s ( 0.12s elapsed)
EXIT time 0.00s ( 0.00s elapsed)
Total time 0.61s ( 0.55s elapsed)parallelバージョンをコア4つ検索した場合。
./dist/build/find/find find_par aaaaa ~ +RTS -s -N4
Nothing
546,697,408 bytes allocated in the heap
127,976,816 bytes copied during GC
9,595,944 bytes maximum residency (18 sample(s))
592,392 bytes maximum slop
25 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 322 colls, 322 par 0.17s 0.05s 0.0002s 0.0005s
Gen 1 18 colls, 17 par 0.06s 0.02s 0.0011s 0.0024s
Parallel GC work balance: 63.98% (serial 0%, perfect 100%)
TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
INIT time 0.00s ( 0.00s elapsed)
MUT time 0.44s ( 0.22s elapsed)
GC time 0.23s ( 0.07s elapsed)
EXIT time 0.00s ( 0.00s elapsed)
Total time 0.67s ( 0.29s elapsed)コアを1つでfind_parを実行したときに比べてほぼ2倍、4倍のスピードアップを実現した。
本だと1コアの場合と比べて2コア場合以下のような結果になっている。
- 経過時間は2倍以上早くなった
- GCにかかる時間が減少した
- 使用メモリが減少した
私の環境だとそこまでのパフォーマンスアップにはならなかったが、これはファイル数が少ないためその影響が出づらい、またはghcのバージョンの違いなどが影響しているのではと思われる。
セマフォによる粒度の調整
ここではsequentialバージョンと比べたときに2倍、4倍とはなっていないことに着目してそのオーバヘッド分がどこから来ているのかを考える。
sequentialな場合のthreadscope画像

4コアの場合のthreadscope画像

4コアの画像では細かくてわかりづらいものの、水色や黄緑のラインが細かく入っているのがわかる。水色がスレッド生成、黄緑がスレッドの起動を表している。4コアの場合はスレッドの生成や起動が頻繁に行われていることが画像からわかる。
現在のparallelバージョンのプログラムだとスレッドの生成(=Asyncの生成)はディレクトリの数だけ起こる。これは粒度が細かすぎると考えられる。
第1部でやったように任意の大きさのchunkに分けることで粒度を調整するというのが思い浮かぶが、今回扱っているのはディレクトリ構成であり各ディレクトリ内に含まれるエントリも均一ではないためうまく粒度を調整できない。
よってここでは別の方法を使う。今回問題なのはスレッドが生成されすぎてしまうということなので、スレッドの生成数に上限を設けることで対応する。スレッド数が上限値に達しているときは新しいスレッドを作らず、そのスレッドでsequentialに処理を続ける。現在のスレッド数を数えて判定するためにセマフォを用いる。
よく使われるセマフォの機能としては、リソースを使う際にカウントを-1し、使い終わったらカウントを+1する。カウントが0の際は1以上になるのを待つ。
今回の場合、カウントが0の場合はリソースの解放を待つのでなくそのまま処理を続けるということなので同期を取るのに比べると簡単に実装できる。ここではそれをNon Blocking Semaphore(NBSem)として実装する。
newtype NBSem = NBSem (MVar Int)
newNBSem :: Int -> IO NBSem
newNBSem i = do
m <- newMVar i
return $ NBSem m
tryAcquireNBSem :: NBSem -> IO Bool
tryAcquireNBSem (NBSem m) = do
modifyMVar m $ \i ->
if i == 0
then return (i, False)
else let !z = i-1 in return (z, True)
releaseNBSem :: NBSem -> IO ()
releaseNBSem (NBSem m) = do
modifyMVar m $ \i ->
let !z = i+1 in return (z,()) MVarの中にカウントを持つ形の実装。リソースの取得・解放においてはMVarの中の値を+1, -1するだけ。
NBSemを使うとsubfindは以下のように書ける。
subfind :: NBSem -> String -> FilePath
-> ([Async (Maybe FilePath)] -> IO (Maybe FilePath))
-> [Async (Maybe FilePath)] -> IO (Maybe FilePath)
subfind sem filename dir inner asyncs = do
isdir <- doesDirectoryExist dir
if not isdir
then inner asyncs
else do
q <- tryAcquireNBSem sem
if q
then do
let dofind = find sem filename dir `finally` releaseNBSem sem
withAsync dofind $ \a -> inner (a:asyncs)
else do
r <- find sem filename dir
case r of
Nothing -> inner asyncs
Just _ -> return r ディレクトリ内に指定のファイルが見つからなかったときはまずセマフォのカウントを確認。カウントが0の状態であればwithAsyncによって新しいスレッドを生成する。そうでなければそのスレッドにてfindの処理を続ける。
findにはセマフォの値を渡すようにする
main = do [n,s,d] <- getArgs sem <- newNBSem (read n) find sem s d >>= print
これを用いた検索の結果は以下のようになった。
4コアでセマフォが8のとき。
./dist/build/find/find find_par_sem 8 aaaaa ~ +RTS -s -N4 -l
Nothing
531,868,400 bytes allocated in the heap
24,751,824 bytes copied during GC
1,163,720 bytes maximum residency (21 sample(s))
78,832 bytes maximum slop
5 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 331 colls, 331 par 0.05s 0.02s 0.0001s 0.0003s
Gen 1 21 colls, 20 par 0.01s 0.00s 0.0001s 0.0004s
Parallel GC work balance: 30.21% (serial 0%, perfect 100%)
TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
INIT time 0.00s ( 0.00s elapsed)
MUT time 0.41s ( 0.27s elapsed)
GC time 0.06s ( 0.02s elapsed)
EXIT time 0.00s ( 0.00s elapsed)
Total time 0.47s ( 0.29s elapsed)私の環境ではfind_parとくらべてそれほど差が出なかった。載せた結果の経過時間は一致している。何度も実行してだいたいの平均を比べてもほぼ同じくらいの経過時間だった。こちらもファイル数が少なすぎたのかもしれない。
threadscopeの結果は以下。

水色の線(スレッドの生成)が減っているのがわかる。ただ、問題はGC以外のタイミングでもスレッドの実行が止まっているときがある点である。これはセマフォのread, writeで競合してブロックされるスレッドが存在するためと考えられる。
よってNBSemを改善したい。ブロックしてしまうのが原因であればSTMを使えば回避できるが今回はIORefを使う。
atomicModifyIORef :: IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefはIORef内の値を取り出して指定した関数を適用する。関数は元々入っていた値と新しく入れた値のtupleを返すもの。最終的には新しく入れた値がIOとして返ってくる。
atomicModifyIORefは処理できるトランザクションが一つのSTMと考えることができ、今回の場合はカウントの値を増減するのみの処理であるためSTMを使うよりもオーバヘッドが小さくできる。
{-# LANGUAGE BangPatterns #-}
import Data.IORef
newtype NBSem = NBSem (IORef Int)
newNBSem :: Int -> IO NBSem
newNBSem i = do
m <- newIORef i
return $ NBSem m
tryWaitNBSem :: NBSem -> IO Bool
tryWaitNBSem (NBSem m) = do
atomicModifyIORef m $ \i ->
if i == 0
then (i, False)
else let !z = i-1 in (z, True)
signalNBSem :: NBSem -> IO ()
signalNBSem (NBSem m) = do
atomicModifyIORef m $ \i ->
let !z = i+1 in (z,()) ポイントはBangPatternを使って!z = i-1とすることで計算されずに保持されるサンクをなくすようにしている点。
main関数は先ほどと同じでも問題ないが、コア数と取得してその4倍の値をセマフォに設定している。
main = do [s,d] <- getArgs n <- getNumCapabilities sem <- newNBSem (if n == 1 then 0 else n * 4) find sem s d >>= print
実行結果は以下。
./dist/build/find/find find_par_sem_ioref aaaaa ~ +RTS -s -N4 -l
Nothing
535,757,128 bytes allocated in the heap
30,604,776 bytes copied during GC
1,491,200 bytes maximum residency (23 sample(s))
95,552 bytes maximum slop
6 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 305 colls, 305 par 0.05s 0.02s 0.0001s 0.0003s
Gen 1 23 colls, 22 par 0.01s 0.00s 0.0002s 0.0003s
Parallel GC work balance: 43.12% (serial 0%, perfect 100%)
TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
INIT time 0.00s ( 0.00s elapsed)
MUT time 0.42s ( 0.23s elapsed)
GC time 0.07s ( 0.02s elapsed)
EXIT time 0.00s ( 0.00s elapsed)
Total time 0.48s ( 0.25s elapsed)find_parと比べて若干高速化できた。
ParIOを使ったバージョン
第4章ではParモナドを使って並列処理を行った。今回やっている内容だと並列にしたい処理中にIOが入っている。この場合、ParIOを使うことでParと同じ操作ができる。操作として異なる点はParIOの実行にはrunParIOを用いるということだけ。
処理の性質としては中でIOを実行しているので非決定性を持つことになる。(実行毎に結果が違う可能性あり)
ParIOを用いることでsubfind, find, mainは以下のように書ける。
subfind :: String -> FilePath
-> ([IVar (Maybe FilePath)] -> ParIO (Maybe FilePath))
-> [IVar (Maybe FilePath)] -> ParIO (Maybe FilePath)
subfind filename dir inner ivars = do
isdir <- liftIO $ doesDirectoryExist dir
if not isdir
then inner ivars
else do
v <- new
fork $ find filename dir >>= put v
inner $ v : ivars
find :: String -> FilePath -> ParIO (Maybe FilePath)
find filename dir = do
fs <- liftIO $ getDirectoryContents dir
let fs' = sort $ filter (`notElem` [".", ".."]) fs
if any (== filename) fs'
then return $ Just $ dir </> filename
else do
let ps = map (dir </>) fs'
foldr (subfind filename) dowait ps []
where
dowait vs = loop $ reverse vs
loop [] = return Nothing
loop (v:vs) = do
r <- get v
case r of
Nothing -> loop vs
Just p -> return $ Just p
main = do
[s,d] <- getArgs
runParIO (find s d) >>= printfindの変更点は値を取り出す際にwaitではなくgetを使うようにしたことだけ。
subfindではまず型を以下のように変更。
- MVar => IVar
- IO => ParIO
関数内の処理としては、newで新たにIVarを生成してforkした上でfindを実行する(forkIOとは違ってスレッドが立ち上がるわけではない)。findの結果は生成したIVarに格納される。
mainではrunParIOによってParIOの処理を実行する。
実行結果。
./dist/build/find/find find_par_io aaaaa ~ +RTS -s -N4 -l
Nothing
543,403,528 bytes allocated in the heap
34,353,688 bytes copied during GC
1,120,752 bytes maximum residency (14 sample(s))
103,512 bytes maximum slop
6 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 348 colls, 348 par 0.07s 0.03s 0.0001s 0.0008s
Gen 1 14 colls, 13 par 0.02s 0.01s 0.0004s 0.0010s
Parallel GC work balance: 23.24% (serial 0%, perfect 100%)
TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
INIT time 0.00s ( 0.00s elapsed)
MUT time 0.40s ( 0.21s elapsed)
GC time 0.09s ( 0.03s elapsed)
EXIT time 0.00s ( 0.00s elapsed)
Total time 0.49s ( 0.25s elapsed)私の環境ではIORefを用いたバージョンとほぼ同じ結果となった。平均するとこちらの方が若干パフォーマンスがよかったかもという感じ。
パフォーマンスがよい理由としては、まず共有状態のロックを取ったり確認したりという処理がなくなったこと。また、Parの並列処理の実行は効率よくなるようにうまくスケジューリングされていることが挙げられる。
ただし、上のプログラムはエラー時に処理が止まるようになっていない。これについては読者への宿題ということらしい。
ちなみにParIOを用いたバージョンのthreadscopeはこちらのよう。

スレッドの生成を示す水色のバーはなくなっている。

- 作者: Simon Marlow
- 出版社/メーカー: O'Reilly Media
- 発売日: 2013/07/12
- メディア: Kindle版
- この商品を含むブログ (2件) を見る

- 作者: Simon Marlow,山下伸夫,山本和彦,田中英行
- 出版社/メーカー: オライリージャパン
- 発売日: 2014/08/21
- メディア: 大型本
- この商品を含むブログ (2件) を見る