スレッドによる並列プログラミング(Parallel and Concurrent Programming in Haskell Chapter 13)

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch13.html

このページで紹介しているコードはほとんど上記ページからの引用。


これまで並行処理で用いてきたスレッドを使って並列処理を行うという話。

本の前半で扱ったEvalやParを使えばそれでよいのだが、以下のような場合はそれができない。

  • 並列にしたい処理中に入出力が含まれている
  • 並列にしたい処理が非決定的である

最初、この2つは同じことをいっているような感じを受けたが異なることを言っていた。こちらのブログにわかりやすくまとまっていた。STモナドとIOモナドについて。
http://uehaj.hatenablog.com/entry/2014/01/29/110222

STモナドを使えば非決定性のある計算も扱うことができるようだが、その中で並列処理を行うのは難しく、それならば並行処理としてそれを実現すべきというのが導入。

ただし、この章の最後まで行くとParのIO版であるParIOを使用することでパフォーマンス的にもコード的にもうまくいくということが書いてある。なのでやはり並列処理用のParなどが使える状況であればそれを使う方法を考えるのが最も良さそう。それが無理なら仕方ないのでスレッドを作ることで並列化を実現するという感じか。

ファイル探索

この章で使う例は簡略化したfindコマンド。指定した名前のファイルを指定したディレクトリ以下から再帰的に探して最初に見つかったものを返すプログラム。まずsequentialに書いてそれを並列化していく。

sequential version

まずはsequentialなプログラムを書く。

import System.Environment (getArgs)                                
import System.Directory (getDirectoryContents, doesDirectoryExist) 
import System.FilePath ((</>))                                     
import Data.List (sort)                                            

main :: IO ()                                                 
main = do                                                     
  (filename:dir:[]) <- getArgs                                   
  print =<< find_seq filename dir

find_seq :: String -> FilePath -> IO (Maybe FilePath)   
find_seq filename dir = do                              
  fs <- getDirectoryContents dir                        
  let fs' = sort $ filter (`notElem` [".", ".."]) fs    
  if any (== filename) fs'                              
    then return $ Just $ dir </> filename               
    else loop fs'                                       
  where                                                 
    loop [] = return Nothing                            
    loop (f:fs) = do                                    
      let dir' = dir </> f                              
      isdir <- doesDirectoryExist dir'                  
      if isdir                                          
        then do                                         
          r <- find_seq filename dir'                   
          case r of                                     
            Just _ -> return r                          
            Nothing -> loop fs                          
        else loop fs                                    

getDirectoryContentsで指定したディレクトリ内の要素をすべて取ってくる。sort, filterで現在のディレクトリと親を除いている。その中に名前の一致する要素があれば返して終了、なければloop。

loopではディレクトリ内の要素のリストからディレクトリだけを選別し、そのディレクトリに対して再度検索を行う。

parallel version

sequentialなバージョンを元に並列化する。

並列化の際気をつけることは以下。

  • エラー発生時にすべてのスレッドが終了する
  • ファイルを見つけた場合はその時点ですべてのスレッドが終了する

これらを満たすために10章で使ったwithAsyncを用いる。Asyncは指定したIOを別スレッドで開始するもの。withAsyncはそれをbracketで包むことで実行終了時のクリーンアップ処理を内包している。

Control.Concurrent.Asyncに含まれているので今回は自前の実装はしない。
https://hackage.haskell.org/package/async-2.0.1.4/docs/Control-Concurrent-Async.html#v:withAsync

Async関係の関数が以前出てきたのは11章なのでこちら参照。
http://jsapachehtml.hatenablog.com/entry/2015/03/08/140358

各ディレクトリを並列に処理するために新しいスレッドを順次立ち上げていくがその構造はツリー形になり、それはfoldを使って表せる。畳み込みに使う関数をsubfindとして以下のように定義する。

import Control.Concurrent.Async

subfind :: String -> FilePath
        -> ([Async (Maybe FilePath)] -> IO (Maybe FilePath))
        ->  [Async (Maybe FilePath)] -> IO (Maybe FilePath)
subfind filename dir inner asyncs = do
  isdir <- doesDirectoryExist dir
  if not isdir
    then inner asyncs
    else withAsync (find_par filename dir) $ \a -> inner (a:asyncs)

find_par :: String -> FilePath -> IO (Maybe FilePath)
find_par filename dir = do
  fs <- getDirectoryContents dir
  let fs' = sort $ filter (`notElem` [".", ".."]) fs
  if any (== filename) fs'
    then return $ Just $ dir </> filename
    else do
      let ps = map (dir </>) fs'
      foldr (subfind filename) dowait ps []
  where
    dowait as = loop $ reverse as

    loop [] = return Nothing
    loop (a:as) = do
      r <- wait a
      case r of
        Nothing -> loop as
        Just p -> return $ Just p

現在対象としているディレクトリに指定したファイルが見つからない場合にはその中の各ディレクトリを再帰的に検索していく。検索を再帰的に行うのがsubfindだがその中でfindを呼んでいるのでわかりづらい。さらにfoldrしているところはとても難解で理解に苦労した。

別のページとしてまとめたので詳細はこちらで。
subfindの実装(Parallel and Concurrent Programming in Haskell Chapter 13の一部) - MEMOcho-

パフォーマンス

sequentialなバージョンとparallelなバージョンで存在しないファイルを検索した場合以下のような結果になった。

ちなみに検索対象は自分のマシンのホームディレクトリだが、全部で66000のエントリがあり、そのうち35000がディレクトリだった。
また、以下のコマンド実行時はfindというコマンドを実行してその第1引数としてfind_seqやfind_parのように呼び出すバージョンを指定するようにした。
詳細はgithubを。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Find/Main.hs#L13

まずsequentialバージョン。

$ ./dist/build/find/find find_seq aaaaa ~/ +RTS -s                                                                                                                                         
Nothing
     509,897,416 bytes allocated in the heap
      13,693,824 bytes copied during GC
         177,128 bytes maximum residency (4 sample(s))
          25,928 bytes maximum slop
               2 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0       981 colls,     0 par    0.02s    0.02s     0.0000s    0.0002s
  Gen  1         4 colls,     0 par    0.00s    0.00s     0.0001s    0.0002s

  TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.31s  (  0.71s elapsed)
  GC      time    0.02s  (  0.02s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    0.33s  (  0.73s elapsed)

次にparallelバージョン。

$ ./dist/build/find/find find_par aaaaa ~/ +RTS -s                                                                                                                                         
Nothing
     546,395,680 bytes allocated in the heap
     157,722,216 bytes copied during GC
      10,989,232 bytes maximum residency (21 sample(s))
         621,552 bytes maximum slop
              28 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0      1016 colls,     0 par    0.19s    0.20s     0.0002s    0.0005s
  Gen  1        21 colls,     0 par    0.04s    0.05s     0.0025s    0.0053s

  TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.38s  (  0.75s elapsed)
  GC      time    0.24s  (  0.25s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    0.62s  (  1.01s elapsed)

2つの結果で異なるのは使用メモリと経過時間。使用メモリについてはparallelバージョンの方が14倍も多くなっている。これはスレッドを生成するなどparallelに処理するためのオーバヘッドがかかるため。経過時間についてもオーバヘッドの分parallelバージョンの方が大きくなっている。

parallelバージョンをコア2つ検索した場合。

./dist/build/find/find find_par aaaaa ~ +RTS -s -N2                                                                                                                                      
Nothing
     546,511,664 bytes allocated in the heap
     139,327,048 bytes copied during GC
       8,853,000 bytes maximum residency (21 sample(s))
         514,080 bytes maximum slop
              23 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0       574 colls,   574 par    0.14s    0.09s     0.0002s    0.0004s
  Gen  1        21 colls,    20 par    0.05s    0.03s     0.0013s    0.0032s

  Parallel GC work balance: 73.39% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.42s  (  0.43s elapsed)
  GC      time    0.18s  (  0.12s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    0.61s  (  0.55s elapsed)

parallelバージョンをコア4つ検索した場合。

./dist/build/find/find find_par aaaaa ~ +RTS -s -N4                                                                                                                                      
Nothing
     546,697,408 bytes allocated in the heap
     127,976,816 bytes copied during GC
       9,595,944 bytes maximum residency (18 sample(s))
         592,392 bytes maximum slop
              25 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0       322 colls,   322 par    0.17s    0.05s     0.0002s    0.0005s
  Gen  1        18 colls,    17 par    0.06s    0.02s     0.0011s    0.0024s

  Parallel GC work balance: 63.98% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.44s  (  0.22s elapsed)
  GC      time    0.23s  (  0.07s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    0.67s  (  0.29s elapsed)

コアを1つでfind_parを実行したときに比べてほぼ2倍、4倍のスピードアップを実現した。

本だと1コアの場合と比べて2コア場合以下のような結果になっている。

  • 経過時間は2倍以上早くなった
  • GCにかかる時間が減少した
  • 使用メモリが減少した

私の環境だとそこまでのパフォーマンスアップにはならなかったが、これはファイル数が少ないためその影響が出づらい、またはghcのバージョンの違いなどが影響しているのではと思われる。

セマフォによる粒度の調整

ここではsequentialバージョンと比べたときに2倍、4倍とはなっていないことに着目してそのオーバヘッド分がどこから来ているのかを考える。
sequentialな場合のthreadscope画像
f:id:y-kamiya:20150314152532p:plain

4コアの場合のthreadscope画像
f:id:y-kamiya:20150314152525p:plain

4コアの画像では細かくてわかりづらいものの、水色や黄緑のラインが細かく入っているのがわかる。水色がスレッド生成、黄緑がスレッドの起動を表している。4コアの場合はスレッドの生成や起動が頻繁に行われていることが画像からわかる。

現在のparallelバージョンのプログラムだとスレッドの生成(=Asyncの生成)はディレクトリの数だけ起こる。これは粒度が細かすぎると考えられる。

第1部でやったように任意の大きさのchunkに分けることで粒度を調整するというのが思い浮かぶが、今回扱っているのはディレクトリ構成であり各ディレクトリ内に含まれるエントリも均一ではないためうまく粒度を調整できない。

よってここでは別の方法を使う。今回問題なのはスレッドが生成されすぎてしまうということなので、スレッドの生成数に上限を設けることで対応する。スレッド数が上限値に達しているときは新しいスレッドを作らず、そのスレッドでsequentialに処理を続ける。現在のスレッド数を数えて判定するためにセマフォを用いる。

よく使われるセマフォの機能としては、リソースを使う際にカウントを-1し、使い終わったらカウントを+1する。カウントが0の際は1以上になるのを待つ。

今回の場合、カウントが0の場合はリソースの解放を待つのでなくそのまま処理を続けるということなので同期を取るのに比べると簡単に実装できる。ここではそれをNon Blocking Semaphore(NBSem)として実装する。

newtype NBSem = NBSem (MVar Int)             
                                             
newNBSem :: Int -> IO NBSem                  
newNBSem i = do                              
  m <- newMVar i                             
  return $ NBSem m                           
                                             
tryAcquireNBSem :: NBSem -> IO Bool          
tryAcquireNBSem (NBSem m) = do               
  modifyMVar m $ \i ->                       
    if i == 0                                
       then return (i, False)                
       else let !z = i-1 in return (z, True) 
                                             
releaseNBSem :: NBSem -> IO ()               
releaseNBSem (NBSem m) = do                  
  modifyMVar m $ \i ->                       
    let !z = i+1 in return (z,())            

MVarの中にカウントを持つ形の実装。リソースの取得・解放においてはMVarの中の値を+1, -1するだけ。

NBSemを使うとsubfindは以下のように書ける。

subfind :: NBSem -> String -> FilePath                                  
        -> ([Async (Maybe FilePath)] -> IO (Maybe FilePath))            
        ->  [Async (Maybe FilePath)] -> IO (Maybe FilePath)             
subfind sem filename dir inner asyncs = do                              
  isdir <- doesDirectoryExist dir                                       
  if not isdir                                                          
    then inner asyncs                                                   
    else do                                                             
      q <- tryAcquireNBSem sem                                          
      if q                                                              
        then do                                                         
          let dofind = find sem filename dir `finally` releaseNBSem sem 
          withAsync dofind $ \a -> inner (a:asyncs)                     
        else do                                                         
          r <- find sem filename dir                                    
          case r of                                                     
            Nothing -> inner asyncs                                     
            Just _ -> return r                                          

ディレクトリ内に指定のファイルが見つからなかったときはまずセマフォのカウントを確認。カウントが0の状態であればwithAsyncによって新しいスレッドを生成する。そうでなければそのスレッドにてfindの処理を続ける。

findにはセマフォの値を渡すようにする

main = do
  [n,s,d] <- getArgs
  sem <- newNBSem (read n)
  find sem s d >>= print

これを用いた検索の結果は以下のようになった。
4コアでセマフォが8のとき。

./dist/build/find/find find_par_sem 8 aaaaa ~ +RTS -s -N4 -l                                                                                                                             
Nothing
     531,868,400 bytes allocated in the heap
      24,751,824 bytes copied during GC
       1,163,720 bytes maximum residency (21 sample(s))
          78,832 bytes maximum slop
               5 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0       331 colls,   331 par    0.05s    0.02s     0.0001s    0.0003s
  Gen  1        21 colls,    20 par    0.01s    0.00s     0.0001s    0.0004s

  Parallel GC work balance: 30.21% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.41s  (  0.27s elapsed)
  GC      time    0.06s  (  0.02s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    0.47s  (  0.29s elapsed)

私の環境ではfind_parとくらべてそれほど差が出なかった。載せた結果の経過時間は一致している。何度も実行してだいたいの平均を比べてもほぼ同じくらいの経過時間だった。こちらもファイル数が少なすぎたのかもしれない。

threadscopeの結果は以下。
f:id:y-kamiya:20150314182005p:plain
水色の線(スレッドの生成)が減っているのがわかる。ただ、問題はGC以外のタイミングでもスレッドの実行が止まっているときがある点である。これはセマフォのread, writeで競合してブロックされるスレッドが存在するためと考えられる。

よってNBSemを改善したい。ブロックしてしまうのが原因であればSTMを使えば回避できるが今回はIORefを使う。

atomicModifyIORef :: IORef a -> (a -> (a, b)) -> IO b

atomicModifyIORefはIORef内の値を取り出して指定した関数を適用する。関数は元々入っていた値と新しく入れた値のtupleを返すもの。最終的には新しく入れた値がIOとして返ってくる。

atomicModifyIORefは処理できるトランザクションが一つのSTMと考えることができ、今回の場合はカウントの値を増減するのみの処理であるためSTMを使うよりもオーバヘッドが小さくできる。

{-# LANGUAGE BangPatterns #-}
import Data.IORef

newtype NBSem = NBSem (IORef Int)     
                                      
newNBSem :: Int -> IO NBSem           
newNBSem i = do                       
  m <- newIORef i                     
  return $ NBSem m                    
                                      
tryWaitNBSem :: NBSem -> IO Bool      
tryWaitNBSem (NBSem m) = do           
  atomicModifyIORef m $ \i ->         
    if i == 0                         
       then (i, False)                
       else let !z = i-1 in (z, True) 
                                      
signalNBSem :: NBSem -> IO ()         
signalNBSem (NBSem m) = do            
  atomicModifyIORef m $ \i ->         
    let !z = i+1 in (z,())            

ポイントはBangPatternを使って!z = i-1とすることで計算されずに保持されるサンクをなくすようにしている点。

main関数は先ほどと同じでも問題ないが、コア数と取得してその4倍の値をセマフォに設定している。

main = do
  [s,d] <- getArgs
  n <- getNumCapabilities
  sem <- newNBSem (if n == 1 then 0 else n * 4)
  find sem s d >>= print

実行結果は以下。

 ./dist/build/find/find find_par_sem_ioref aaaaa ~ +RTS -s -N4 -l                                                                                                                         
Nothing
     535,757,128 bytes allocated in the heap
      30,604,776 bytes copied during GC
       1,491,200 bytes maximum residency (23 sample(s))
          95,552 bytes maximum slop
               6 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0       305 colls,   305 par    0.05s    0.02s     0.0001s    0.0003s
  Gen  1        23 colls,    22 par    0.01s    0.00s     0.0002s    0.0003s

  Parallel GC work balance: 43.12% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.42s  (  0.23s elapsed)
  GC      time    0.07s  (  0.02s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    0.48s  (  0.25s elapsed)

find_parと比べて若干高速化できた。

ParIOを使ったバージョン

第4章ではParモナドを使って並列処理を行った。今回やっている内容だと並列にしたい処理中にIOが入っている。この場合、ParIOを使うことでParと同じ操作ができる。操作として異なる点はParIOの実行にはrunParIOを用いるということだけ。

処理の性質としては中でIOを実行しているので非決定性を持つことになる。(実行毎に結果が違う可能性あり)

ParIOを用いることでsubfind, find, mainは以下のように書ける。

subfind :: String -> FilePath                                 
        -> ([IVar (Maybe FilePath)] -> ParIO (Maybe FilePath))
        ->  [IVar (Maybe FilePath)] -> ParIO (Maybe FilePath) 
subfind filename dir inner ivars = do                         
  isdir <- liftIO $ doesDirectoryExist dir                    
  if not isdir                                                
    then inner ivars                                          
    else do                                                   
      v <- new                                                
      fork $ find filename dir >>= put v                      
      inner $ v : ivars                                       
                                                              
find :: String -> FilePath -> ParIO (Maybe FilePath)          
find filename dir = do                                        
  fs <- liftIO $ getDirectoryContents dir                     
  let fs' = sort $ filter (`notElem` [".", ".."]) fs          
  if any (== filename) fs'                                    
    then return $ Just $ dir </> filename                     
    else do                                                   
      let ps = map (dir </>) fs'                              
      foldr (subfind filename) dowait ps []                   
    where                                                     
      dowait vs = loop $ reverse vs                           
                                                              
      loop [] = return Nothing                                
      loop (v:vs) = do                                        
        r <- get v                                            
        case r of                                             
          Nothing -> loop vs                                  
          Just p -> return $ Just p                           

main = do
  [s,d] <- getArgs
  runParIO (find s d) >>= print

findの変更点は値を取り出す際にwaitではなくgetを使うようにしたことだけ。

subfindではまず型を以下のように変更。

  • MVar => IVar
  • IO => ParIO

関数内の処理としては、newで新たにIVarを生成してforkした上でfindを実行する(forkIOとは違ってスレッドが立ち上がるわけではない)。findの結果は生成したIVarに格納される。

mainではrunParIOによってParIOの処理を実行する。

実行結果。

./dist/build/find/find find_par_io aaaaa ~ +RTS -s -N4 -l                                                                                                                                
Nothing
     543,403,528 bytes allocated in the heap
      34,353,688 bytes copied during GC
       1,120,752 bytes maximum residency (14 sample(s))
         103,512 bytes maximum slop
               6 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0       348 colls,   348 par    0.07s    0.03s     0.0001s    0.0008s
  Gen  1        14 colls,    13 par    0.02s    0.01s     0.0004s    0.0010s

  Parallel GC work balance: 23.24% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.40s  (  0.21s elapsed)
  GC      time    0.09s  (  0.03s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    0.49s  (  0.25s elapsed)

私の環境ではIORefを用いたバージョンとほぼ同じ結果となった。平均するとこちらの方が若干パフォーマンスがよかったかもという感じ。

パフォーマンスがよい理由としては、まず共有状態のロックを取ったり確認したりという処理がなくなったこと。また、Parの並列処理の実行は効率よくなるようにうまくスケジューリングされていることが挙げられる。

ただし、上のプログラムはエラー時に処理が止まるようになっていない。これについては読者への宿題ということらしい。

ちなみにParIOを用いたバージョンのthreadscopeはこちらのよう。
f:id:y-kamiya:20150314181611p:plain
スレッドの生成を示す水色のバーはなくなっている。

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング