haskellで非同期IO処理(Parallel and Concurrent Programming in Haskellの11章はじめ部分)

Parallel and Concurrent Programming in Haskellの読書会でasyncを使った並行処理について学んだので復習がてらメモ
本の内容はこちら(今回の内容は第11章の前半について)
http://chimera.labs.oreilly.com/books/1230000000929/index.html

なお、ブログ中にでてくるコードは基本的に本から引用したもの

10章まででasyncを使ってのプログラムは既に登場している
11章ではそれをさらに使いやすいように洗練されていくという流れ

asyncを使った処理の基本的な形

main = do
  a1 <- async (getURL "http://www.wikipedia.org/wiki/Shovel")
  a2 <- async (getURL "http://www.wikipedia.org/wiki/Spade")
  r1 <- wait a1
  r2 <- wait a2
  print (B.length r1, B.length r2)

a1, a2をそれぞれ別スレッドで実行し、それらの結果をwaitによって得て表示

ただ、exceptionが発生した場合は問題が起こる
例えばa1の処理中にexceptionが発生した場合、mainの処理にも伝達されて処理がストップする。
しかし、a2の処理は動き続けてしまいメモリリークにつながる。

どちらかで例外が発生した場合は他方のスレッドも終了させたい。そこでbracketとasyncを組み合わせて以下のように書く

bracket (async io) cancel operation

ioに並列で処理したいアクションを記述し、operationにio中にしておきたい処理を書く

bracketはtry-finallyのようなもので定義は↓(hoogleより)

:: IO a	    computation to run first ("acquire resource")
-> (a -> IO b)	computation to run last ("release resource")
-> (a -> IO c)	computation to run in-between
-> IO c	 


最初のasyncの処理をbracketで書きなおすとこうなる

main = do
  bracket (async (getURL "http://www.wikipedia.org/wiki/Shovel"))
          cancel $ \a1 -> do
  bracket (async (getURL "http://www.wikipedia.org/wiki/Shovel"))
          cancel $ \a2 -> do
  r1 <- wait a1
  r2 <- wait a2
  print (B.length r1, B.length r2)

並列処理であることを示したいからかインデントが並べてあるので少し見づらい。operationの部分をラムダ抽象にして非同期で実行しているアクションを受け取り、その中でさらにbracketを入れ子にして2つ目の非同期処理を実行している。operation内で例外が発生した場合、cancelの処理が必ず走るためリソースが解放されないままにはならない。

ただ、同じパターンの部分をさらにまとめられるのでwithAsyncという関数を定義。

withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync io operation = bracket (async io) cancel operation

最終的にmainは以下のようになる。

main =
  withAsync (getURL "http://www.wikipedia.org/wiki/Shovel") $ \a1 ->
  withAsync (getURL "http://www.wikipedia.org/wiki/Spade")  $ \a2 -> do
  r1 <- wait a1
  r2 <- wait a2
  print (B.length r1, B.length r2)

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング