読者です 読者をやめる 読者になる 読者になる

非同期アクション(Parallel and Concurrent Programming in Haskell Chapter 8)

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch08.html

MVarについて書いた第7章まとめはこちら
http://jsapachehtml.hatenablog.com/entry/2015/02/15/205159

第8章は何かの作業をしている後ろでIOを実行する方法について
この章に出てくる関数たちはControl.Concurrent.Asyncで定義されている模様

題材はwebページのダウンロード
urlを引数にとって結果を返す関数をgetURLとして以下のようにバックグラウンドでダウンロード処理が書ける
この章の前半ではこれを綺麗に書けるように洗練していく

import Control.Concurrent
import Network.HTTP
import System.Environment

getURL :: String -> IO String
getURL url = simpleHTTP (getRequest url) >>= getResponseBody

main :: IO ()
main = do
  [arg] <- getArgs
  case arg of
    "1" -> geturl1
    _ -> print "no such arguments"

geturl1 :: IO ()
geturl1 = do
  m1 <- newEmptyMVar
  m2 <- newEmptyMVar
  forkIO $ do
    r1 <- getURL "http://hackage.haskell.org/package/HTTP-4000.0.5/docs/Network-HTTP.html"
    putMVar m1 r1
  forkIO $ do
    r2 <- getURL "http://hackage.haskell.org/package/HTTP-4000.0.6/docs/Network-HTTP.html"
    putMVar m2 r2
  r1' <- takeMVar m1
  r2' <- takeMVar m2
  print (length r1', length r2')

重要なのはgeturl1
2つのMVarを生成してそれぞれを別スレッドで取得したコンテンツを入れる箱として使う
メインスレッドでは両方にtakeMVarをかけて取得を待つ

getURLはHTTPパッケージの例にあるもののほぼそのまま

geturl1は明らかに冗長であるためもっと短くきれいに書きたい
ライブラリにある高階関数を使うだけでも短くできるが、
共有パターンを抽出してそれを抽象化する方がよりよい

今回やりたいことは以下
1.非同期でアクションを実行
2.その実行完了を待って結果を取得
なのでそれを表すインターフェースを定義する

data Async a = Async (MVar a)

async :: IO a -> IO (Async a)
async action = do
  mvar <- newEmptyMvar
  forkIO $ do
    r <- action
    putMVar mvar r
  return (Async mvar)

wait :: Async a -> IO a
wait (Async mvar) = readMVar mvar

Async型はMVarをラップしただけのデータ型で非同期アクションの開始を表す
中のMVarにはアクションの実行結果が入る

これだけだと特に定義する必要もなさそうだが、
エラー処理などを含めようとすればもっと複雑になっていくことも考えると
実装の詳細を隠蔽するためこのように定義することはとても重要

またreadMVarを使っていることにも注意
各Asyncに対して複数回waitが呼ばれることもあるため
ここでtakeMVarを使っているとデッドロックしてしまう
ちなみにtakeMVarは中身を取り出して空にするもの
readMVarは中身を取り出すが読んだら元に戻すもの

これを使って書いたgeturl2がこれ

geturl2 :: IO ()
geturl2 = do
  a1 <- async $ getURL "http://hackage.haskell.org/package/HTTP-4000.0.5/docs/Network-HTTP.html"
  a2 <- async $ getURL "http://hackage.haskell.org/package/HTTP-4000.0.6/docs/Network-HTTP.html"
  r1 <- wait a1
  r2 <- wait a2
  print (length r1, length r2)

非同期アクションの例外処理

geturl2をネットワークのつながらない場所で実行してみると以下のような結果になる

geturls2: connect: does not exist (No route to host)
geturls2: connect: does not exist (No route to host)
geturls2: thread blocked indefinitely in an MVar operation

ネットワークがつながらないためにconnectできずにforkしたスレッドが終了したというのが上の2つ
3つ目の出力はtakeMVarがデッドロック状態になったことを検知して出るエラー
forkしたスレッドが落ちてその中で実行されるはずだったputMvarがなくなってしまったためこのようになった

このような状況を避けるためにgeturl2にエラー処理を入れる
まずはAsync型にエラーを含ませる

data Async a = Async (MVar (Either SomeException a))

非同期アクションの結果だけを保持するのではなくエラーも取れるようにするため
MVarの中身をEitherとした。エラーが起きた場合は例外を含んだLeftが返る

これに合わせてasyncを変更

async :: IO a -> IO (Async a)
async action = do
  mvar <- newEmptyMVar
  forkIO $ do
    e <- try action
    putMVar mvar e
  return $ Async mvar

元々は単に渡されたactionを実行するだけだったものをtryに渡すようにした
tryはactionを実行して例外が発生すればLeftとして例外を、正常終了すればRightとして結果を返す関数
tryについてはこちらで
http://jsapachehtml.hatenablog.com/entry/2015/02/21/082507

次にwaitも変更

wait :: Async a -> IO a
wait async = do
  e <- waitCatch async
  case e of
    Left exception -> throwIO exception
    Right r -> return r

waitCatch :: Async a -> IO (Either SomeException a)
waitCatch (Async mvar) = readMVar mvar

結果を取り出す関数として2つ提供する
一つはもともとと同じインターフェースのwait
非同期アクション内で例外が発生した場合はそれを投げ直し、正常終了であれば結果を返す
もう一つはwaitCatch
こちらはAsyncからEitherを取り出してそのまま返す
例外が発生した場合に何らかの処理を入れたい場合にこちらを使う

ちなみにmain関数はgeturl2と全く同じである

以上のように修正して再度ネットワークのつながらない場所で実行してみると先ほどとは違ってエラーは一つ出るだけである

発生した例外をforkしたスレッド側からメインスレッド側へ渡し、メインスレッド側でthrowIOしているため例外発生時はメインスレッドでも例外が投げられて処理がストップする。
そのためputMVarがなくなったことによるデッドロックは発生せず、また2つ目のアクションも実行途中でkillされる(メインスレッドがkillされればforkしたスレッドもkillされるため)

これによって基本的なエラーハンドリングをライブラリ側に入れることができた。

非同期アクションのマージ

複数のアクションを実行した上で最初の一つだけを取り出したとしたら以下のように書ける

sites = ["http://example1.com"
        ,"http://example2.com"
        ,"http://example3.com"]

geturl5 :: IO ()
geturl5 = do
  m <- newEmptyMVar
  let 
    download url = do
      r <- getURL url
      putMVar m (url, r)
  mapM_ (forkIO . download) sites

  (url, r) <- takeMVar m
  printf "%s was first (%d bytes)\n" url (length r)
  replicateM_ (length sites - 1) $ takeMVar m

MVarを一つだけ作ってすべてのアクションの結果をそこに入れるようにするというのが考え方
そうすれば最初の一つだけが入ってあとはブロックされる

これをAsync型を使った形にしたい
原著だとまず2つの場合としてwaitEitherによる説明が入っているが、わかりづらかったので最初から複数の場合で考える
Async aのリストをとって最初に返ってきた結果を返すという関数を作る(waitAny)
ここで少し混乱したのだが、Async aを引数にとっている時点で既にAsyncは作成されている。つまり別スレッドによる非同期アクションの実行は既に始まっている。
そちらのスレッドで取得した結果がAsyncにラップされたMVarに入っているという点に注意。

waitAny :: [Async a] -> IO a
waitAny as = do
  mvar <- newEmptyMVar
  let 
    forkWait a = forkIO $ do
      r <- try $ wait a
      putMVar mvar r
  mapM_ forkWait as
  wait $ Async mvar

まずひとつMVarを生成
Asyncをとって別スレッドでその結果を取得しMVarに入れる関数をforkWaitとして定義
それをmapM_でAsyncの数だけ実行する
最後のwaitでmvarに入った情報(最初に取得されたデータ)を取り出して返す

これを使うとgeturl5は以下のように簡単にできる

geturl6 :: IO ()
geturl6 = do
  let
    download url = do
      r <- getURL url
      return (url, r)
  as <- mapM_ (async . download) sites
  (url, r) <- waitAny as
  printf "%s was first (%d bytes)" url r
  mapM_ wait as

MVarを使った処理は全てwaitやasyncの処理に隠すことができた
難点はAsync一つにつき2つのスレッドを作成しているため無駄なコストが発生しているという点
これについては10章でSTMを使ってより効率的なやり方を学ぶ

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング