読者です 読者をやめる 読者になる 読者になる

RSAアルゴリズムを並列化(Parallel and Concurrent Programming in Haskell Chapter 3 and 4 example)

以前まとめた第3章、第4章についてはこちら
http://jsapachehtml.hatenablog.com/entry/2015/01/24/131408
http://jsapachehtml.hatenablog.com/entry/2015/01/31/221609

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch04.html
図や式についてはほぼ引用した

3章では遅延評価の性質を利用してデータ毎に並列化を行う
4章ではParモナドを利用し独自に型を定義することでパイプラインを実現する
この場合は処理の段階毎に並列化を行うことになる

それぞれメリット・デメリットは原文の第4章の初めの方に書かれている
このブログでも第4章の記事の最初〜真ん中くらいにかけてまとめた

parListを用いたストリーム処理の例(第3章)

rsaを題材としてストリーム処理を並列化するという具体例
こんな感じで使える

$ echo "Hello World!" | ./rsa encrypt - | ./rsa decrypt -
Hello World!

第一引数に暗号化か復号化を示す文字列、第2引数に入力ファイル名(-なら標準入力)

このプログラムは入力を一度にすべて読みだして暗号化・復号化を行うわけではなく
先頭から順に処理を行う
それの証拠に以下のような実行結果になる

$ ./rsa encrypt /usr/share/dict/words >/dev/null +RTS -s
   8,040,128,392 bytes allocated in the heap
      66,756,936 bytes copied during GC
         186,992 bytes maximum residency (71 sample(s))
          36,584 bytes maximum slop
               2 MB total memory in use (0 MB lost due to fragmentation)

/usr/share/dict/wordsは1MBほどのファイルだが最大消費メモリは187KBくらいである

暗号化を行う処理は以下のよう

encrypt :: Integer -> Integer -> ByteString -> ByteString
encrypt n e = B.unlines
            . map (B.pack . show . power e n . code)
            . chunk (size n)

nとeは暗号化に用いる数である
rsaアルゴリズムについてはwikipediaに詳しく書いてある
sizeはnに応じて分割すべきデータの大きさを求める
chunkでその大きさにデータを分割
mapの行で実際に暗号化を行っている
codeでByteStringをIntegerに変換してpowerでそれを暗号化
そして再度ByteStringにする
最後にunlinesで一つのByteStringとする

これをparListで並列化すると以下のように書ける

encrypt n e = B.unlines
            . withStrategy (parList rdeepseq)
            . map (B.pack . show . power e n . code)
            . chunk (size n)

単にwithStrategyの行を追加しただけであるがこれでOK
chunkによって分けたデータのまとまりをそれぞれ並列に暗号化してくれる

rsaのプログラムは面白そうだったのでsampleを見つつ自分でも作ってみた
実行結果はこちら
まずシーケンシャルな方

./dist/build/rsa/rsa enc_seq /usr/share/dict/words > /dev/null +RTS -s                                                                                                                                            
   2,462,899,904 bytes allocated in the heap
     137,860,016 bytes copied during GC
       5,016,704 bytes maximum residency (8 sample(s))
       2,154,184 bytes maximum slop
              18 MB total memory in use (1 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0      4749 colls,     0 par    0.28s    0.29s     0.0001s    0.0002s
  Gen  1         8 colls,     0 par    0.00s    0.00s     0.0006s    0.0008s

  TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    2.51s  (  2.56s elapsed)
  GC      time    0.28s  (  0.29s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    2.80s  (  2.86s elapsed)

  Alloc rate    980,114,723 bytes per MUT second

  Productivity  90.0% of total user, 88.1% of total elapsed

gc_alloc_block_sync: 0
whitehole_spin: 0
gen[0].sync: 0
gen[1].sync: 0

次にパラレル

./dist/build/rsa/rsa enc /usr/share/dict/words > /dev/null +RTS -s -N2                                                                                                                                            
   2,471,938,448 bytes allocated in the heap
     154,972,656 bytes copied during GC
       5,228,648 bytes maximum residency (10 sample(s))
       2,190,400 bytes maximum slop
              20 MB total memory in use (2 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0      2533 colls,  2533 par    1.72s    0.30s     0.0001s    0.0020s
  Gen  1        10 colls,     9 par    0.02s    0.01s     0.0012s    0.0017s

  Parallel GC work balance: 52.00% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)

  SPARKS: 2494 (2494 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    1.61s  (  1.44s elapsed)
  GC      time    1.75s  (  0.31s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    3.36s  (  1.75s elapsed)

  Alloc rate    1,537,054,346 bytes per MUT second

  Productivity  48.0% of total user, 91.8% of total elapsed

gc_alloc_block_sync: 3357
whitehole_spin: 0
gen[0].sync: 15
gen[1].sync: 6

2.86s -> 1.75sに短縮できた
ただ、消費メモリについては
シーケンシャルなプログラムでもパラレルなプログラムでも5MBほど食っていた
著者のサンプルとは違って1000文字ずつ固定でデータを区切っているためだと思われる
ちなみに私の環境ではwordsファイルは2.3MB程度だった

原著の流れだとシーケンシャルなプログラムよりパラレルなプログラムの方が
メモリ消費が大きくなるという問題が生じる
それを解決するためにparBufferという関数をparListの代わりに用いる

parBuffer nで作成するスパークの数をnに制限できるので
それによってメモリ消費も一定以下に抑えられる


パイプラインでの並列化(第4章)

パイプラインのような処理にして各工程を並列化する例

題材は第3章でも出てきたRSA暗号アルゴリズム
第3章ではchunkとして小分けにしたデータを一つの単位として
データ毎に並列化した

今回はそうではなく、処理の段階をいくつかのステップに分けて
その各ステップが独立して処理されるようにすることで
ステップ毎に並列化する

haskellのリストを使った処理は遅延評価の性質を使って
パイプラインのような形に元々なっているが
パイプラインを独自に定義することで処理の粒度の制御がしやくすくなったり
ストリーム内で状態を扱ったりできる

パイプラインの各ステップとして以下のような概念を用いる
ストリームを作る役がProducer
ストリーム内のデータを加工し、ストリームとして次に渡すのがMapper
ストリームのデータを最終的に使うのがConsumer


ストリームを表すデータ型を定義

data IList a = Nil | Cons a (IVar (IList a))

type Stream a = IVar (IList a)

構造としてはリストと同じだが
連結されたデータの後ろがIVarに包まれた自分自身となっている

以下でストリームを扱うための関数を定義
まずはProducer

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
  var <- new
  fork $ loop xs var
  return var
  where
    loop [] var = put var Nil
    loop (x:xs) var = do
      tail <- new
      put var (Cons x tail)
      loop xs tail

型変数aがNFDataに制限されているのはputを使っているため
最初意味がわかるまで時間がかかったが、やはりIVarを箱と考えるとわかりやすい
箱=将来的にstream型のデータが入ることになるもの
である

メインスレッドはnewで箱を作って返すだけ
箱はloopに渡しておいて実際にstream型のデータを作る処理はそちらで行う

loop内ではまた箱(tail)を作る
先頭の要素(x)と作った箱をConsでつなげてstream型を作り、putで上から渡されてきた箱に入れる
そして残りの要素(xs)と新しく作った箱をloopに渡して再帰的に処理

つまり箱の中にさらに箱が入り、さらに...という再帰的な構造になっている
loopがまわるにつれて後ろの要素がstreamに変換されてつながっていく

次にConsumer

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = do
  ilst <- get instrm
  case ilst of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t

名前の通りだがfoldlのStream版という形
渡されたStream(つまりIVar)から中身を取り出してNilかConsかで場合分け
NilならStreamの終わりなのでそこまでにfoldしたデータをParとして返す
Consなら先頭の要素にfnを適用し、残りをstreamFoldで再帰的に処理

最後にMapper
これはProducerとConsumerを合わせた感じ

streamMap :: NFData b => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn instrm = do
  outstrm <- new
  fork $ loop instrm outstrm
  return outstrm
  where
    loop instrm outstrm = do
      ilst <- get instrm
      case ilst of
        Nil -> put outstrm Nil
        Cons h t -> do
          newtl <- new
          put outstrm $ Cons (fn h) newtl
          loop t newtl

まず出力となるStreamを作ってloopに渡し、そのStreamを返す
メインスレッドはこれだけ
forkしたスレッド側で実際の変換処理を行う
まず入力から中身を取り出して場合わけ
NilならStreamの終わりなのでNilを入れて返す
Consなら先頭の要素にfnを適用し、後ろに新しいStreamをくっつける
新しいStreamはさらにloopに渡して再帰的に処理

以上の関数を使って第3章で作成したrsaのプログラムを並列化すると以下のようにできる

encrypt :: Integer -> Integer -> Stream ByteString -> Par (Stream ByteString)
encrypt n e s = streamMap (B.pack . show . power e n . code) s

decrypt :: Integer -> Integer -> Stream ByteString -> Par (Stream ByteString)
decrypt n d s = streamMap (B.pack . decode . power d n . integer) s

pipeline :: Integer -> Integer -> Integer -> ByteString -> ByteString
pipeline n e d b = runPar $ do
  s0 <- streamFromList (chunk (size n) b)
  s1 <- encrypt n e s0
  s2 <- decrypt n d s1
  xs <- streamFold (\x y -> (y : x)) [] s2
  return (B.unlines (reverse xs))

このpipelineは暗号化してすぐに元に戻しているので全く意味はないが
並列化のスピードアップを見るためなので気にしない


実際に手元で動かしてみた結果は以下のようになった
まずシーケンシャルな方

$ ./dist/build/rsa_pipe/rsa_pipe encdec_seq /usr/share/dict/words >/dev/null +RTS -s -N2 -l                                                                                                                         
   4,905,882,584 bytes allocated in the heap
     280,130,872 bytes copied during GC
       5,096,136 bytes maximum residency (15 sample(s))
       2,164,560 bytes maximum slop
              19 MB total memory in use (1 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0      9463 colls,  9463 par    0.78s    0.83s     0.0001s    0.0079s
  Gen  1        15 colls,    14 par    0.01s    0.01s     0.0006s    0.0014s

  Parallel GC work balance: 0.81% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.05s elapsed)
  MUT     time    6.22s  (  6.04s elapsed)
  GC      time    0.80s  (  0.84s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    7.02s  (  6.93s elapsed)

  Alloc rate    788,386,398 bytes per MUT second

  Productivity  88.6% of total user, 89.8% of total elapsed

gc_alloc_block_sync: 3272
whitehole_spin: 0
gen[0].sync: 0
gen[1].sync: 19

次にパラレルな方

$ ./dist/build/rsa_pipe/rsa_pipe pipeline /usr/share/dict/words >/dev/null +RTS -s -N2 -l                                                                                                                           
   4,903,237,784 bytes allocated in the heap
     296,976,632 bytes copied during GC
       5,399,688 bytes maximum residency (19 sample(s))
       2,303,984 bytes maximum slop
              21 MB total memory in use (3 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0      5092 colls,  5092 par    3.18s    0.56s     0.0001s    0.0037s
  Gen  1        19 colls,    18 par    0.04s    0.02s     0.0011s    0.0021s

  Parallel GC work balance: 55.59% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    3.73s  (  3.13s elapsed)
  GC      time    3.22s  (  0.58s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    6.95s  (  3.71s elapsed)

  Alloc rate    1,315,747,356 bytes per MUT second

  Productivity  53.6% of total user, 100.5% of total elapsed

gc_alloc_block_sync: 4837
whitehole_spin: 0
gen[0].sync: 158
gen[1].sync: 12

6.93s -> 3.71sに短縮した

両者の方法のまとめ

parListを使った並列化では
やはり元のアルゴリズムを表すコードと並列化のためのコードが明確に分けられていることでコードが書きやすいと感じた

元のコードの形にもよるが、一度シーケンシャルなコードを書いた上で並列化のための関数を足すだけでよいのは簡単でよい

上の例ではmapの処理を並列化するためにparMapを使うだけである
ただし、どこでNFまで評価されるべきかということを考えつつrdeepseqなどを用いる必要があるため、あまり気にせずにプログラムを書くと並列化したつもりができていないということになる恐れがある

Parモナドを利用した並列化ではパイプラインになっていることを型によって明確に表せていることがよいと思う

また、各処理の段階毎に並列化されるのでどれが並列に処理されるのかわかりやすい
ただ、データ構造から自分で定義することになるため並列化のために書くべきコードというのは多くなると思われる

既存のコードを並列化するという目的でかつmapを使った部分の処理量が多いというのであればparListを用いた方法で並列化すればコストがかからず良さそう

いくつかの独立な処理がありそれらを並列化するという用途であればParモナドを用いるというのがやりやすいのかもしれない

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング