Eval monadと並列実行(Parallel and Concurrent Programming in Haskell Chapter 2)

こちらの記事のつづき
http://jsapachehtml.hatenablog.com/entry/2015/01/03/101754

もともとの本の内容はこちらで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch02.html
また、このページのコードは↑から引用した
コードの全体は著者が公開しているsampleにある
https://github.com/simonmar/parconc-examples

並列性を表すにはEvalモナドを使う
Evalモナドは2つの操作rseq、rparを持っている
rseqはシーケンシャルな実行を表し、rparは並列での実行を表す
また、Evalモナドから値を取り出すにはrunEvalが使える

rseq, rparを組み合わせて使った場合の処理のタイミング

1.

runEval $ do
  a <- rpar (f x)
  b <- rpar (f y)
  return (a,b)

a, bどちらの値の取得にもrparを使っているため、
f xやf yの実行完了を待たずにすぐreturnが実行される

2.

runEval $ do
  a <- rpar (f x)
  b <- rseq (f y)
  return (a,b)

f x, f yの処理は並列に実行されるが、returnが実行されるのはf yの実行完了後になる

3.

runEval $ do
  a <- rpar (f x)
  b <- rseq (f y)
  rseq a
  return (a,b)

このように返ってきた値に対して再度rseqを適用することで計算完了を待つことができる
この場合、f x, f yどちらの実行も完了してからreturnが実行される

4.

runEval $ do
  a <- rpar (f x)
  b <- rpar (f y)
  rseq a
  rseq b
  return (a,b)

3.と同じ結果になるがこちらの方が対称性があってよい

2.の使い方をすることはあまりない
どちらの処理が早く終わるかプログラム時にわかることはあまりないためである

1.についてはひとまず処理を開始しておきたいときに使うかも
3. 4.は同じ意味だが4.の方が対称性があってわかりやすいためこちらを使うとよい

並列実行するためのプログラムは--threadedのオプションを付けてコンパイルする必要がある
また、実行時に使うべきcpuの数をランタイムオプションで指定する
ソースをrpar.hsとすると以下のようにする

$ ghc -O2 rpar.hs -threaded
$ ./rpar.hs +RTS -N2

数独ソルバによる並列化の具体例

著者のsampleがこちらにあるのでコード全体についてはこちらで参照
https://github.com/simonmar/parconc-examples

この中のsudoku.hsの処理を並列化するという例
まずはシーケンシャルな場合を考える

import Sudoku
import Control.Exception
import System.Environment
import Data.Maybe

main :: IO ()
main = do
  [f] <- getArgs                           
  file <- readFile f                       

  let puzzles   = lines file               
      solutions = map solve puzzles        

  print (length (filter isJust solutions)) 

まず渡された名前のファイルから問題を読み込む
数独の問題自体は以下のように表現されている

# sudoku17_1000.txt
.......2143.......6........2.15..........637...........68...4.....23........7....
.......241..8.............3...4..5..7.....1......3.......51.6....2....5..3...7...

ドットは空白のマスを表しており、すべてのマス(81マス)の状態がstringで書かれている

読み込んだ文字列を改行区切りでリストにしてpuzzlesに束縛後
それをmapして1つずつsolveしている

solveは以下の型

solve :: String -> Maybe Grid

解が存在しなければNothingが、存在すれば見つけた解をJustで返す

最後に解が存在する問題の数をprintして終了

一つ注意点としては
isJustを入れて解が存在したかどうかを判定する必要があるということ
それをしなかった場合、
puzzlesの数のみを数えればよくなるためsolveが実行されない
これはhaskellの遅延評価という性質のためである

実行時間などの解析結果付きで実行するには以下のようにオプションを付ける

$ ghc -O2 sudoku1.hs -rtsopts
$ ./sudoku1.hs sudoku17.1000.txt +RTS -s

コンパイル時のオプションの意味

  • -O2:生成コードの最適化
  • -rtsopts:実行時に+RTSでランタイムオプションを渡せるよう指定

実行時のオプションの意味

  • +RTS:これ以降のオプションをランタイムオプションとして認識させる

  
自分の端末で実行してみた結果はこんな感じ

$ ./sudoku1 sudoku17.1000.txt +RTS -s
./sudoku1 sudoku17.1000.txt +RTS -s                                                                                                                                                        
1000
   2,328,560,200 bytes allocated in the heap
      36,505,136 bytes copied during GC
         241,256 bytes maximum residency (14 sample(s))
          83,480 bytes maximum slop
               2 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0      4506 colls,     0 par    0.11s    0.11s     0.0000s    0.0004s
  Gen  1        14 colls,     0 par    0.00s    0.00s     0.0003s    0.0005s

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    2.42s  (  2.46s elapsed)
  GC      time    0.11s  (  0.12s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    2.53s  (  2.57s elapsed)

  %GC     time       4.4%  (4.6% elapsed)

  Alloc rate    963,866,675 bytes per MUT second

  Productivity  95.6% of total user, 93.9% of total elapsed

今回は並列化することによる実行時間の短縮を目指しているのでTotal timeを見ていく

次にrparを使って並列化してみる
問題の全体を2つに分けてそれぞれrparにかけて並列に処理する

main :: IO ()
main = do
  [f] <- getArgs
  file <- readFile f

  let puzzles = lines file

      (as,bs) = splitAt (length puzzles `div` 2) puzzles 

      solutions = runEval $ do
                    as' <- rpar (force (map solve as))   
                    bs' <- rpar (force (map solve bs))   
                    rseq as'                             
                    rseq bs'                             
                    return (as' ++ bs')                  

  print (length (filter isJust solutions))

シーケンシャルの場合と変わったのはsolutionsを求める部分とその前に問題を2つに分けたこと
rparの中でforceという関数を使っているが
これは引数をnormal formとなるまで評価する関数である

これを入れなかった場合、rparの時点ではMaybe Gridの形まで評価されず、
最終的な出力でisJustを行ったときに評価されるため必要な処理を並列化できない

実行結果はこちら

./sudoku-sample sudoku17.1000.txt +RTS -N2 -s                                                                                                                                            
1000
   2,336,626,216 bytes allocated in the heap
      51,306,536 bytes copied during GC
       2,716,936 bytes maximum residency (8 sample(s))
         305,592 bytes maximum slop
               9 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0      2986 colls,  2986 par    0.86s    0.19s     0.0001s    0.0058s
  Gen  1         8 colls,     7 par    0.03s    0.02s     0.0024s    0.0042s

  Parallel GC work balance: 51.78% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)

  SPARKS: 2 (1 converted, 0 overflowed, 0 dud, 0 GC'd, 1 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    2.17s  (  1.87s elapsed)
  GC      time    0.89s  (  0.21s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    3.06s  (  2.08s elapsed)

  Alloc rate    1,078,980,034 bytes per MUT second

  Productivity  70.8% of total user, 104.0% of total elapsed

gc_alloc_block_sync: 11250
whitehole_spin: 0
gen[0].sync: 0
gen[1].sync: 397

経過時間を比較すると2.57/2.08 = 1.24倍の早くなった
本のサンプルよりは短縮時間が少ないが実行環境などによると思われる

ちなみにTotal timeの3.06sはCPUの動作時間の総合計
つまり2coreでそれぞれのcpuが1sずつ処理すれば2sという出力になる
スピードアップの判定は経過時間で測るのでelapsedという方の時間を使う

使うコアを2つにしたのだから理想的には2倍となっても良さそうだが、 そうなっていない
その理由を調べるためthreadscopeを使う
threadscopeは各CPUがある時間帯にどのような処理をしていたかという情報を
グラフィカルに確認するためのツールである

mac, windowsならばこちらのページからバイナリインストールできる
https://www.haskell.org/haskellwiki/ThreadScope#Binary_releases

threadscopeが解析できる情報を取得するには

  • コンパイル時のオプションに-eventlog
  • 実行時のオプションに-l

を加える

$ ghc -O2 sudoku2.hs -threaded -rtsopts -eventlog
$ ./sudoku2 sudoku17.1000.txt +RTS -N2 -l

ちなみにsudoku2.hsはsudoku1.hsを並列化したもので著者のサンプル内にある

threadscopeの出力を見ると片方のCPUの処理が他方より早く終了してしまっていることがわかる
splitAtによって2つに分けた問題をそれぞれrparで実行したためタスクの大きさのバランスが取れていない

rparによって生成される並列に実行されるタスクをsparkと呼ぶが、
ghcでは生成されたsparkを空いているCPUに割り当てていく仕組みが存在する
そのため、タスクの粒度が小さなsparkをたくさん生成して処理させた方がCPU間のバランスは取りやすいと考えられる

これを実現するためにparMapという関数を定義する

parMap :: (a -> b) -> [a] -> Eval [b]
parMap f [] = return []
parMap f (a:as) = do
   b <- rpar (f a)
   bs <- parMap f as
   return (b:bs)

リストの各要素にfを適用するが、rparも適用することによって各処理を並列に行えるようにする
parMapが実行されると要素一つに対し1つのsparkが生成されたことになる

これを使ってsudoku2.hsを書き換えたのがsudoku3.hsである
mainは以下のようになる

main :: IO ()
main = do
  [f] <- getArgs
  file <- readFile f

  let puzzles   = lines file
      solutions = runEval (parMap solve puzzles)

  print (length (filter isJust solutions))

自分で問題を分割する必要がなくなりすっきりした

自分の環境での実行結果

./sudoku-sample sudoku17.1000.txt +RTS -N2 -s                                                                                                                                            
1000
   2,351,383,120 bytes allocated in the heap
      62,853,544 bytes copied during GC
       2,263,232 bytes maximum residency (27 sample(s))
         105,496 bytes maximum slop
               8 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0      2483 colls,  2483 par    1.07s    0.22s     0.0001s    0.0111s
  Gen  1        27 colls,    26 par    0.07s    0.04s     0.0015s    0.0077s

  Parallel GC work balance: 67.70% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)

  SPARKS: 1000 (1000 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    1.92s  (  1.50s elapsed)
  GC      time    1.14s  (  0.27s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    3.06s  (  1.76s elapsed)

  Alloc rate    1,227,582,067 bytes per MUT second

  Productivity  62.6% of total user, 108.7% of total elapsed

gc_alloc_block_sync: 3302
whitehole_spin: 0
gen[0].sync: 0
gen[1].sync: 0

シーケンシャルなバージョンと比べると2.57/1.76 = 1.46倍早くなった
また、上の結果のSPARKSという欄を見ると1000となっており、sparkが1000個(問題の数)分生成されたことがわかる

SPARKS: 1000 (1000 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

SPARKSの欄の見方は次のよう

  • converted: 生成されたspark数
  • overflowed: spark poolには上限値が設定されているためそれを超えた分は生成されず、ここに計上される
  • dud: 既に存在するsparkと同じ処理だった場合ここに計上され、rparは実行されない
  • GC'd: プログラム内で不必要な処理にrparを適用した場合ここに計上される
  • fizzled: sparkとして生成された際は評価されなかったものの、後に独立して評価された場合ここに計上される

fizzledに関してはどういう状況かあまりよくわかっていないが
並列処理がうまくいっていないという意味ではある
sudoku2.hsの実行結果ではfizzledが1となっているので
他の並列処理が終わった後に実行されたsparkの数ということだろうか

sudoku3.hsの実行結果をthreadscopeで見ると
最初の16msほどは並列処理されていないことがわかる
これは入力ファイルから問題を読み出す処理の部分であり、
この部分は並列化できない

並列化によってどの程度までスピードアップできるかについてはi
アムダールの法則によって理論値を算出することができる

1 / ((1 - P) + P / N)
  • Pは全体に占める並列化した部分の割合
  • NはCPUの数

を表しており
分子が元の計算時間、分母が並列化によって改良した後の計算時間で
改良後の計算時間は
並列化できない部分の時間+並列化できる部分の計算時間 / コア数
と表せるため上記の式になる