読者です 読者をやめる 読者になる 読者になる

scala actorでチャットサーバ

scalaの勉強のためチャットサーバを書いてみた。actorも使ってみる。

仕様

仕様は以前haskellで書いたものを参考にした。
jsapachehtml.hatenablog.com

  • 存在するチャネルは一つだけ(簡単のため)
  • クライアントが接続してきたときに名前を尋ねる。入力された名前が既に使われているなら他の名前にしてもらう
  • クライアントからの入力は以下のようなコマンドとして処理する
    • :t name message: 指定のユーザへメッセージを送る
    • :k name : 指定のユーザを追い出す
    • :q : 接続をやめる
    • message : すべてのユーザにメッセージを投げる
  • クライアントが接続したとき・接続を切ったときは他のすべてのクライアントに通知
  • 正しくエラーを処理し不整合を起こさない
    • ex. 二人の同名のクライアントが同時につなげてきたときはどちらか一方がつながる

アーキテクチャ

  • 接続してきたクライアント毎に一つのactorを生成
  • 各actorはchatにいるメンバーをすべて知っている
  • actorは自分のクライアントからの入力を受け取り他のactorにChatMessageとして投げる
  • ChatMessageを受け取ったactorは自身の状態を変更し、自分のクライアントへ表示すべきデータを投げる
  • 接続を待ち受けてacceptするactorは現在のchatのメンバー数のみ覚えている(名前のconflictを解決するため)

各actorはactorSelectionの機能を使ってメッセージを投げ合う

実装

クライアントからの接続を待ち受ける役割をServerクラスとして作成。基本的に以下のページ(akkaのドキュメント)の内容そのまま。
Using TCP — Akka Documentation
唯一memberの数を状態として持っているが、これは各クライアントの名前の衝突を防ぐために使う。指定したactor pathに存在するアクター数を数えることができれば必要ないのだが、そのような機能はactorSelectionには見当たらなかった。

object Main {
  def main(args: Array[String]) {
    val system = ActorSystem("ChatServer")
    val serverActor = system.actorOf(Props[Server], name = "server")
  }
}

class Server extends Actor {
  import Tcp._
  import context.system

  var memberCount = 0

  IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 8011))

  def receive = {
    case b @ Bound(localAddress) =>
      println(s"start listening port ${localAddress.getPort()}")
    case CommandFailed(command: Bind) =>
      println(command.failureMessage)
    case c @ Connected(remote, local) =>
      val handler = context.actorOf(Props(classOf[ChatHandler], sender(), memberCount))
      sender() ! Register(handler)
      memberCount += 1
    case RemoveClient(name) => memberCount -= 1
  }
}


各actor間でやり取りするメッセージの定義

trait ChatMessage                                                                             
case class NewClient(name: String) extends ChatMessage                                        
case class AckNewClient(name: String) extends ChatMessage                                     
case class RemoveClient(name: String) extends ChatMessage                                     
case class Broadcast(name: String, msg: String) extends ChatMessage                           
case class Tell(name: String, msg: String) extends ChatMessage                                
case class NameConfirmation(name: String) extends ChatMessage                                 
case class AckNameConfirmation(name: String, isOk: Boolean) extends ChatMessage               

以下は各actorが実行するメインの処理。最初に行うのはクライアントの名前を決めること。ここはかなりややこしいことになってしまったが、やっていることは以下のようなこと。

  • クライアントから名前を受け取る
  • 受け取った名前をNameConfirmationとして他のactorに投げる
  • 他のactorは既に使用されている名前であればAckNameConfirmationとしてfalseを返す
  • 一つでもfalseが返ってきた場合は接続を切ってやり直し
  • chatのメンバー数の過半数からtrueが返ってきた場合は名前を決定して次の状態へ遷移
class ChatHandler(ref: ActorRef, memberCount: Int) extends Actor {                            
  import Tcp._                                                                                
                                                                                              
  val Crlf = "\r\n"                                                                           
  val Separator = " "                                                                         
                                                                                              
  var clientName = "undefined"                                                                
  var clientMap = new ClientMap                                                               
                                                                                              
  respondToClient(s"What your name?$Crlf")                                                    
                                                                                              
  def receive = {                                                                             
    case Received(name) =>                                                                    
      println("receive name")                                                                 
      clientName = name.utf8String.stripLineEnd                                               
      clientMap.add(clientName, sender().path)                                                
      memberCount match {                                                                     
        case 0 => context.become(loggedIn)                                                    
        case _ =>                                                                             
          notifyAll(NameConfirmation(clientName))                                             
          context.setReceiveTimeout(10 seconds)                                               
          context.become(naming)                                                              
      }                                                                                       
  }                                                                                           
                                                                                              
  def naming: Receive = {                                                                     
    case AckNameConfirmation(name, true) =>                                                   
      println(s"success $clientName")                                                         
      clientMap.add(name, sender().path)                                                      
      if (memberCount / 2 < clientMap.size) {                                                 
        context.setReceiveTimeout(Duration.Undefined)                                         
        notifyJoin(clientName)                                                                
        context.become(loggedIn)                                                              
      }                                                                                       
    case AckNameConfirmation(_, false) => failedToNameDecision                                
    case ReceiveTimeout => failedToNameDecision                                               
  }                                                                                           
                                                                                              
 def failedToNameDecision {                                                       
   println(s"failed $clientName")                                                 
   context.setReceiveTimeout(Duration.Undefined)                                  
   parent ! RemoveClient(name)                                                    
   respondToClient(s"$clientName has already used$Crlf")                          
   self ! PoisonPill                                                              
 }                                                                                

名前の衝突については後から実装したのだが、ここが最もややこしいものだった。今回はchatにいるメンバーの情報をすべてのactorがそれぞれ持っているというモデルで書いていたため、重複を見つけるのが大変になってしまった。


各メッセージに対する処理は以下。actor間でやりとりする各メッセージだけでなく、クライアントから受け取るinputによる処理もパターンマッチによってかき分けることができるのは楽だしわかりやすかった。

 def loggedIn: Receive = {                                                        
   case Received(data) =>                                                         
     println("receive message")                                                   
     println(data)                                                                
     val str = data.utf8String.stripLineEnd                                       
     val commands = str.split(Separator).toList                                   
     println(commands)                                                            
     commands match {                                                             
       case List(":t", name) => noop                                              
       case ":t" :: name :: list => tell(name, list.mkString(Separator))          
       case List(":q", _*)       => kick(clientName)                              
       case List(":k", name, _*) => kick(name)                                    
       case List(c, _*) if c.startsWith(":") =>                                   
         notice(s"unknown command $c$Crlf")                                       
       case List("") => noop                                                      
       case _ =>                                                                  
         broadcast(clientName, str)                                               
     }                                                                            
   case PeerClosed => context.stop(self)                                          
   case NameConfirmation(name) if sender == self => noop                          
   case NameConfirmation(name) =>                                                 
     println(s"NameConfirmation in $clientName")                                  
     val exist = clientMap.contains(name)                                         
     sender() ! AckNameConfirmation(clientName, !exist)                           
     if (!exist) {                                                                
       clientMap.add(name, sender().path)                                         
     }                                                                            
                                                                                  
   case Broadcast(name, str) =>                                                   
     println("receive Broadcast")                                                 
     respondToClient(s"* $name *: $str$Crlf")                                     
   case NewClient(name) =>                                                        
     clientMap.update(name, sender().path)                                        
     if (name != clientName) {                                                    
       sender() ! AckNewClient(clientName)                                        
       notice(s"$name joined$Crlf")                                               
     }                                                                            
   case AckNewClient(name) =>                                                     
     clientMap.add(name, sender().path)                                           
     notice(s"greet from $name$Crlf")                                             
   case Tell(name, str) =>                                                        
     println("receive Tell")                                                      
     respondToClient(s"** $name **: $str$Crlf")                                   
   case RemoveClient(name) if name == clientName =>                               
     println("receive RemoveClient to me")                                        
      notice(s"you are kicked$Crlf")                                                 
      self ! PoisonPill                                                              
    case RemoveClient(name) =>                                                       
      println("receive RemoveClient")                                                
      clientMap.remove(name)                                                         
      notice(s"$name exited$Crlf")                                                   
    case m => println(s"unknown message $m")                                         
  }                                                                                                                                                                 

コードの全体はこちら
https://github.com/y-kamiya/test/tree/chatserver-scala-v1/scala/ChatServer/src/main/scala


以前javaでも同様のものを書いてみたのだが、今回はactorモデルを利用することでかなり簡潔にかけたと思う。
ただ、名前の衝突の部分に関しては無駄に複雑なことになってしまったのでvol.2として修正したいと思う。各actorがそれぞれメンバーについての情報をもっているというモデルが良くなかった。

全てのメッセージを受け取って転送するようなactorを一つ作りそこにメンバーの情報をすべて持たせるようにすれば解決できそう。それだとメッセージが集中してそこがボトルネックになるのではと考えていたが、よくよく考えればさらに子アクターを生成して処理を並列化することで解決できそうだし、その方が役割を分離して各actorの処理をシンプルに保つことができそう。