STM (Parallel and Concurrent Programming in Haskell Chapter 10)

英語の原文はこちらのページで読める
http://chimera.labs.oreilly.com/books/1230000000929/ch10.html

前の章についてまとめたものはこちら
http://jsapachehtml.hatenablog.com/entry/2015/02/28/085123

STMは各処理をひとまとまりのアトミックな処理として実行することで、状態を持った並行処理を簡潔に書くための方法である。
STMによってこれまでの章で説明されたMVarによる処理の問題をいくつか解決することができる。
この章では例を出しつつそれらについて説明を行っている。

MVarとTVarの比較(windowによる例)

画面上に表示されるWindowとそれらが含まれるDesktopを形式的に定義し、Desktop間でWindowを移動させるという例を使ってTVarを利用することの利点を説明している。
本だと具体的な実装は載っていなかったので簡単に書いて試してみた。

newtype Window = Window String deriving (Eq, Show, Ord)
newtype Desktop = Desktop Int deriving (Eq, Show, Ord)

複数のDesktopを含む画面全体をDisplayとしてMVarで定義

type DisplayMVar = M.Map Desktop (MVar (S.Set Window))

各DesktopをkeyとしたMapとして表現し、各DesktopはWindowのSetを持っている。

これらの型でmoveWindowというWindowを指定したDesktopへ移動する関数を定義する

import qualified Data.Set as S
import qualified Data.Map as M

moveWindow :: DisplayMVar -> Window -> Desktop -> Desktop -> IO ()
moveWindow disp w da db = do
  wa <- takeMVar ma
  print "lock Desktop passed by third args"
  wb <- takeMVar mb
  putMVar ma $ S.delete w wa
  putMVar mb $ S.insert w wb
  where
    ma = disp M.! da
    mb = disp M.! db

また、main関数を以下のように定義

main :: IO ()
main = do
   mvar1 <- newMVar $ S.fromList [Window "A"]
   mvar2 <- newMVar $ S.fromList [Window "B"]
   let disp = M.fromList [(Desktop 1, mvar1), (Desktop 2, mvar2)]
 
   desktop1 <- readMVar mvar1
   desktop2 <- readMVar mvar2
   print $ "Desktop 1 before: " ++ show desktop1
   print $ "Desktop 2 before: " ++ show desktop2
   
   checkThread1 <- newEmptyMVar
   _ <- forkIO $ do
     moveWindow disp (Window "A") (Desktop 1) (Desktop 2)
     putMVar checkThread1 "finished"

   -- checkThread2 <- newEmptyMVar
   -- _ <- forkIO $ do
   --  moveWindow disp (Window "B") (Desktop 2) (Desktop 1)
   --  putMVar checkThread2 "finished"
   
   takeMVar checkThread1
   
   desktop1' <- takeMVar mvar1
   desktop2' <- takeMVar mvar2
   print $ "Desktop 1 after: " ++ show desktop1'
   print $ "Desktop 2 after: " ++ show desktop2'
   return ()

これを実行すると以下のような結果になる

"Desktop 1 before: fromList [Window \"A\"]"
"Desktop 2 before: fromList [Window \"B\"]"
"lock Desktop passed by third args"
"lock Desktop passed by third args"
"Desktop 1 after: fromList []"
"Desktop 2 after: fromList [Window \"A\",Window \"B\"]"

ここでもう一つforkIOしてWindow BをDesktop 1からDesktop 2へ移動する処理を追加する
(上のコメントアウトしてあるところ)
すると実行時にデッドロックすることが確認できる

"Desktop 1 before: fromList [Window \"A\"]"
"Desktop 2 before: fromList [Window \"B\"]"
"lock Desktop passed by third args"
"lock Desktop passed by third args"
stm: thread blocked indefinitely in an MVar operation

moveWindow内での最初のtakeMVarの実行タイミングの問題。一つ目のスレッドでtakeMVarした後、printを処理している間に2つ目のスレッドのtakeMVarが実行される。よって1つ目のスレッドにて次のtakeMVarがブロックされる。2つ目のスレッドでも同様にブロックされてデッドロックとなる。ただし、この例だとprint文を除いてコンパイルすればデッドロックせずに実行できる。これは1つ目のスレッドの実行が最後まで行われてからもう一つのスレッドに切り替わるためと考えられる。

MVarを使った実装のデメリットはこのようにデッドロックしてしまうことだけでなく、moveWindow内での実行順も気にする必要があることである。

例えば、あるwindowをdisplay aからbへ移動する場合を考える。
display aに対しtakeMVarしてからwindowを削除してputMVarしたとする。この時点ではまだdisplay bへwindowを追加していないため、現状そのwindowはどこにも存在しないことになってしまう。よって、このような中間状態を他のスレッドに参照されてしまうのは良くないため、先に両方のdisplayのロックを取ってから削除・追加を行うことで中間状態を見せないようにするという工夫が必要になっている。

このような制約があるととてもプログラムが書きづらくなり、かけたとしてもエラーの多いものになってしまう。

これらのデメリットをTVarを使うことで解消できる。
Display型の定義をTVarを使うように修正し、moveWindowを少し書き換えてSTMとして実行するようにした関数を定義

type DisplayTVar = M.Map Desktop (TVar (S.Set Window))

moveWindowSTM :: DisplayTVar -> Window -> Desktop -> Desktop -> STM ()
moveWindowSTM disp w da db = do
  wa <- readTVar ma
  wb <- readTVar mb
  writeTVar ma $ S.delete w wa
  writeTVar mb $ S.insert w wb
  where
    ma = disp M.! da
    mb = disp M.! db

MVarの処理になっていたところを単にTVarの処理に変えて、IOをSTMとしただけである。
mainの処理は上記の関数に置き換えた上でatomicallyの引数とするだけ。

_ <- forkIO $ do
  atomically $ moveWindowSTM disp (Window "A") (Desktop 1) (Desktop 2)

コードの全体はこちらで。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Stm/Main.hs#L114


一つのSTM内での処理はアトミックに実行されるため中間状態で他のスレッドに参照されない。そのためSTM内での処理の順序は気にしなくてよくなる。
また、moveWindowを2回行う場合でも一つずつの処理はアトミックに実行されることが保証されるためデッドロックは起こさない。
これらの特徴からSTMは各ひとまとまりの処理を部品として使ってより大きな処理を作ることが容易にできることがわかる。

処理をブロック

STMの処理ではある状態になるまで処理を待ちたい場合にretryという関数を使う

retry :: STM a

retryを実行すると現在のトランザクションの処理がすべてロールバックされて最初の状態に戻る。そして再度同じ処理を実行する。
現在のトランザクションとは一つのatomicallyに含まれる処理である。異なるatomicallyに属するSTMの処理はそれぞれ別個のトランザクションとなる。

これを用いた例としてTVarによってMVarの機能を実現するためTMVarという型を作成する

newtype TMVar a = TMVar (TVar (Maybe a))

newEmptyTMVar :: STM (TMVar a)
newEmptyTMVar = do
  t <- newTVar Nothing
  return $ TMVar t

takeTMVar :: TMVar a -> STM a
takeTMVar (TMVar tvar) = do
  m <- readTVar tvar
  case m of
    Nothing -> retry
    Just a -> writeTVar tvar Nothing >> return a

putTMVar :: TMVar a -> a -> STM ()
putTMVar (TMVar tvar) a = do
  m <- readTVar tvar
  case m of
    Just _ -> retry
    Nothing -> void $ writeTVar tvar (Just a)

MVarの状態を再現するためにMaybeをTVarにくるんで使う。
NothingであればMVarでいうemtpyの状態を表すことにする。

takeTMVarもputTMVarもシンプルな処理で、TVarの中身を取り出しNothingかJustかで処理を分けるだけ。
その際、処理をすべき状態でないときはretryを実行することでそのトランザクション全体をやり直し、適切な状態であれば値の取得や変更を行う。

これを使うと両方の値が入っているときのみそれらを取り出すという処理が書ける。

atomically $ do
  a <- takeTMVar ta     (1)
  b <- takeTMVar tb     (2)
  return (a,b)

(2)の処理中にretryが実行された場合、(1)の処理もロールバックされて最初から再実行されることになる。
また、この例はMVarで実現するのが難しい。もしMVarでこれと同じことをやろうとするとした場合、(2)の実行時にretry(と同等の処理)を行おうとしても他のスレッドからMVarに対して既に別の処理がなされている可能性があるためこのままでは実施できない。やるのであればもう一つMVarを導入してロックの役割をさせる必要がある。しかし、もちろんコードは複雑化することになるし全てのスレッドがその変数を見るようにしなければならないというパフォーマンスの問題も出てくる。

何かが変わるまでブロック

ここではretryの使い方をもう少し複雑な例(最初に出てきたDisplayの題材)を例に説明。
ここでも本には動くコード全体の例はなかったためこちらに作ってみた。
https://github.com/y-kamiya/parallel-concurrent-haskell/blob/master/src/Stm/Main.hs#L141

状態変化時の処理起動などエラーをおこしがちな部分の処理をSTMの機構にまかせることができているおかげでかなり簡潔にかけていると言える。

orElse

2つのSTMの処理のどちらかを実行したいという場合にorElseという関数が使える。

orElse :: STM a -> STM a -> STM a

一つ目のSTMがretryなく実行完了した場合はその結果を返し、retryに入った場合は二つ目のSTMが実行される。
例えば2つのTMVarのどちらかを実行してその値を返す関数は以下のように書ける。

takeEitherTMVar :: TMVar a -> TMVar b -> STM (Either a b)
takeEitherTMVar ma mb = fmap Left (takeTMVar ma) `orElse` fmap Right (takeTMVar mb)

一つ目のSTMが実行完了した場合はLeftの値として返し、二つ目のSTMの場合はRightで返す。
orElseを使う際の注意点は、左側にある処理の方が優先されるという点である。もし両方のSTMが正常に実行できる状態であれば必ず一つ目のSTMが実行されることになる。

第8章のAsyncをSTMで

第8章の後半で出てきた例が再登場。IOで行っていた各処理をSTMにするだけであるためコードすべては記載しない。
複数のURLからコンテンツをダウンロードしてきてもっとも早く落ちてきたものを表示する。ここではMVarを使っていたところをTMVarを使うようにして書き換える。

STMを使うことのメリットは多くのスレッドを動かす必要がないためコストが下げられること。

waitAnyのところがfoldを使っていて少しわかりづらいのでまとめておくと

waitAny = atomically $ foldr orElse retry $ map waitSTM as

asは作成されたAsync aのリストでこれをfoldrによって右からつなげている。よって以下のようになる。

Async `orElse` Async `orElse` Async ... `orElse` retry

orElseは左から順に実行されていくので左から順に見ていって処理が完了しているasyncが見つかったらその値を返すことになる。

ただ、厳密には早くダウンロードが終わったものがとれてくるとは限らない気がする。例えば最初のAsyncが未完了でretryとなった場合、次のAsyncの実行完了が確認されるがその間に最初のAsyncの結果が返ってきても一つ目の結果は返らないはず。なので同じくらいのダウンロード時間であれば前後することもあり得ると思われる。

STMによるチャネル

これも第8章の例をMVarからSTMに書き換える例。設計自体は第8章と同じにしたうえでTVarを使ってどうなるかを説明。
MVarでの実装と異なる点は以下の通り

  • IOではなくSTMモナドの処理とし、実行時はatomicallyで包む
  • 中身が空の状態をTNilとして表現(MVarのときは単純に空)
  • ひとまとまり毎にatomicな処理となっているため、readとwriteが同時に実行される状態でも気にせず書ける

コードの全体は以下

import Control.Monad.STM
import Control.Concurrent.STM.TVar

data TChan a = TChan (TVar (TVarList a))
                     (TVar (TVarList a))

type TVarList a = TVar (TList a)

data TList a = TNil | TCons a (TVarList a)

newTChan :: STM (TChan a)
newTChan = do
  hole <- newTVar TNil
  read <- newTVar hole
  write <- newTVar hole
  return $ TChan read write

readTChan :: TChan a -> STM a
readTChan (TChan read _) = do
  listHead <- readTVar read
  tList <- readTVar listHead
  case tList of
    TNil -> retry
    TCons a tail -> do
      writeTVar read tail
      return a 

writeTChan :: TChan a -> a -> STM ()
writeTChan (TChan _ write) a = do
  newEnd <- newTVar TNil
  listEnd <- readTVar write
  writeTVar write newEnd
  writeTVar listEnd $ TCons a newEnd

MVarの実装ではうまくいかなかったunGetTChanもこちらであればうまく実装できる。
MVarでうまくいかなかったのはreadMVarを別のスレッドで実行していた場合。read用のチャネルをブロックしてしまうためunGetTChna実行時のtakeMVarもブロックされてしまう。

こちらの実装ではSTMを使うことで各処理がアトミックに実行されるためデッドロックは発生しない。

unGetTChan :: TChan a -> a -> STM ()
unGetTChan (TChan read _) a = do
  newHead <- newTVar TNil
  listHead <- readTVar read
  writeTVar read newHead
  writeTVar newHead $ TCons a listHead
STM中の例外

IOと同じ形でSTMバージョンの関数がある。throwSTMやcatchSTMなど。IOと異なる点は例外が発生した時点でそれまでのトランザクションがすべてロールバックされるということ。ロールバックされた状態で設定されたハンドラーなどが実行される。
また、ハンドラーが存在しない場合にはロールバック後にSTMの外に向かって例外が伝播していく。

これらの動作は非同期例外に対しても同じである。そのためSTMでの処理中には例外を受け取ってよいタイミングなどを考える必要がないためとても便利。第9章で出てきたmaskなども使う必要ない。

チャネルを別の実装で効率よく

これまでTVarやMVarを入れ子にしてつながりを表現していたが、ここではTVarの中に直接リストを含めてしまう。一つのリストでreadもwriteも扱うのがTList。

import Control.Concurrent.STM                   
                                                
newtype TList a = TList (TVar [a])              
                                                
newTList :: STM (TList a)                       
newTList = do                                   
  v <- newTVar []                               
  return $ TList v
                                                
writeTList :: TList a -> a -> STM ()            
writeTList (TList v) a = do                     
  list <- readTVar v                            
  writeTVar v $ list ++ [a]                     
                                                
readTList :: TList a -> STM a                   
readTList (TList v) = do                        
  list <- readTVar v                            
  case list of                                  
    [] -> retry                                 
    (x:xs) -> writeTVar v xs >> return x        

ちなみにMVarでこれと同じものを実装してみると、writeでもreadでもtakeMVarを呼ぶことになるためデッドロックしてしまいうまくいかない。

TListは書き込み時にリストの連結を用いているため処理量はO(n)となる。
※ [1,2,3]というのが1:2:3:[]のsyntax sugarであるため、後ろに連結する場合はすべての要素についてリストコンストラクタ(:)が処理されることによる。

これを改善するためにwrite, readでリストを分け、write側ではreverseした状態で保持する。これにより書き込み時には先頭に一つ要素をくっつけるだけで済みO(1)で処理できる。
これを実装したのがTQueue

data TQueue a = TQueue (TVar [a]) (TVar [a]) 
                                             
newTQueue :: STM (TQueue a)                  
newTQueue = do                               
  read <- newTVar []                         
  write <- newTVar []                        
  return $ TQueue read write                 
                                             
writeTQueue :: TQueue a -> a -> STM ()       
writeTQueue (TQueue _ write) a = do          
  listend <- readTVar write                  
  writeTVar write $ a : listend              
                                             
readTQueue :: TQueue a -> STM a              
readTQueue (TQueue read write) = do          
  xs <- readTVar read                        
  case xs of                                 
    (x:xs') -> do                            
      writeTVar read xs'                     
      return x                               
    [] -> do                                 
      ys <- readTVar write                   
      case ys of                             
        [] -> retry                          
        _ -> do                              
          let (z:zs) = reverse ys  (1)
          writeTVar write []                 
          writeTVar read zs                  
          return z                           

データ型としてはreadとwriteとしてそれぞれTVarを持っている。一つ目がread用で二つ目がwrite用。
readTQueueでやっているのは以下のような処理

  • read用バッファの中身が入っていればその先頭を読み出す
  • read用バッファの中身が空であれば、write用バッファを確認

- write用バッファも空であればretry(データが全くない状態なので)
- write用バッファの中身があればreverseした上でread用バッファに移動し、その先頭を読み出す

write用バッファを確認する際にcase文を使ったらもっと簡単に書けるのではと思うかもしれないがそれは良くない。
もしcase文でパターンマッチをした場合、reverseの処理がその場で行われ、それが終わるまでトランザクションが終わらなくなってしまう。ここではそれを防ぐために(1)の部分でlet (z:zs)という形で束縛している。
haskellは遅延評価なので値が必要になるまで処理されない。今回の場合letで束縛しておくことで、次回のread用バッファからの読み出しの際のcaseによるパターンマッチでreverseが実行されると思われる。そちらのタイミングでO(n)の処理がされるのであれば問題ないということなのだろうか?

サイズのあるチャネル

TQueueの実装に加えチャネルの最大サイズを設定する。TQueue型にサイズを表すIntを持たせてデータの出し入れ時にそれを加算減算する。
実装の変更はそこだけなのでコードは省略。

STMでできないことは?

STMにより多くのことがMVarを使うよりも簡単にできることを見てきたが、STMの方がやりづらいことやMVarの方が便利なことはあるのかという話。
MVarの方がよい点は以下の2つ

  • 単体の処理が早い
  • 公平性

単体の処理が早いというのはそのままでTVarでトランザクションの処理をするよりもMVarで値の出し入れをする処理の方が早く済む。ただし、プログラム全体でみた時にMVarの方が早くなるかと言われるとそうでない場合もある。これはここまで説明してきたSTMの利点があるためである。
2つ目の公平性というのはブロックされた処理が再開される順である。これがMVarの大きな利点。

MVarの場合、ブロックされた処理はFIFOで処理されていくため先にブロックされたものから処理が再開されていくため公平。
TVarの場合、ブロックされた処理が複数あるならその全てが起こされ処理が再開される。そのうち一つの処理が完了して残りは再度ブロックされる。よって先にブロックされた処理でもなかなか実行されない可能性がある。

TVarにおいて全ての処理が起こされる理由はブロックされた状態が一意に定まらないため。ifやcaseなどで判定した結果retryを呼んでブロックすることになるため、その条件次第でブロックの理由が無数にある。一方MVarの場合は簡単で、その対象のMVarに値が入っているか入っていないかしかない。

パフォーマンス

ここではSTMの実装について説明。
一言でいうとSTMはログである。
そのトランザクション内で行われたreadTVarとwriteTVarの記録を順にためていき、トランザクション処理の中で参照される。

メモリに反映せずにログに書いておくだけにすることでretryの際はログを破棄すればよいだけとなる。
トランザクションが最後まで到達すると、ログとreadTVarで取得した値を比較し、一致していればメモリに反映される。そうでなければログは破棄されトランザクションが最初からやり直される。

また、readTVarはログを走査して現在最新のTVarの値を取り出してくるためO(n)の処理量になる。つまりトランザクションが大きくなればなるほど処理に時間がかかる。

これらより2つの守るべきポイントがある
まず一つのトランザクション内で多くのTVarをreadすべきではない。要素が多いTVarのリストから値を取り出すなど。
そして、トランザクションはできるだけ小さくするべき。大きいトランザクションはそれだけretryされる可能性が高い。小さなトランザクションが値を小刻みに変更し続けた結果、大きなトランザクションの処理がずっとretryされ続けるということになりかねない。

retryはそのトランザクション内で読まれたTVarの数をnとするとO(n)の処理量になる。なぜならretryするためには関係するTVarが変化したことを検知する必要があるため。retryの際、各TVarのwatch listに対してretryしたスレッドを加えている。そしてTVarの値が変更されたときはそのwatch listにあるスレッドがすべて起こされてトランザクションが再実行される。

まとめ

STMが提供している利点は以下

  • 部品として使えるアトミックな処理のまとまり
  • ブロックしている処理の間に他の処理を入れられる
    • MVarなどを使った場合、デッドロックを避けるために処理の順などを気にして全体の整合性を取る必要がある
  • 例外安全

Haskellによる並列・並行プログラミング

Haskellによる並列・並行プログラミング