読者です 読者をやめる 読者になる 読者になる

並行処理のキャンセル (Parallel and Concurrent Programming in Haskell Chapter 9)

第9章では引き続き非同期処理について

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch09.html


処理のキャンセルや時間待ちの処理が入ることはよくあるし対処しなければならない問題

^Cで実行をキャンセルしたとかサーバがコネクションを切る前に一定時間待つとか

よくあるやり方は2つ
1. キャンセル処理の存在をポーリングで調べて実行
2. 処理がキャンセルされたらすぐに止める

1.だとポーリングを十分にしていなかった場合にスレッドが応答が悪くなる
2.の場合、重要な処理を行っているときにはキャンセルできないようにするなどしてデータの不整合を防ぐ必要がある

現実には1.のみ若しくは両方を使って処理されることが多い
2.をデフォルトにした場合でも、重要データを扱う領域では結局1.のやり方に切り替えるしかない

ほとんどの命令形言語ではコードのほとんどに状態を変更する操作が含まれるため、2.をデフォルトにすることは考えられない
一方、haskellでは副作用のないコードが多いためキャンセルを処理しても元の処理の整合性を失うことなく元に戻したりできるという利点がある。
というより、純粋関数の定義からしてポーリングによるキャンセルはできないため2.の方法を取るしかない

よってhaskellでのキャンセル処理の設計はIOモナド内でどのようにキャンセル処理を行うかを決めることである

非同期の例外

まずここでいう非同期例外について説明
同期例外:throwIOなどその例外によってキャンセルされるプログラム上での例外
非同期例外:その例外によってキャンセルされるプログラム外からの例外(HUPシグナルによる実行キャンセルとか)

非同期例外というのは例外を受ける側がまったく知らないタイミングでそれを受けることになるため
(throwIOの実行であればそのプログラムが元々知っている)

ここでは8章で使ったAsyncを非同期例外に対応させることが目的
ウェブページを並行にダウンロードするプログラムをqを入力することでストップできるようにする
第8章で出てきたプログラムはこんな感じ

import System.TimeIt
import Text.Printf 

sites :: [String]                                                                        
sites = [ "http://hackage.haskell.org/package/HTTP"                                      
        , "http://hackage.haskell.org/package/async"                                     
        , "http://hackage.haskell.org/package/base-4.7.0.2/docs/Control-Concurrent.html" 
        ]                                                                                

timeDownload :: String -> IO ()                                      
timeDownload url = do                                                
  (time, page) <- timeItT $ getURL url                               
  printf "downloaded: %s (%d bytes, %.2fs)\n" url (length page) time 

geturl3 :: IO ()                          
geturl3 = do                              
  as <- mapM (async . timeDownload) sites 
  mapM_ wait as                           

timeDownloadはダウンロードしたコンテンツとその実行した時間を取得する関数
asyncやwaitについては第8章を読むか以前まとめたページにて
http://jsapachehtml.hatenablog.com/entry/2015/02/21/082507


別のスレッドに対して例外を送るにはthrowToが使える

throwTo :: Exception e => ThreadId -> e -> IO ()

ThreadIdはforkIOを実行した際の返り値として取得できる

これを使うためAsync型を以下のように拡張する

第8章終わりでの定義
data Async = Async (MVar (Either SomeException a))

今回の拡張後
data Async = Async ThreadId (MVar (Either SomeException a))

拡張したAsyncのAPIとしてcancelを導入

cancel :: Async a -> IO ()
cancel (Async tid mvar) = throwTo tid ThreadKilled

Async型を生成するということはその中でスレッドを生成し処理を行っている
そのスレッドに対して例外を送ってキャンセルするのがcancel

ThreadKilledはExceptionの一つでControl.Exceptionで定義されている

上記の関数やデータ型を使ってgeturl3を実行途中に止められるようにしたのが以下

async :: IO a -> IO (Async a)    
async action = do                
  mvar <- newEmptyMVar           
  tid <- forkIO $ do             
    e <- try action              
    putMVar mvar e               
  return $ Async tid mvar        

geturl7 :: IO ()                                                 
geturl7 = do                                                     
  as <- mapM (async . timeDownload) sites                        
  forkIO $ do                                                    
    hSetBuffering stdin NoBuffering                              
    forever $ do                                                 
      c <- getChar                                               
      when (c == 'q') $ mapM_ cancel as                          
                                                                 
  rs <- mapM waitCatch as                                        
  printf "%d/%d succeeded\n" (length (rights rs)) (length rs)    

asyncは生成するAsyncにthreadIdを含めるように変更した
geturl7ではqが入力された際にすべてのasyncで実行されているダウンロードをcancelするようになっている
ちなみにthreadToは実行終了しているスレッドに対しては何もしないため
qを入力した時点で取得済みのコンテンツはキャンセルされない

maskと非同期例外

非同期例外がどんなときでも受け取れてしまうとデータの不整合がおきてしまう可能性がある
例えば複数のスレッド間で共有しているデータを処理中のときなど

これを実現するための基本的な考え方は
非同期例外を受け取ったら問題のある場合には受け取れない設定をしておき、
その処理が終わった時点で例外を受け入れるというものである

以下のような関数を考えた場合、(1)と(2)の間または(2)と(3)の間に例外を受け取ってしまうと問題になる

problem :: MVar a -> (a -> IO a) -> IO ()
problem mvar f = do
  a <- takeMVar mvar                                 -- (1)
  r <- f a `catch`  \e -> do putMVar mvar a; throw e -- (2)
  putMVar mvar r                                     -- (3)

takeMVarをした後putMVarの前に例外でおちれば(正常に終了していれば)mvarに入るはずだった結果が入らず空のままとなる
その状態他のスレッドがtakeMVarでブロックしていたらデッドロックになる。
(2)の実行中に例外を受け取ればcatchがあるので問題ない

これを解決するためにmaskを使う
maskはその引数にある処理中に例外を受け付けなくする
ただし、maskから渡される引数の関数を適用した処理のところで例外の有無をチェックすることができる
つまりmaskの中では例外の処理がポーリングモードになったかのような状態になる

problem :: MVar a -> (a -> IO a) -> IO ()
problem mvar f = mask $ \restore -> do
  a <- takeMVar mvar                                 
  r <- restore (f a) `catch`  \e -> do putMVar mvar a; throw e 
  putMVar mvar r                                    

全体をmaskの引数とし、引数として渡ってくる関数(restore)で(2)の処理をラップする
これでf aの処理以外の部分で例外を受け取ってしまう危険がなくなった

しかし、これによって新しい危険もある
takeMVarやputMVarが例外を受け付けなくなったため、
そこで処理がブロックした場合にキャンセルができない

ただ、takeMVarなどのブロックしてしまう可能性のある処理は
uninterruptibleな処理に指定されていてmaskの中でも例外を受けることができる
しかも、実際にブロックしてしまったときのみに。
なのでちゃんとコードが組んであれば例外を受け取ることはない

maskを内包した関数としてmodifyMVarがある

modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
modifyMVar  :: MVar a -> (a -> IO (a, b)) -> IO b

MVarをとりその中身の値を変更して再度入れる。後者の関数はタプルの2つ目を返り値として返す。

典型的な使い方はcasの操作

casMVar :: Eq a => MVar a -> a -> a -> IO Bool
casMVar mvar old new = do
  modifyMVar mvar $ \cur -> do
    if cur == old
      then return (new, True)
      else return (old, False)

元の値と新しい値を渡してMVarに含まれる値と比較する
MVar内の値が古い値のままであれば新しい値に更新してTrueを返す
そうでなければ他のスレッドが更新したということなので更新せずにFalseを返す

非同期例外安全なbracket

第8章で定義を書いたbracketだがそのままだと非同期例外に対して安全ではない
修正するにはmaskを使って以下のように書ける

bracket :: IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket :: before after during =
  mask $ \restore -> do
    a <- before
    r <- restore (during a) `onException` after a
    _ <- after a
    return r

duringを処理しているときだけ例外を受け取るようにmaskするだけ
注意点としてはbeforeでブロッキングな処理をする場合は多くても一つにすること
もし2つ以上ブロッキングな処理が含まれる場合、2つ目の処理で例外を受けてしまうと
afterの処理は実行されないことになってしまう

2つ以上ブロッキングな処理を入れたい場合は
bracket自体を多重に使って処理するべき

チャネル処理を例外安全にする

第8章で定義したreadChanなどを例外安全にする例
以前定義した際はtakeMVarやputMVarを直接使っているので
それらの処理の間に例外を受けるとやはりデッドロックしてしまう

修正にはmodifyMVarなどを用いれば良さそうだが
readChan自体だけでなく、そこから呼んでいるreadMVarにも
takeMVarなどが含まれることに注意してそちらも修正する必要がある

最も安全で直接的なのはmodifyMVarでなくmaskを直接使って定義してしまうこと

readMVar :: MVar a -> IO a
readMVar mvar = do
  mask_ $ do
    a < takeMVar mvar
    putMVar mvar a
    return a

mask_はrestore関数が必要ない場合に使うmask

タイムアウト

タイムアウトを扱うための便利なラッパーがある

timeout :: Int -> IO a -> IO (Maybe a)

一つ目の引数がタイムアウトまでの時間(us)
二つ目がその間に行うアクション

時間内であればアクションの結果または例外がJustにくるまれて返る
時間がすぎるとアクションに対してTimeout Unique型の例外が投げられ、Nothingが返ってくる

少し簡易化した実装は以下のように書ける

timeout :: Int -> IO a -> IO (Maybe a)
timeout t m 
  | t < 0 = fmap Just m
  | t == 0 = return Nothing
  | otherwise = do
    pid <- myThreadId
    u <- newUnique
    let ex = Timeout u
    handleJust
      (\e -> if e == ex then Just () else Nothing)
      (\_ -> return Nothing)
      (bracket 
        (forkIO $ do threadDelay t; throwTo pid ex)
        (\tid -> throwTo tid ThreadKilled)
        (\_ -> fmap Just m))

タイムアウトの値によって場合分けしt>0のときがメインの処理
まず自身のスレッドID(myThreadId)と一意な値(newUnique)を取得
newUniqueは実行するごとに異なる値が生成される関数で他のnewUniqueで生成した値とは必ず異なる値になる

handleJustはキャッチしたい例外を選択できるcatchのようなもの
Justを返せばhandlerが実行されNothingを返せばその例外が投げ直される
ここでは指定したユニーク値をもつTimeout例外であればhandlerを実行しNothingを返す

bracketではスレッドを生成してタイムアウト分待ってから作成したTimeout例外を投げている
処理が正常に終了すればJust mを返すのみ
処理の成功・失敗に関わらず自分自身に対してThreadKilledを投げて終了するようにしている

一つ気になる点は親と子両方のスレッドで同時にthreadToを実行したらどうなるのかという点
いろいろ説明されているが必ずどちらか片方が実行されるような状態が実現できているため問題ないということのよう。

非同期例外をmaskする際の注意点

非同期例外をキャッチしてそのハンドラーを実行中に別の例外を受け取ってしまうことはあり得る。
そうなるとハンドラーの処理が実行途中で終わってしまうということになりかねない
そのためハンドラーはmaskの中にあるべきである

ただ、末尾再帰したプログラムを書いている場合に意図せずmaskの状態から抜けてしまうということがある
以下が例

main = do
  fs <- getArgs
  let
     loop !n [] = return n
     loop !n (f:fs)
        = handle (\e -> if isDoesNotExistError e
                           then loop n fs
                           else throwIO e) $
            do
               getMaskingState >>= print
               h <- openFile f ReadMode
               s <- hGetContents h
               loop (n + length (lines s)) fs

  n <- loop 0 fs
  print n

以下のような実行結果になる

# xxx yyyは存在しないファイル名
$ ./catch-mask xxx yyy 
Unmasked
MaskedInterruptible
0

一つ目のファイル処理時は素直にhandle内のメイン処理に入るためunmaskedな状態
だが、そこで例外が発生してハンドラーの処理に入りさらににloopが呼ばれる
するとその後の処理はmaskされた状態で実行されることになる
(handle関数がハンドラーの処理をmaskしているため)

なのでhandleなどを使用する際に明示的ではないmaskに気をつけて
意図せずmaskの状態が変わってしまうことを避ける必要がある

maskとforkIO

forkIOで生成したスレッドは親スレッドのmask状態を引き継ぐという話
以下のようなコードだとmvarを生成してからputMVarをするまでの間に非同期例外を受け取ってしまう可能性がある

async :: IO a -> IO (Async a)
async action = do
  mvar <- newEmptyMVar
  tid <- forkIO $ do
    e <- try action
    putMVar mvar e
  return $ Async tid mvar

このときforkIOの中でmaskしたとしても不十分でtryの前に例外がくるかもしれない
この場合、forkIO毎maskしてしまうことができる

async :: IO a -> IO (Async a)
async action = do
  mvar <- newEmptyMVar
  tid <- mask $ \restore -> 
    forkIO $ do
      e <- try $ restore action
      putMVar mvar e
  return $ Async tid mvar

スレッドの処理が完了してから特定の処理をするというパターンはよくあるのでforkFinallyという関数が用意されている

forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action fun =
  mask $ \restore ->
    forkIO $ do
      e <- try $ restore action
      fun e

これを使うとasyncは簡単に書ける

async :: IO a -> IO (Async a)
async action = do
  mvar newEmptyMVar
  tid <- forkFinally action (putMVar mvar)
  return $ Async tid mvar

forkIO内の最初にエラーハンドリングを行うのであればforkFinallyを使うのがよい
forkIO (x `finally` y)というのはforkFinally x (\_ -> y)と書ける

まとめ

この章では非同期の例外についての処理を扱った
また、それらを扱いやすくするための便利な関数も。
bracketやmodifyMVarなど非同期例外に対する処理が内蔵されている関数を使うことで
安全に簡潔に書くことができる

状態を持つコードの大きな塊をmaskした場合は処理が複雑になる可能性があり、
どこで非同期例外を受け取るのがよいか注意してコーディングする必要がある

第10章で学ぶSTMを使えばこれらの問題を解決できる

また、haskellでは^Cなどでプログラムを止めた際でも
ただ止まるだけでなく後処理がちゃんと実行される。
これは非同期例外を利用していることで得られる利点である


Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング