たまに akka-http を触るときに毎回サーバの起動の仕方や Route の書き方をググり直すのが手間なので、自分なりにサンプルを作成しておくことにしました。

下記は簡単な解説で、サンプルコード全体は こちら に置いています。

シンプルに HTTP サーバを動かす

まずは akka-http を最小限の設定とともに動かしてみます。 サーバを起動する main 関数は以下のように書くことができます。

package com.tiqwab.example.akka

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object SimpleServer extends LazyLogging {

  def main(args: Array[String]): Unit = {
    // Typesafe Config のロード。デフォルトではクラスパス下の `application.conf` を読む
    val config: Config = ConfigFactory.load()
    val host = config.getString("http.host")
    val port = config.getInt("http.port")

    // 上の Config を使用して ActorSystem の作成
    implicit val system: ActorSystem = ActorSystem("simple-system", config)
    // Materializer は akka-http が依存する akka-stream に必要となる
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    // サーバの起動
    val route = new TopicRoute().routes
    Http().bindAndHandle(route, host, port).onComplete {
      case Success(binding) =>
        logger.info(s"Server start at ${binding.localAddress}")
      case Failure(e) =>
        logger.error(s"Error occurred while starting server", e)
        Await.result(system.terminate(), Duration.Inf)
        sys.exit(1)
    }
  }

}

akka-http でサーバを起動する際には Route と呼ばれるものを渡す必要があります。 上で bindAndHandle の第一引数に渡している route がそれで、サーバの提供する API を独自の DSL で定義したものです。

package com.tiqwab.example.akka

import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Route

class TopicRoute {

  def routes: Route =
    pathPrefix("topic" / Segment) { topicName =>
      // GET /topic/:topicName
      complete((OK, s"Accept request for $topicName"))
    }

}

application.conf はお試しで以下のようにしました。

akka {
  // 全体のログレベル調整。 'OFF' で抑制可能
  loglevel = INFO
  // システムの startup, shutdown 時に出力されることがある。'OFF' で抑制可能
  stdout-loglevel = INFO

  http {
    server {
      // HTTP レスポンスの Server ヘッダの値
      server-header = "Simple API Server"
    }
  }
}

http {
  host = "127.0.0.1"
  host = ${?SIMPLE_SERVER_HOST}
  port = "8080"
  port = ${?SIMPLE_SERVER_PORT}
}

sbt run でサーバの動作が確認できます。

# Start server
$ sbt run
2017-11-23 14:55:38,953 INFO [simple-system-akka.actor.default-dispatcher-2] c.t.e.a.SimpleServer$ Server start at /127.0.0.1:8080

...

# Execute command in another process
$ curl -v http://localhost:8080/topic/test
> GET /topic/test HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.56.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: Simple API Server
< Date: Thu, 23 Nov 2017 05:47:47 GMT
< Content-Type: text/plain; charset=UTF-8
< Content-Length: 23
< 
Accept request for test

簡単な API の実装

次は実際に簡単な API を実装してみます。 ここでは以下のような POST /topic/:name に body, timestampMillis を持つ json を投げることでメッセージの保存を行い、保存したメッセージの ID を返却する、という API を作成します。

$ curl -v -H "Content-Type: application/json" http://localhost:8080/topic/test -d '{"body": "hoge", "timestampMillis": 1}'
> POST /topic/test HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.56.1
> Accept: */*
> Content-Type: application/json
> Content-Length: 38
> 
< HTTP/1.1 200 OK
< Server: Simple API Server
< Date: Thu, 23 Nov 2017 06:50:51 GMT
< Content-Type: application/json
< Content-Length: 11
< 
{"id":"71"}

このエンドポイントに相当する Route 定義を抜粋すると以下のようになります。

package com.tiqwab.example.akka

import akka.actor.ActorSystem
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.util.Timeout
import com.typesafe.scalalogging.LazyLogging

import scala.util.{Failure, Success}

class TopicRoute(val system: ActorSystem, val timeout: Timeout)
  extends TopicApi with LazyLogging {

  def routes: Route =
    pathPrefix("topic" / Segment) { topicName =>
      // POST /topic/:name
      post {
        // リクエストから SaveMessageRequest を作成する
        // その際適当な Unmarshaller が必要
        entity(as[SaveMessageRequest]) { request =>
          // onComplete に Future を渡し、完了時の処理を記述する (Try として渡される)
          // saveMessage 処理の内容は後述
          onComplete(saveMessage(topicName, request)) {
            case Success(message) =>
              // complete((StatusCode, T)) で HTTP ステータスコードを指定してレスポンス
              // ここでは 型 T をレスポンスにする適当な Marshaller が必要
              complete((OK, SaveMessageResponse(message.id)))
            case Failure(e) =>
              logger.error("Error occurred while saving message", e)
              complete(InternalServerError)
          }
        }
      } ~
      ...
}

この例では akka-http における

  • リクエストボディの json を指定のクラスにデシリアライズする
  • レスポンスの HTTP ステータスコードの指定
  • 指定のクラスをレスポンスボディの json としてシリアライズする

といった方法を示しています。

リクエストから、またはレスポンスへとあるオブジェクトを変換したい場合、akka-http ではそれぞれ Unmarshaller, Marshaller を定義する必要があります。

json を扱う API の場合、主要な json ライブラリ用に Unmarshaller, Marshaller が既に何らかの形で提供されていることが多いと思うので、それらを使用することができます (上の場合は spray-json を使用し、依存ライブラリ akka-http-spray-json からの akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport を import している)。

Actor の実装

上でリクエストを受け付けられるようになったので、それを実際に処理する部分を実装します。

上の TopicRoute に継承してもらっている TopicApi trait には Route と処理を行う Actor との橋渡しを行ってもらっており、主に TopicList Actor に適当なメッセージを ask するという実装になっています。

このように Route を定義するクラスと具体的な処理を行うクラスを分けておくことでクラス間の責任がはっきりしていいかなと思います。

package com.tiqwab.example.akka

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Future

trait TopicApi {

  implicit val system: ActorSystem
  // 下記の akka.pattern.ask のために必要
  // Timeout で指定した duration を超えてもレスポンスが返ってこない場合、Future の結果が AskTimeoutException になる
  implicit val timeout: Timeout

  val topicList: ActorRef = system.actorOf(TopicList.props, "topicList")

  def saveMessage(topicName: String, request: SaveMessageRequest): Future[Message] =
    // akka.pattern.ask を import することで Actor からのレスポンスを Future で受け取れる
    (topicList ? TopicList.SaveMessage(topicName, request.body, request.timestampMillis)).mapTo[Message]

  def getMessage(topic: String, id: String): Future[Option[Message]] =
    (topicList ? TopicList.GetMessage(topic, id)).mapTo[Option[Message]]

  def listMessage(topic: String): Future[Option[Seq[Message]]] =
    (topicList ? TopicList.ListMessage(topic)).mapTo[Option[Seq[Message]]]

}

TopicList はよくある感じに実装した Actor であり、Topic Actor のまとめ役のような役割をしています。

package com.tiqwab.example.akka

import akka.actor._

class TopicList extends Actor with ActorLogging {

  var topicMap: Map[String, ActorRef] = Map.empty[String, ActorRef]

  override def receive: Receive = {
    case TopicList.SaveMessage(topicName, body, timestampMillis) =>
      val topic = topicMap.getOrElse(topicName, {
        val newTopic = context.actorOf(Topic.props(topicName))
        topicMap = topicMap + (topicName -> newTopic)
        newTopic
      })
      topic forward Topic.SaveMessage(body, timestampMillis)
    case TopicList.GetMessage(topicName, id) =>
      topicMap.get(topicName) match {
        case None =>
          sender() ! None
        case Some(topic) =>
          topic forward Topic.GetMessage(id)
      }
    case TopicList.ListMessage(topicName) =>
      topicMap.get(topicName) match {
        case None =>
          sender() ! None
        case Some(topic) =>
          topic forward Topic.ListMessage
      }
  }

}

object TopicList {
  def props: Props = Props(new TopicList())
  case class SaveMessage(topic: String, body: String, timestampMillis: Long)
  case class MessageSaved(id: String)
  case class GetMessage(topic: String, id: String)
  case class ListMessage(topic: String)
}

Topic Actor が実際にメッセージの保存や取得の機能を果たしています。

package com.tiqwab.example.akka

import akka.actor._

class Topic(val topicName: String) extends Actor with ActorLogging {
  var nextId: Long = 1
  var messages: Seq[Message] = Seq.empty[Message]
  override def receive: Receive = {
    case Topic.SaveMessage(body, timestampMillis) =>
      val id = nextId.toString
      nextId += 1
      val message = Message(id, body, timestampMillis)
      messages = messages :+ message
      sender() ! message
    case Topic.GetMessage(id) =>
      sender() ! messages.find(_.id == id)
    case Topic.ListMessage =>
      // 単体で見ると Some で包む必要は無いが、TopicList からの使われ方の関係で
      sender() ! Some(messages)
  }
}

object Topic {
  def props(topicName: String): Props = Props(new Topic(topicName))
  case class SaveMessage(body: String, timestampMillis: Long)
  case class GetMessage(id: String)
  case object ListMessage
}

実装した API の動作を確認します。

# Save message
$ curl -v -H "Content-Type: application/json" http://localhost:8080/topic/test -d '{"body": "hoge", "timestampMillis": 1}'
{"id":"1"}

# Get message
$ curl -v -H "Content-Type: application/json" http://localhost:8080/topic/test/1
{"id":"1","body":"hoge","timestampMillis":1}

# List messages
$ curl -v -H "Content-Type: application/json" http://localhost:8080/topic/test
{"messages":[{"id":"1","body":"hoge","timestampMillis":1}]}

感想

  • Actor モデルはあまり馴染みのない概念なのでけっこう面白い
  • akka-http は Scala で HTTP インタフェースを持つ簡単な API サーバを作るのに便利そう
  • Receive の型が Any => Unit の部分関数なので型の保証が無いのが複雑な実装になったときに不安な気がする