scala actorでチャットサーバ
scalaの勉強のためチャットサーバを書いてみた。actorも使ってみる。
仕様
仕様は以前haskellで書いたものを参考にした。
jsapachehtml.hatenablog.com
- 存在するチャネルは一つだけ(簡単のため)
- クライアントが接続してきたときに名前を尋ねる。入力された名前が既に使われているなら他の名前にしてもらう
- クライアントからの入力は以下のようなコマンドとして処理する
- :t name message: 指定のユーザへメッセージを送る
- :k name : 指定のユーザを追い出す
- :q : 接続をやめる
- message : すべてのユーザにメッセージを投げる
- クライアントが接続したとき・接続を切ったときは他のすべてのクライアントに通知
- 正しくエラーを処理し不整合を起こさない
- ex. 二人の同名のクライアントが同時につなげてきたときはどちらか一方がつながる
アーキテクチャ
- 接続してきたクライアント毎に一つのactorを生成
- 各actorはchatにいるメンバーをすべて知っている
- actorは自分のクライアントからの入力を受け取り他のactorにChatMessageとして投げる
- ChatMessageを受け取ったactorは自身の状態を変更し、自分のクライアントへ表示すべきデータを投げる
- 接続を待ち受けてacceptするactorは現在のchatのメンバー数のみ覚えている(名前のconflictを解決するため)
各actorはactorSelectionの機能を使ってメッセージを投げ合う
実装
クライアントからの接続を待ち受ける役割をServerクラスとして作成。基本的に以下のページ(akkaのドキュメント)の内容そのまま。
Using TCP — Akka Documentation
唯一memberの数を状態として持っているが、これは各クライアントの名前の衝突を防ぐために使う。指定したactor pathに存在するアクター数を数えることができれば必要ないのだが、そのような機能はactorSelectionには見当たらなかった。
object Main {
def main(args: Array[String]) {
val system = ActorSystem("ChatServer")
val serverActor = system.actorOf(Props[Server], name = "server")
}
}
class Server extends Actor {
import Tcp._
import context.system
var memberCount = 0
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 8011))
def receive = {
case b @ Bound(localAddress) =>
println(s"start listening port ${localAddress.getPort()}")
case CommandFailed(command: Bind) =>
println(command.failureMessage)
case c @ Connected(remote, local) =>
val handler = context.actorOf(Props(classOf[ChatHandler], sender(), memberCount))
sender() ! Register(handler)
memberCount += 1
case RemoveClient(name) => memberCount -= 1
}
}
各actor間でやり取りするメッセージの定義
trait ChatMessage case class NewClient(name: String) extends ChatMessage case class AckNewClient(name: String) extends ChatMessage case class RemoveClient(name: String) extends ChatMessage case class Broadcast(name: String, msg: String) extends ChatMessage case class Tell(name: String, msg: String) extends ChatMessage case class NameConfirmation(name: String) extends ChatMessage case class AckNameConfirmation(name: String, isOk: Boolean) extends ChatMessage
以下は各actorが実行するメインの処理。最初に行うのはクライアントの名前を決めること。ここはかなりややこしいことになってしまったが、やっていることは以下のようなこと。
- クライアントから名前を受け取る
- 受け取った名前をNameConfirmationとして他のactorに投げる
- 他のactorは既に使用されている名前であればAckNameConfirmationとしてfalseを返す
- 一つでもfalseが返ってきた場合は接続を切ってやり直し
- chatのメンバー数の過半数からtrueが返ってきた場合は名前を決定して次の状態へ遷移
class ChatHandler(ref: ActorRef, memberCount: Int) extends Actor {
import Tcp._
val Crlf = "\r\n"
val Separator = " "
var clientName = "undefined"
var clientMap = new ClientMap
respondToClient(s"What your name?$Crlf")
def receive = {
case Received(name) =>
println("receive name")
clientName = name.utf8String.stripLineEnd
clientMap.add(clientName, sender().path)
memberCount match {
case 0 => context.become(loggedIn)
case _ =>
notifyAll(NameConfirmation(clientName))
context.setReceiveTimeout(10 seconds)
context.become(naming)
}
}
def naming: Receive = {
case AckNameConfirmation(name, true) =>
println(s"success $clientName")
clientMap.add(name, sender().path)
if (memberCount / 2 < clientMap.size) {
context.setReceiveTimeout(Duration.Undefined)
notifyJoin(clientName)
context.become(loggedIn)
}
case AckNameConfirmation(_, false) => failedToNameDecision
case ReceiveTimeout => failedToNameDecision
}
def failedToNameDecision {
println(s"failed $clientName")
context.setReceiveTimeout(Duration.Undefined)
parent ! RemoveClient(name)
respondToClient(s"$clientName has already used$Crlf")
self ! PoisonPill
} 名前の衝突については後から実装したのだが、ここが最もややこしいものだった。今回はchatにいるメンバーの情報をすべてのactorがそれぞれ持っているというモデルで書いていたため、重複を見つけるのが大変になってしまった。
各メッセージに対する処理は以下。actor間でやりとりする各メッセージだけでなく、クライアントから受け取るinputによる処理もパターンマッチによってかき分けることができるのは楽だしわかりやすかった。
def loggedIn: Receive = {
case Received(data) =>
println("receive message")
println(data)
val str = data.utf8String.stripLineEnd
val commands = str.split(Separator).toList
println(commands)
commands match {
case List(":t", name) => noop
case ":t" :: name :: list => tell(name, list.mkString(Separator))
case List(":q", _*) => kick(clientName)
case List(":k", name, _*) => kick(name)
case List(c, _*) if c.startsWith(":") =>
notice(s"unknown command $c$Crlf")
case List("") => noop
case _ =>
broadcast(clientName, str)
}
case PeerClosed => context.stop(self)
case NameConfirmation(name) if sender == self => noop
case NameConfirmation(name) =>
println(s"NameConfirmation in $clientName")
val exist = clientMap.contains(name)
sender() ! AckNameConfirmation(clientName, !exist)
if (!exist) {
clientMap.add(name, sender().path)
}
case Broadcast(name, str) =>
println("receive Broadcast")
respondToClient(s"* $name *: $str$Crlf")
case NewClient(name) =>
clientMap.update(name, sender().path)
if (name != clientName) {
sender() ! AckNewClient(clientName)
notice(s"$name joined$Crlf")
}
case AckNewClient(name) =>
clientMap.add(name, sender().path)
notice(s"greet from $name$Crlf")
case Tell(name, str) =>
println("receive Tell")
respondToClient(s"** $name **: $str$Crlf")
case RemoveClient(name) if name == clientName =>
println("receive RemoveClient to me")
notice(s"you are kicked$Crlf")
self ! PoisonPill
case RemoveClient(name) =>
println("receive RemoveClient")
clientMap.remove(name)
notice(s"$name exited$Crlf")
case m => println(s"unknown message $m")
} コードの全体はこちら
https://github.com/y-kamiya/test/tree/chatserver-scala-v1/scala/ChatServer/src/main/scala
以前javaでも同様のものを書いてみたのだが、今回はactorモデルを利用することでかなり簡潔にかけたと思う。
ただ、名前の衝突の部分に関しては無駄に複雑なことになってしまったのでvol.2として修正したいと思う。各actorがそれぞれメンバーについての情報をもっているというモデルが良くなかった。
全てのメッセージを受け取って転送するようなactorを一つ作りそこにメンバーの情報をすべて持たせるようにすれば解決できそう。それだとメッセージが集中してそこがボトルネックになるのではと考えていたが、よくよく考えればさらに子アクターを生成して処理を並列化することで解決できそうだし、その方が役割を分離して各actorの処理をシンプルに保つことができそう。