英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch08.html
MVarについて書いた第7章まとめはこちら
http://jsapachehtml.hatenablog.com/entry/2015/02/15/205159
第8章は何かの作業をしている後ろでIOを実行する方法について
この章に出てくる関数たちはControl.Concurrent.Asyncで定義されている模様
題材はwebページのダウンロード
urlを引数にとって結果を返す関数をgetURLとして以下のようにバックグラウンドでダウンロード処理が書ける
この章の前半ではこれを綺麗に書けるように洗練していく
import Control.Concurrent import Network.HTTP import System.Environment getURL :: String -> IO String getURL url = simpleHTTP (getRequest url) >>= getResponseBody main :: IO () main = do [arg] <- getArgs case arg of "1" -> geturl1 _ -> print "no such arguments" geturl1 :: IO () geturl1 = do m1 <- newEmptyMVar m2 <- newEmptyMVar forkIO $ do r1 <- getURL "http://hackage.haskell.org/package/HTTP-4000.0.5/docs/Network-HTTP.html" putMVar m1 r1 forkIO $ do r2 <- getURL "http://hackage.haskell.org/package/HTTP-4000.0.6/docs/Network-HTTP.html" putMVar m2 r2 r1' <- takeMVar m1 r2' <- takeMVar m2 print (length r1', length r2')
重要なのはgeturl1
2つのMVarを生成してそれぞれを別スレッドで取得したコンテンツを入れる箱として使う
メインスレッドでは両方にtakeMVarをかけて取得を待つ
getURLはHTTPパッケージの例にあるもののほぼそのまま
geturl1は明らかに冗長であるためもっと短くきれいに書きたい
ライブラリにある高階関数を使うだけでも短くできるが、
共有パターンを抽出してそれを抽象化する方がよりよい
今回やりたいことは以下
1.非同期でアクションを実行
2.その実行完了を待って結果を取得
なのでそれを表すインターフェースを定義する
data Async a = Async (MVar a) async :: IO a -> IO (Async a) async action = do mvar <- newEmptyMvar forkIO $ do r <- action putMVar mvar r return (Async mvar) wait :: Async a -> IO a wait (Async mvar) = readMVar mvar
Async型はMVarをラップしただけのデータ型で非同期アクションの開始を表す
中のMVarにはアクションの実行結果が入る
これだけだと特に定義する必要もなさそうだが、
エラー処理などを含めようとすればもっと複雑になっていくことも考えると
実装の詳細を隠蔽するためこのように定義することはとても重要
またreadMVarを使っていることにも注意
各Asyncに対して複数回waitが呼ばれることもあるため
ここでtakeMVarを使っているとデッドロックしてしまう
ちなみにtakeMVarは中身を取り出して空にするもの
readMVarは中身を取り出すが読んだら元に戻すもの
これを使って書いたgeturl2がこれ
geturl2 :: IO () geturl2 = do a1 <- async $ getURL "http://hackage.haskell.org/package/HTTP-4000.0.5/docs/Network-HTTP.html" a2 <- async $ getURL "http://hackage.haskell.org/package/HTTP-4000.0.6/docs/Network-HTTP.html" r1 <- wait a1 r2 <- wait a2 print (length r1, length r2)
非同期アクションの例外処理
geturl2をネットワークのつながらない場所で実行してみると以下のような結果になる
geturls2: connect: does not exist (No route to host) geturls2: connect: does not exist (No route to host) geturls2: thread blocked indefinitely in an MVar operation
ネットワークがつながらないためにconnectできずにforkしたスレッドが終了したというのが上の2つ
3つ目の出力はtakeMVarがデッドロック状態になったことを検知して出るエラー
forkしたスレッドが落ちてその中で実行されるはずだったputMvarがなくなってしまったためこのようになった
このような状況を避けるためにgeturl2にエラー処理を入れる
まずはAsync型にエラーを含ませる
data Async a = Async (MVar (Either SomeException a))
非同期アクションの結果だけを保持するのではなくエラーも取れるようにするため
MVarの中身をEitherとした。エラーが起きた場合は例外を含んだLeftが返る
これに合わせてasyncを変更
async :: IO a -> IO (Async a) async action = do mvar <- newEmptyMVar forkIO $ do e <- try action putMVar mvar e return $ Async mvar
元々は単に渡されたactionを実行するだけだったものをtryに渡すようにした
tryはactionを実行して例外が発生すればLeftとして例外を、正常終了すればRightとして結果を返す関数
tryについてはこちらで
http://jsapachehtml.hatenablog.com/entry/2015/02/21/082507
次にwaitも変更
wait :: Async a -> IO a wait async = do e <- waitCatch async case e of Left exception -> throwIO exception Right r -> return r waitCatch :: Async a -> IO (Either SomeException a) waitCatch (Async mvar) = readMVar mvar
結果を取り出す関数として2つ提供する
一つはもともとと同じインターフェースのwait
非同期アクション内で例外が発生した場合はそれを投げ直し、正常終了であれば結果を返す
もう一つはwaitCatch
こちらはAsyncからEitherを取り出してそのまま返す
例外が発生した場合に何らかの処理を入れたい場合にこちらを使う
ちなみにmain関数はgeturl2と全く同じである
以上のように修正して再度ネットワークのつながらない場所で実行してみると先ほどとは違ってエラーは一つ出るだけである
発生した例外をforkしたスレッド側からメインスレッド側へ渡し、メインスレッド側でthrowIOしているため例外発生時はメインスレッドでも例外が投げられて処理がストップする。
そのためputMVarがなくなったことによるデッドロックは発生せず、また2つ目のアクションも実行途中でkillされる(メインスレッドがkillされればforkしたスレッドもkillされるため)
これによって基本的なエラーハンドリングをライブラリ側に入れることができた。
非同期アクションのマージ
複数のアクションを実行した上で最初の一つだけを取り出したとしたら以下のように書ける
sites = ["http://example1.com" ,"http://example2.com" ,"http://example3.com"] geturl5 :: IO () geturl5 = do m <- newEmptyMVar let download url = do r <- getURL url putMVar m (url, r) mapM_ (forkIO . download) sites (url, r) <- takeMVar m printf "%s was first (%d bytes)\n" url (length r) replicateM_ (length sites - 1) $ takeMVar m
MVarを一つだけ作ってすべてのアクションの結果をそこに入れるようにするというのが考え方
そうすれば最初の一つだけが入ってあとはブロックされる
これをAsync型を使った形にしたい
原著だとまず2つの場合としてwaitEitherによる説明が入っているが、わかりづらかったので最初から複数の場合で考える
Async aのリストをとって最初に返ってきた結果を返すという関数を作る(waitAny)
ここで少し混乱したのだが、Async aを引数にとっている時点で既にAsyncは作成されている。つまり別スレッドによる非同期アクションの実行は既に始まっている。
そちらのスレッドで取得した結果がAsyncにラップされたMVarに入っているという点に注意。
waitAny :: [Async a] -> IO a waitAny as = do mvar <- newEmptyMVar let forkWait a = forkIO $ do r <- try $ wait a putMVar mvar r mapM_ forkWait as wait $ Async mvar
まずひとつMVarを生成
Asyncをとって別スレッドでその結果を取得しMVarに入れる関数をforkWaitとして定義
それをmapM_でAsyncの数だけ実行する
最後のwaitでmvarに入った情報(最初に取得されたデータ)を取り出して返す
これを使うとgeturl5は以下のように簡単にできる
geturl6 :: IO () geturl6 = do let download url = do r <- getURL url return (url, r) as <- mapM_ (async . download) sites (url, r) <- waitAny as printf "%s was first (%d bytes)" url r mapM_ wait as
MVarを使った処理は全てwaitやasyncの処理に隠すことができた
難点はAsync一つにつき2つのスレッドを作成しているため無駄なコストが発生しているという点
これについては10章でSTMを使ってより効率的なやり方を学ぶ
- 作者: Simon Marlow
- 出版社/メーカー: O'Reilly Media
- 発売日: 2013/07/12
- メディア: Kindle版
- この商品を含むブログ (2件) を見る
- 作者: Simon Marlow,山下伸夫,山本和彦,田中英行
- 出版社/メーカー: オライリージャパン
- 発売日: 2014/08/21
- メディア: 大型本
- この商品を含むブログ (2件) を見る