並列処理の抽象化(Parallel and Concurrent Programming in Haskell Chapter 11)

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch11.html

このページで紹介しているコードはほとんど上記ページからの引用である。

第11章ではこれまでの章に学んだ並行処理の要素を使いやすい形にまとめる。

1つ目の処理が終わったら2つ目の処理も終了させる

以前まとめたエントリがあったのでそちらで。
http://jsapachehtml.hatenablog.com/entry/2014/04/03/233101

シンメトリックな並列処理

前の節で一つ目の処理が終了したら2つ目の処理も終了させることができるようになった。逆の場合も成り立っている気がするが実はまだそうなっていない。asyncやwaitの実装を見直すとわかるが、例外発生を知るのはwaitの中であるため2つ目の処理が例外を投げたとしてもwait a1によって先に1つ目の処理が完了するまで待つことになる。そのため、2つ目の処理が途中で失敗しても1つ目の処理が完了するまでそれを検知できない。

ここではwaitBothという関数を定義することでどちらが先に終了した場合でも、もう片方も終了させるようにする。
こちらも以前まとめたエントリがあった。
http://jsapachehtml.hatenablog.com/entry/2014/04/10/095704

以上で二つの処理は対象に扱われるようになった。片方が例外をなげれば他方もそこで終了する。

concurrentlyに似た関数で早く終わった片方の処理の結果だけ返すという関数も作れる。waitBothではなく以前定義したwaitEitherを使うだけ。

waitEither :: Async a -> Async b -> IO (Either a b)
waitEither a b = atomically $
  fmap Left (waitSTM a) `orElse` fmap Right (waitSTM b)

race :: IO a -> IO b -> IO (Either a b)
race ioa iob =
  withAsync ioa $ \a ->
  withAsync iob $ \b -> 
    waitEither a b

raceは次の章でserverを作成するときに実際に使う。2つの相手から入力を待つようにスレッドを作り、片方から終了のinputがあればどちらも終了するという形。

concurrentlyやraceの良い点は2つのスレッドの終了タイミングを待って何かを行うようなエラーを起こしがちな操作を後ろに隠してくれることである。それぞれどんな処理をするのか定義した関数をraceなどに渡すだけで、それが終了したときの同期処理はうまくやってくれる。

raceを用いたtimeout

第9章で定義したtimeout関数をraceによって簡単にする例。
以前定義したのはこんな形

timeout :: Int -> IO a -> IO (Maybe a)
timeout t m 
  | t < 0 = fmap Just m
  | t == 0 = return Nothing
  | otherwise = do
    pid <- myThreadId
    u <- newUnique
    let ex = Timeout u
    handleJust
      (\e -> if e == ex then Just () else Nothing)
      (\_ -> return Nothing)
      (bracket 
        (forkIO $ do threadDelay t; throwTo pid ex)
        (\tid -> throwTo tid ThreadKilled)
        (\_ -> fmap Just m))

これをraceを使って以下のように書ける。

timeout :: Int -> IO a -> IO (Maybe a)
timeout n m
  | n < 0 = fmap Just m
  | n == 0 = return Nothing
  | otherwise = do
    r <- race (threadDelay n) m
    case r of 
      Left _ -> return Nothing
      Right a -> return $ Just a

timeoutした場合はraceの結果がLeftに、対象の処理が先に終わればRightに結果が返ってくる。別スレッドを作成して一定時間待ってから例外を投げるという煩雑な部分がすべてraceによって隠されていることがわかる。このような処理はエラーを起こしやすいので自分で実装しなくて済むのはとてもありがたい。

ただ、厳密に言うと実装は少し違う。前の実装だと生成したスレッドが呼び出し元のスレッドに対してTimeout例外を投げる。一方raceを使った実装では2つのスレッドが作成され、timeoutした場合はThreadKilled例外が投げられる。なので、対象のIO処理内で例外の種類に応じた処理を行っていた場合はraceを使った実装をそのまま使うことはできない。また、作成するスレっドが増えることもあり、多少raceの実装の方がパフォーマンスが落ちる。

functorのインスタンスにする

waitAnyなどはAsyncのリストを引数にとる。 リスト内の型はすべて一致する必要があるため様々な結果をもつAsyncを一つのリストにまとめられず不便である。

これの解決策としてAsyncをFunctorのインスタンスとして中の値に変換を施せるようにする。ただ、そのためにはAsyncの中で保持している型をTMVarからSTMに変える必要がある。最終的にfmapの実装は以下のように書ける。

data Async a :: Async ThreadId (STM (Either SomeException a))

instance Functor Async where
  fmap f (Async t stm) = Async t stm'
    where stm' = do
            r <- stm
            case r of
              Left e  -> return $ Left e
              Right a -> return $ Right (f a)

作成されたAsyncの型を変更することができる。
ただ、Async内で使われているデータ型がTMVarの場合にfmapが実装できないのはなぜか私には理解できなかった。IOアクションの結果の値がAsync作成時に決まってしまうためと書いてあるが、STMだと変更できてTMVarだと変更できないというのはなぜなのか?

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング