AWS SQS の visibility timeout 調査
Amazon Simple Queue Service (SQS) は AWS のメッセージキューイングサービスです。完全マネージドなサービスであり、ボリューム等の細かい心配をせずにシステム間の連携に使用できるのが嬉しいのですが、SQS にはいくつか単純なキューにはない独自の実装を持っています。
その一つとして visibility timeout というものがあります。これは 「自分以外の他の consumer が同じメッセージを (できるだけ) 受信しないようにするため」 の機能なのですが、ドキュメントを読んだだけでは細かい挙動でわからないところがあったので、実際に動かして調査してみました。
テストに使用したコードは こちら に置いています。
Queue の作成
まずはテストに使用するキューを作成します。
# Create queue
$ aws sqs create-queue --queue-name sample-queue
作成したキューのデフォルト設定は以下のようになります。
VisibilityTimeout: 30
となっているので、デフォルトでは受信したメッセージが 30 秒は他の consumer から見えなくなるということがわかります。
# Describe queue
$ $ aws sqs get-queue-attributes --queue-url <queue_url> --attribute-names All
{
"Attributes": {
"QueueArn": "<queue_arn>",
"ApproximateNumberOfMessages": "0",
"ApproximateNumberOfMessagesNotVisible": "0",
"ApproximateNumberOfMessagesDelayed": "0",
"CreatedTimestamp": "1505547619",
"LastModifiedTimestamp": "1505547619",
"VisibilityTimeout": "30",
"MaximumMessageSize": "262144",
"MessageRetentionPeriod": "345600",
"DelaySeconds": "0",
"ReceiveMessageWaitTimeSeconds": "0"
}
}
Queue へのメッセージ送受信
Visibility timeout について調査する前に、SQS を介した簡単なメッセージの送受信を行ってみます。
以下では aws-java-sdk-sqs を使用して Scala の簡単なコードを書いて試しています。
送信
- SQS へのメッセージ送信のために
SimpleMessagaeSender
を定義AmazonSQS
を介して SQS とやり取りをするSendMessageRequest
が送信するメッセージを表す
package com.tiqwab.example.sqs
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import com.amazonaws.services.sqs.model.SendMessageRequest
class SimpleMessageSender(
val queueUrl: String
) {
lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()
def send(msg: String): Unit = {
val request = new SendMessageRequest()
.withMessageBody(msg)
.withQueueUrl(queueUrl)
sqs.sendMessage(request)
}
}
SimpleMessageSender
を使用してキューにメッセージを投げてみます。
package com.tiqwab.example.sqs
object SendMessageMain {
def main(args: Array[String]): Unit = {
val sender = new SimpleMessageSender(
queueUrl =
"https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
)
sender.send("hello")
}
}
AWS の Web コンソールを見るとたしかにメッセージ数が増えていることがわかります。
受信
- SQS からのメッセージ受信のために
SimpleMessageReceiver
を定義AmazonSQS#receiveMessage
にReceiveMessageRequest
オブジェクトを渡すAmazonSQS#deleteMessage
でメッセージを処理したことになる- これを呼ばないと visibility timeout (default 30s) 後に再度キューから取得可能になる
package com.tiqwab.example.sqs
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import com.amazonaws.services.sqs.model.Message
import scala.collection.JavaConverters._
class SimpleMessageReceiver(
val queueUrl: String
) {
lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()
def receive(f: Message => Unit): Unit = {
val request = new ReceiveMessageRequest(queueUrl)
val messages = sqs.receiveMessage(request).getMessages.asScala
messages.foreach { m =>
f(m)
sqs.deleteMessage(queueUrl, m.getReceiptHandle)
}
}
}
実際にメッセージを受信してみます。
package com.tiqwab.example.sqs
object ReceiveMessageMain {
def main(args: Array[String]): Unit = {
val receiver = new SimpleMessageReceiver(
queueUrl =
"https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
)
receiver.receive(m => println(m.getBody))
}
}
受信の挙動は以下のようになります。
- 初回実行後 “hello” というメッセージが出力される
- 二度目以降はメッセージが削除されているので取得されない
Visibility timeout の挙動
ここからが本題で visibility timeout を使用したメッセージ受信の挙動を確認していきます。
基本
- メッセージ受診時に削除を行わない場合、visibility timeout 後に再度メッセージが取得される
- 削除しない限り何度でもメッセージが受信され得る
この挙動を確認するためにメッセージを受信後削除しないよう受信側のクラスを変更しました。
class SimpleMessageReceiver(
val queueUrl: String
) {
lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()
// Accept Boolean to control deletion
def receive(f: String => Unit, doesDelete: Boolean = true): Unit = {
val request = new ReceiveMessageRequest(queueUrl)
val messages = sqs.receiveMessage(request).getMessages.asScala
messages.foreach { m =>
f(m)
if (doesDelete) sqs.deleteMessage(queueUrl, m.getReceiptHandle)
}
}
}
object ReceiveMessageMain {
def main(args: Array[String]): Unit = {
val receiver = new SimpleMessageReceiver(
queueUrl =
"https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
)
@tailrec
def loop(n: Int): Unit = n match {
case 0 =>
()
case _ =>
println(s"${System.currentTimeMillis()}: ")
receiver.receive(msg => println(s"$msg.getBody"), false)
Thread.sleep(1000)
loop(n - 1)
}
loop(65)
}
}
- Visibility timeout をデフォルト値の 30 秒とし、1 秒ごとにメッセージ受信を試みている
- おおよそ 30 秒後にメッセージが繰り返し受信されることがわかる
...
1505550644841:
visibility_timeout_sample
...
1505550686487:
visibility_timeout_sample
...
1505550717256:
visibility_timeout_sample
...
Attribute の利用
- メッセージには body 以外にも
Attribute
,MessageAttribute
と呼ばれる属性が付属している- Attribute: システム側が自動的に設定している属性?
- MessageAttribute: ユーザ側が設定できる属性
ReceiveMessageRequest
作成時にメッセージと一緒に持ってきたい属性を指定する- 受信後に属性を変更し、visibility timeout した場合、セットした属性は反映されない
上の挙動を確認するために、メッセージ受信時に ApproximateReceiveCount
と LastReceivedTimestamp
という二つの属性を表示するようにしました。
ApproximateReceiveCount
はシステムが自動的につけている値、LastReceivedTimestamp
は自分で独自にメッセージ受信後につけている値です。
class SimpleMessageReceiver(
val queueUrl: String
) {
lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()
// Receive messages with attributes
def receive(f: Message => Unit, doesDelete: Boolean = true): Unit = {
val request = new ReceiveMessageRequest(queueUrl)
.withAttributeNames("ApproximateReceiveCount")
.withMessageAttributeNames("LastReceivedTimestamp")
val messages = sqs.receiveMessage(request).getMessages.asScala
messages.foreach { m =>
f(m)
if (doesDelete) sqs.deleteMessage(queueUrl, m.getReceiptHandle)
}
}
}
object ReceiveMessageMain {
def main(args: Array[String]): Unit = {
val receiver = new SimpleMessageReceiver(
queueUrl =
"https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
)
@tailrec
def loop(n: Int): Unit = n match {
case 0 =>
()
case _ =>
println(s"${System.currentTimeMillis()}: ")
receiver.receive(
msg => {
println(
s"body: ${msg.getBody}, attributes: ${msg.getAttributes}, messageAttributes: ${msg.getMessageAttributes}")
val attributes = msg.getMessageAttributes.asScala
val lastReceivedTimestamp = new MessageAttributeValue()
.withDataType("Number")
.withStringValue(System.currentTimeMillis.toString)
msg.setMessageAttributes(
attributes
.updated("LastReceivedTimestamp", lastReceivedTimestamp)
.asJava)
},
false
)
Thread.sleep(1000)
loop(n - 1)
}
loop(65)
}
}
ApproximateReceiveCount
は受信毎にカウントが増えている- メッセージ受信後、自分で
LastReceivedTimestamp
を設定しているが、次回受診時には反映されていない
...
1505629833497:
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=14}, messageAttributes: {}
...
1505629866777:
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=15}, messageAttributes: {}
...
1505629898377:
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=16}, messageAttributes: {}
...
Visibility timeout の設定
- visibility timeout は受診時に再設定することができる
以下のクラスでは ApproximateReceiveCount
を利用してメッセージの受信毎に visibility timeout 値を再設定しています。
class SimpleMessageReceiver(
val queueUrl: String
) {
lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()
def receive(f: Message => Unit, doesDelete: Boolean = true): Unit = {
val request = new ReceiveMessageRequest(queueUrl)
.withAttributeNames("ApproximateReceiveCount")
val messages = sqs.receiveMessage(request).getMessages.asScala
messages.foreach { m =>
f(m)
// visibility timeout の再設定
m.getAttributes
.get("ApproximateReceiveCount")
.asInstanceOf[String]
.toInt match {
case 1 => // 1 回目の受信
sqs.changeMessageVisibility(
createChangeVisibilityRequest(10, m.getReceiptHandle))
case 2 => // 2 回目の受信
sqs.changeMessageVisibility(
createChangeVisibilityRequest(20, m.getReceiptHandle))
case 3 => // 3 回目の受信
sqs.changeMessageVisibility(
createChangeVisibilityRequest(120, m.getReceiptHandle))
Thread.sleep(30000)
sqs.changeMessageVisibility(
createChangeVisibilityRequest(60, m.getReceiptHandle))
case _ => // それ以降
()
}
if (doesDelete) sqs.deleteMessage(queueUrl, m.getReceiptHandle)
}
}
def createChangeVisibilityRequest(
timeout: Int,
receiptHandle: String
): ChangeMessageVisibilityRequest =
new ChangeMessageVisibilityRequest()
.withQueueUrl(queueUrl)
.withReceiptHandle(receiptHandle)
.withVisibilityTimeout(timeout)
}
object ReceiveMessageMain {
def main(args: Array[String]): Unit = {
val receiver = new SimpleMessageReceiver(
queueUrl =
"https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
)
@tailrec
def loop(n: Int): Unit = n match {
case 0 =>
()
case _ =>
println(s"${System.currentTimeMillis()}: ")
receiver.receive(
msg => {
println(s"body: ${msg.getBody}, attributes: ${msg.getAttributes}")
},
false
)
Thread.sleep(1000)
loop(n - 1)
}
loop(180)
}
}
結果は以下のようになります。
ChangeVisibilityRequest
で設定した時間がちゃんと反映されている- 設定した visibility timeout は次のメッセージにのみ有効
- 一度設定しても次々回の受信はデフォルトの visibility timeout が使用される
- あるメッセージ受信時に複数回
ChangeVisibilityRequest
を行った場合、リクエストを送信したタイミングから設定した visibility timeout が有効になる- 換言すれば visibility timeout の延長ができるということ
1505632893061: # <- (set visibility timeout to 10s)
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=1}
...
1505632905142: # <- (set visibility timeout to 20s)
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=2}
...
1505632929236: # (set visibility timeout to 120s)
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=3}
(wait 30s)
1505632960979: # (set visibility timeout to 60s)
...
1505633021165:
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=4}
...
1505633058395:
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=5}
...
1505633092792:
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=6}
...
Memo
アプリケーションをデーモンとして実行するために sbt-native-packager が便利
$ sbt stage
$ ./target/universal/stage/bin/extension-receive-message-main &