お風呂ライオン工場

まとめブログ

SQS標準キューでLambdaトリガーを再有効化したときの処理順について実験したよ!

SQSで送信順に処理をしたいならFIFOキューを使うと思います。
ですが、標準キューでもLambdaトリガーなどで処理をしている場合はだいたい送信順にメッセージを受信して処理してくれます。
SQSのデベロッパーガイドにも、標準キューの「メッセージ順序」のところに次のように記載があります。

標準キューでは、できる限りメッセージの順序を保持しますが、複数のメッセージのコピーが順無同で配信される場合があります。お使いのシステムで注文を保存する必要がある場合は、FIFO (先入れ先出し) キューするか、各メッセージに順序付け情報を追加して、受信時にメッセージを並べ直せるようにすることもできます。

今回はなんらかの理由によりLambdaトリガーを一度無効化して、その後再度有効化した場合に、標準キューの滞留したメッセージはどれくらい順序を保って処理されるのかを実験してみました。

目次

環境

環境はSQS標準キューとLambda関数を作成し、Lambdaのトリガーにキューを追加します。
関数のコードはsqs-pollerブループリントのJavaScriptコードを使わせてもらいます。
結果を確認しやすいようにLambdaの同時実行数は1にします。

f:id:kkmtyyz:20211016212532p:plain
環境図

CloudFormationのコード:

AWSTemplateFormatVersion: "2010-09-09"
Resources:
  MyQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: 'MyQueue'

  SqsPollerLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: 'SqsPoller'
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - 'lambda.amazonaws.com'
            Action:
                - 'sts:AssumeRole'
      Policies:
        - PolicyName: 'SqsPollerSQSPolicy'
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 'sqs:DeleteMessage'
                  - 'sqs:GetQueueAttributes'
                  - 'sqs:ReceiveMessage'
                Resource:
                  - !GetAtt MyQueue.Arn
        - PolicyName: 'SqsPollerLogPolicy'
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                  - 'logs:CreateLogGroup'
                Resource:
                  - '*'

  SqsPollerLambda:
    Type: AWS::Lambda::Function
    Properties: 
      FunctionName: 'SqsPoller'
      Role: !GetAtt SqsPollerLambdaRole.Arn
      Handler: 'index.handler'
      Runtime: 'nodejs12.x'
      ReservedConcurrentExecutions: 1
      Code: 
        ZipFile:  |
          console.log('Loading function');
          
          exports.handler = async (event) => {
              //console.log('Received event:', JSON.stringify(event, null, 2));
              for (const { messageId, body } of event.Records) {
                  console.log('SQS message %s: %j', messageId, body);
              }
              return `Successfully processed ${event.Records.length} messages.`;
          };

  SqsPollerLambdaEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      Enabled: true  
      EventSourceArn: !GetAtt MyQueue.Arn
      FunctionName: !Ref SqsPollerLambda

  SqsPollerLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${SqsPollerLambda}'

実験

実験手順は次の通りです。

  1. メッセージ1から10を送信
  2. Lambda関数のログを確認(トリガー無効化前)
  3. トリガーを無効化
  4. メッセージ1から10を送信
  5. キューの滞留メッセージを確認
  6. トリガーを再度有効化
  7. Lambda関数のログを確認(トリガー最有効化後)

1. メッセージ1から10を送信

作成したキューにメッセージを送信します。

$ aws_account=`aws sts get-caller-identity | jq -r .Account`
$ for i in `seq 1 10`; do aws sqs send-message --queue-url "https://sqs.ap-northeast-1.amazonaws.com/${aws_account}/MyQueue" --message-body "$i"; done

2. Lambda関数のログを確認(トリガー無効化前)

LambdaトリガーによってCloudWatch Logsに書き込まれたログを確認します。
1から10のメッセージを送信された順に処理していることがわかります。

f:id:kkmtyyz:20211017155517p:plain
トリガー無効化前のログ

3. トリガーを無効化

Lambdaトリガーを無効化します。

$ uuid=`aws lambda list-event-source-mappings | jq -r '.EventSourceMappings[] | select(.FunctionArn | contains("SqsPoller")) | .UUID'`
$ aws lambda update-event-source-mapping --uuid $uuid --no-enabled
$ aws lambda get-event-source-mapping --uuid $uuid | jq .State
"Disabled" 

4. メッセージ1から10を送信

再びキューにメッセージを送信します。

$ for i in `seq 1 10`; do aws sqs send-message --queue-url "https://sqs.ap-northeast-1.amazonaws.com/${aws_account}/MyQueue" --message-body "$i"; done

5. キューの滞留メッセージを確認

キューにメッセージが滞留していて、Lambdaトリガーによって処理されていないことを確認します。

$ for i in `seq 10`; do aws sqs receive-message --queue-url "https://sqs.ap-northeast-1.amazonaws.com/${aws_account}/MyQueue" --max-number-of-messages 10 | jq -r .Messages[].Body; done
10
2
5
8
9
3
7
4
6
1

6. トリガーを再度有効化

キューの可視性タイムアウトが30秒なので、少し時間をおいてからLambdaトリガーを再度有効化します。

$ aws lambda update-event-source-mapping --uuid $uuid --enabled
$ aws lambda get-event-source-mapping --uuid $uuid | jq .State
"Enabled" 

7. Lambda関数のログを確認(トリガー最有効化後)

LambdaトリガーによってCloudWatch Logsに書き込まれたログを確認します。
手順5でreceive-messageを叩いたときに薄々そんな気はしてましたが、バラバラに受信されていることがわかります。

f:id:kkmtyyz:20211017160028p:plain
トリガー再度有効化後のログ

結果

結果としてはSQS標準キューにてメッセージを滞留させた状態でLambdaトリガーを再有効化すると、受信時には送信時の順序がバラバラになりました。
トリガーが有効化された状態では送信順に処理されたので、すごい頻度でポーリングしてるのかなと思いました。ポーリングのリクエスト毎に課金されるようなので、イベントを有効化した状態で放置してしまうとどんどん課金されてしまいそうです。

まとめ

そもそも標準キューで順番を気にするようなことをするなよと言えばそれはそうですが、いざ「だいたい送信順に処理されるって書いてあるけど、トリガーを無効化した後に再度有効化したらどうなるの?」って聞かれたときに困ってしまったので、実験してみた次第でした。
しかしCloudFormationを書くのはまだまだ時間がかかってしまいます。コンソールでは意識していないリソースもCloudFormationでは認識する必要があったり、設定もコンソールではサクサクできてもCloudFormationでは別のリソースとして定義しなければならなかったりと、まだまだ知らないことが多いなあと思います。
しかしそれが楽しいんですけどね😊