RSAアルゴリズムを並列化(Parallel and Concurrent Programming in Haskell Chapter 3 and 4 example)
以前まとめた第3章、第4章についてはこちら
http://jsapachehtml.hatenablog.com/entry/2015/01/24/131408
http://jsapachehtml.hatenablog.com/entry/2015/01/31/221609
英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch04.html
図や式についてはほぼ引用した
3章では遅延評価の性質を利用してデータ毎に並列化を行う
4章ではParモナドを利用し独自に型を定義することでパイプラインを実現する
この場合は処理の段階毎に並列化を行うことになる
それぞれメリット・デメリットは原文の第4章の初めの方に書かれている
このブログでも第4章の記事の最初〜真ん中くらいにかけてまとめた
parListを用いたストリーム処理の例(第3章)
rsaを題材としてストリーム処理を並列化するという具体例
こんな感じで使える
$ echo "Hello World!" | ./rsa encrypt - | ./rsa decrypt - Hello World!
第一引数に暗号化か復号化を示す文字列、第2引数に入力ファイル名(-なら標準入力)
このプログラムは入力を一度にすべて読みだして暗号化・復号化を行うわけではなく
先頭から順に処理を行う
それの証拠に以下のような実行結果になる
$ ./rsa encrypt /usr/share/dict/words >/dev/null +RTS -s 8,040,128,392 bytes allocated in the heap 66,756,936 bytes copied during GC 186,992 bytes maximum residency (71 sample(s)) 36,584 bytes maximum slop 2 MB total memory in use (0 MB lost due to fragmentation)
/usr/share/dict/wordsは1MBほどのファイルだが最大消費メモリは187KBくらいである
暗号化を行う処理は以下のよう
encrypt :: Integer -> Integer -> ByteString -> ByteString encrypt n e = B.unlines . map (B.pack . show . power e n . code) . chunk (size n)
nとeは暗号化に用いる数である
rsaのアルゴリズムについてはwikipediaに詳しく書いてある
sizeはnに応じて分割すべきデータの大きさを求める
chunkでその大きさにデータを分割
mapの行で実際に暗号化を行っている
codeでByteStringをIntegerに変換してpowerでそれを暗号化
そして再度ByteStringにする
最後にunlinesで一つのByteStringとする
これをparListで並列化すると以下のように書ける
encrypt n e = B.unlines . withStrategy (parList rdeepseq) . map (B.pack . show . power e n . code) . chunk (size n)
単にwithStrategyの行を追加しただけであるがこれでOK
chunkによって分けたデータのまとまりをそれぞれ並列に暗号化してくれる
rsaのプログラムは面白そうだったのでsampleを見つつ自分でも作ってみた
実行結果はこちら
まずシーケンシャルな方
./dist/build/rsa/rsa enc_seq /usr/share/dict/words > /dev/null +RTS -s 2,462,899,904 bytes allocated in the heap 137,860,016 bytes copied during GC 5,016,704 bytes maximum residency (8 sample(s)) 2,154,184 bytes maximum slop 18 MB total memory in use (1 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 4749 colls, 0 par 0.28s 0.29s 0.0001s 0.0002s Gen 1 8 colls, 0 par 0.00s 0.00s 0.0006s 0.0008s TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 2.51s ( 2.56s elapsed) GC time 0.28s ( 0.29s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 2.80s ( 2.86s elapsed) Alloc rate 980,114,723 bytes per MUT second Productivity 90.0% of total user, 88.1% of total elapsed gc_alloc_block_sync: 0 whitehole_spin: 0 gen[0].sync: 0 gen[1].sync: 0
次にパラレル
./dist/build/rsa/rsa enc /usr/share/dict/words > /dev/null +RTS -s -N2 2,471,938,448 bytes allocated in the heap 154,972,656 bytes copied during GC 5,228,648 bytes maximum residency (10 sample(s)) 2,190,400 bytes maximum slop 20 MB total memory in use (2 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 2533 colls, 2533 par 1.72s 0.30s 0.0001s 0.0020s Gen 1 10 colls, 9 par 0.02s 0.01s 0.0012s 0.0017s Parallel GC work balance: 52.00% (serial 0%, perfect 100%) TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2) SPARKS: 2494 (2494 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 1.61s ( 1.44s elapsed) GC time 1.75s ( 0.31s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 3.36s ( 1.75s elapsed) Alloc rate 1,537,054,346 bytes per MUT second Productivity 48.0% of total user, 91.8% of total elapsed gc_alloc_block_sync: 3357 whitehole_spin: 0 gen[0].sync: 15 gen[1].sync: 6
2.86s -> 1.75sに短縮できた
ただ、消費メモリについては
シーケンシャルなプログラムでもパラレルなプログラムでも5MBほど食っていた
著者のサンプルとは違って1000文字ずつ固定でデータを区切っているためだと思われる
ちなみに私の環境ではwordsファイルは2.3MB程度だった
原著の流れだとシーケンシャルなプログラムよりパラレルなプログラムの方が
メモリ消費が大きくなるという問題が生じる
それを解決するためにparBufferという関数をparListの代わりに用いる
parBuffer nで作成するスパークの数をnに制限できるので
それによってメモリ消費も一定以下に抑えられる
パイプラインでの並列化(第4章)
パイプラインのような処理にして各工程を並列化する例
題材は第3章でも出てきたRSA暗号のアルゴリズム
第3章ではchunkとして小分けにしたデータを一つの単位として
データ毎に並列化した
今回はそうではなく、処理の段階をいくつかのステップに分けて
その各ステップが独立して処理されるようにすることで
ステップ毎に並列化する
haskellのリストを使った処理は遅延評価の性質を使って
パイプラインのような形に元々なっているが
パイプラインを独自に定義することで処理の粒度の制御がしやくすくなったり
ストリーム内で状態を扱ったりできる
パイプラインの各ステップとして以下のような概念を用いる
ストリームを作る役がProducer
ストリーム内のデータを加工し、ストリームとして次に渡すのがMapper
ストリームのデータを最終的に使うのがConsumer
ストリームを表すデータ型を定義
data IList a = Nil | Cons a (IVar (IList a)) type Stream a = IVar (IList a)
構造としてはリストと同じだが
連結されたデータの後ろがIVarに包まれた自分自身となっている
以下でストリームを扱うための関数を定義
まずはProducer
streamFromList :: NFData a => [a] -> Par (Stream a) streamFromList xs = do var <- new fork $ loop xs var return var where loop [] var = put var Nil loop (x:xs) var = do tail <- new put var (Cons x tail) loop xs tail
型変数aがNFDataに制限されているのはputを使っているため
最初意味がわかるまで時間がかかったが、やはりIVarを箱と考えるとわかりやすい
箱=将来的にstream型のデータが入ることになるもの
である
メインスレッドはnewで箱を作って返すだけ
箱はloopに渡しておいて実際にstream型のデータを作る処理はそちらで行う
loop内ではまた箱(tail)を作る
先頭の要素(x)と作った箱をConsでつなげてstream型を作り、putで上から渡されてきた箱に入れる
そして残りの要素(xs)と新しく作った箱をloopに渡して再帰的に処理
つまり箱の中にさらに箱が入り、さらに...という再帰的な構造になっている
loopがまわるにつれて後ろの要素がstreamに変換されてつながっていく
次にConsumer
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a streamFold fn !acc instrm = do ilst <- get instrm case ilst of Nil -> return acc Cons h t -> streamFold fn (fn acc h) t
名前の通りだがfoldlのStream版という形
渡されたStream(つまりIVar)から中身を取り出してNilかConsかで場合分け
NilならStreamの終わりなのでそこまでにfoldしたデータをParとして返す
Consなら先頭の要素にfnを適用し、残りをstreamFoldで再帰的に処理
最後にMapper
これはProducerとConsumerを合わせた感じ
streamMap :: NFData b => (a -> b) -> Stream a -> Par (Stream b) streamMap fn instrm = do outstrm <- new fork $ loop instrm outstrm return outstrm where loop instrm outstrm = do ilst <- get instrm case ilst of Nil -> put outstrm Nil Cons h t -> do newtl <- new put outstrm $ Cons (fn h) newtl loop t newtl
まず出力となるStreamを作ってloopに渡し、そのStreamを返す
メインスレッドはこれだけ
forkしたスレッド側で実際の変換処理を行う
まず入力から中身を取り出して場合わけ
NilならStreamの終わりなのでNilを入れて返す
Consなら先頭の要素にfnを適用し、後ろに新しいStreamをくっつける
新しいStreamはさらにloopに渡して再帰的に処理
以上の関数を使って第3章で作成したrsaのプログラムを並列化すると以下のようにできる
encrypt :: Integer -> Integer -> Stream ByteString -> Par (Stream ByteString) encrypt n e s = streamMap (B.pack . show . power e n . code) s decrypt :: Integer -> Integer -> Stream ByteString -> Par (Stream ByteString) decrypt n d s = streamMap (B.pack . decode . power d n . integer) s pipeline :: Integer -> Integer -> Integer -> ByteString -> ByteString pipeline n e d b = runPar $ do s0 <- streamFromList (chunk (size n) b) s1 <- encrypt n e s0 s2 <- decrypt n d s1 xs <- streamFold (\x y -> (y : x)) [] s2 return (B.unlines (reverse xs))
このpipelineは暗号化してすぐに元に戻しているので全く意味はないが
並列化のスピードアップを見るためなので気にしない
実際に手元で動かしてみた結果は以下のようになった
まずシーケンシャルな方
$ ./dist/build/rsa_pipe/rsa_pipe encdec_seq /usr/share/dict/words >/dev/null +RTS -s -N2 -l 4,905,882,584 bytes allocated in the heap 280,130,872 bytes copied during GC 5,096,136 bytes maximum residency (15 sample(s)) 2,164,560 bytes maximum slop 19 MB total memory in use (1 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 9463 colls, 9463 par 0.78s 0.83s 0.0001s 0.0079s Gen 1 15 colls, 14 par 0.01s 0.01s 0.0006s 0.0014s Parallel GC work balance: 0.81% (serial 0%, perfect 100%) TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.05s elapsed) MUT time 6.22s ( 6.04s elapsed) GC time 0.80s ( 0.84s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 7.02s ( 6.93s elapsed) Alloc rate 788,386,398 bytes per MUT second Productivity 88.6% of total user, 89.8% of total elapsed gc_alloc_block_sync: 3272 whitehole_spin: 0 gen[0].sync: 0 gen[1].sync: 19
次にパラレルな方
$ ./dist/build/rsa_pipe/rsa_pipe pipeline /usr/share/dict/words >/dev/null +RTS -s -N2 -l 4,903,237,784 bytes allocated in the heap 296,976,632 bytes copied during GC 5,399,688 bytes maximum residency (19 sample(s)) 2,303,984 bytes maximum slop 21 MB total memory in use (3 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 5092 colls, 5092 par 3.18s 0.56s 0.0001s 0.0037s Gen 1 19 colls, 18 par 0.04s 0.02s 0.0011s 0.0021s Parallel GC work balance: 55.59% (serial 0%, perfect 100%) TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2) SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled) INIT time 0.00s ( 0.00s elapsed) MUT time 3.73s ( 3.13s elapsed) GC time 3.22s ( 0.58s elapsed) EXIT time 0.00s ( 0.00s elapsed) Total time 6.95s ( 3.71s elapsed) Alloc rate 1,315,747,356 bytes per MUT second Productivity 53.6% of total user, 100.5% of total elapsed gc_alloc_block_sync: 4837 whitehole_spin: 0 gen[0].sync: 158 gen[1].sync: 12
6.93s -> 3.71sに短縮した
両者の方法のまとめ
parListを使った並列化では
やはり元のアルゴリズムを表すコードと並列化のためのコードが明確に分けられていることでコードが書きやすいと感じた
元のコードの形にもよるが、一度シーケンシャルなコードを書いた上で並列化のための関数を足すだけでよいのは簡単でよい
上の例ではmapの処理を並列化するためにparMapを使うだけである
ただし、どこでNFまで評価されるべきかということを考えつつrdeepseqなどを用いる必要があるため、あまり気にせずにプログラムを書くと並列化したつもりができていないということになる恐れがある
Parモナドを利用した並列化ではパイプラインになっていることを型によって明確に表せていることがよいと思う
また、各処理の段階毎に並列化されるのでどれが並列に処理されるのかわかりやすい
ただ、データ構造から自分で定義することになるため並列化のために書くべきコードというのは多くなると思われる
既存のコードを並列化するという目的でかつmapを使った部分の処理量が多いというのであればparListを用いた方法で並列化すればコストがかからず良さそう
いくつかの独立な処理がありそれらを並列化するという用途であればParモナドを用いるというのがやりやすいのかもしれない

- 作者: Simon Marlow
- 出版社/メーカー: O'Reilly Media
- 発売日: 2013/07/12
- メディア: Kindle版
- この商品を含むブログ (2件) を見る

- 作者: Simon Marlow,山下伸夫,山本和彦,田中英行
- 出版社/メーカー: オライリージャパン
- 発売日: 2014/08/21
- メディア: 大型本
- この商品を含むブログ (2件) を見る