薄っぺらりん

厚くしていきたい

AWSでストリーミングデータ処理

目次

はじめに

最近はもうすっかりデジタル社会になり、今こうしている間にも膨大なデータが生み出されてインターネット上を流れています。
スマホや時計などあらゆるものが複数のセンサーを積むことが当たり前になり、必要なデータから不要なデータまであらゆるデータを収集してBIシステムやExcelや人間がそこから有益な情報を取り出して、生活を便利にしたり楽しくするのに役立てています。
今回はそんなストリーミングデータを受信するシステムの話です。

AWSでストリーミングデータ処理

AWSにはストリーミングデータを処理する方法が沢山用意されています。
今回取り上げるのはSQSとLambdaを使用した次のようなアーキテクチャです。

ストリーミングデータ処理のアーキテクチャ

データはSQSメッセージとして受けます。他のパターンについては今後記事にしたいと思います。
流れは次のようになります。

  1. SQSメッセージとしてストリーミングデータを受けます
  2. Lambdaイベントソースマッピングでメッセージを変換し、次のLambdaに渡すためSQSメッセージを送信します
  3. Lambdaイベントソースマッピングでメッセージを変換し、データをRedshiftへ保存するためFirehoseへ書き込みます
  4. FirehoseがRedshiftへデータを書き込みます

Redshiftは大量のinsertが苦手なので、Firehoseを手前に置きます。
Firehoseを使用すると複数データをひとつのS3オブジェクトにまとめ、バッファ設定に従ってRedshiftへCOPYコマンドで書き込むことができます。
バッファ設定は流量やデータサイズから設定しましょう。

docs.aws.amazon.com

docs.aws.amazon.com

このアーキテクチャの良いところ

次のような良い点があります。

  • 流量の変化にLambdaの同時実行で対応できる
  • データをSQSで滞留させられる
  • メッセージの再処理が簡単
  • SQSとLambdaを沢山繋げられる

ひとつずつ見ていきましょう。

流量の変化にLambdaの同時実行で対応できる

ストリーミングデータの流量変化に、Lambdaの同時実行で対応することができます。
パフォーマンステスト時にLambdaのメモリや同時実行数を大まかに決めることになりますが、あとあと変更が必要になった場合も簡単に対応できます。
予約可能なLambdaの同時実行数はアカウント当たり1000までなので、規模が大きくなる場合は上限緩和をリクエストしましょう。

データをSQSで滞留させられる

個人的にこれが一番好きな点です。
Lambda関数を更新する場合や、エラー調査のためにデータ処理を停止したい場合にSQSでメッセージを滞留させることができます。
手順としては、マネジメントコンソールからLambda関数を開き、トリガー設定からDisabledにするだけです。

Lambdaトリガー設定

メッセージの再処理が簡単

メッセージ削除前にLambdaが異常終了した場合、SQSに設定した可視性タイムアウトに従ってメッセージを再処理することができます。
Lambdaはタイムアウトが15分なので、可視性タイムアウトはそれ以上の値にしておくと安心です。
再処理回数はデッドレターキューの設定のところにある最大受信数から設定できます。
大受信数を超えたメッセージはデッドレターキューに入りますが、デッドレターキューのマネジメントコンソール右上にある「DLQ再処理の開始」ボタンから簡単に再処理することができます。

DLQ再処理の開始ボタン

SQSとLambdaを沢山繋げられる

アーキテクチャ図ではLambda関数が2つでしたが、同じ要領でいくつもLambda関数を増やすことができます。
データ分割を行ったり、実行する処理の種類によって関数を分けたりすることができます。
SQSのひとつのメッセージに対して複数の異なるLambda関数を適用したいときは、SNSを使用したファンアウトパターンが良いかもしれません。

ファンアウトパターン

このアーキテクチャの注意点

次のような注意点があります

  • SQSメッセージは最大14日間しか保留できないので、LambdaでS3やDynamoDBなどのストレージにメッセージを保存したり、ログに出しておかない限り14日より前のメッセージが失われてしまいます。CloudWatchアラームを使用してメッセージがデッドレターキューに入ったことを見落とさないようにする必要があります。
  • SQSメッセージは最大サイズが256KiBなので、これより大きなデータを扱う場合はS3にデータを置いてSQSメッセージのサイズを減らしましょう。S3イベント通知を使用してもよいですが、その場合はイベント通知の重複を考慮する必要があります。
  • もしも流量が大きく変化しない場合はLambdaよりもEC2の方が安く済む場合があるので、事前に見積もりをして確認しましょう。

構築してみる

実際に上の図と同じアーキテクチャAWS上に構築してみましょう。
今回はLambda関数とSQSをAWS CDKで構築します。
RedshiftはRedshift Serverlessを使用します。FirehoseとRedshift Serverlessは現在L1コンストラクトのみ存在しているため、マネジメントコンソールから作成します。
S3やVPCなどは既存のものを使用します。

RedshiftとFirehose

まずRedshift Serverlessでデータベースを作成します。
設定はカスタマイズを選択します。

関連付けられたIAMロールは「ロールを作成」から作成することができます。
その際にS3バケットを指定することで自動的にCOPYコマンドの実行に必要なポリシーを作成してくれます。
Firehose配信ストリームの配信先バケットとするS3バケットを選択しましょう。

FirehoseからRedshift Serverlessへアクセスするためにはパブリックアクセスが必要なので、ネットワーク設定ではパブリックサブネットを選択します。
RPUなどの設定を行い、設定を完了します(Redshiftは高価なのでRPUに注意しましょう)。

ワークグループが作成されたら、セキュリティグループの設定を開き、Firehoseからのアクセスを許可するインバウンドルールを追加します。
リージョン毎のFirehoseのIPアドレスは以下ドキュメントから確認できます。
Controlling Access with Amazon Kinesis Data Firehose - Amazon Kinesis Data Firehose
この時点では13.113.196.224/27です。

ワークグループの「ネットワークとセキュリティ」から「パブリックにアクセス可能」設定をオンにして、ネットワークの設定は完了です。

次にクエリエディタを開き、テーブルを作成します。
今回はid列とvalue列を持ったstreaming_dataテーブルを作成します。

これでRedshiftの用意は完了です。
次にFirehose配信ストリームを作成します。
今回はLambdaからレコードを送信するので、ソースはDirect PUTを選択します。
送信先Amazon Redshiftです。
配信ストリーム名はstreaming-data-streamとします。この名前は後ほどLambda関数作成時に使用します。

送信先タイプは「Serverlessワークグループ」を選択し、以降のワークグループやデータベースなどを埋めます。

S3の中間送信先として、Redshift作成時に選択したバケットを指定します。
今回はLambda関数からjsonデータをFirehoseに向けて送信するので、COPYコマンドにはjson 'auto'オプションを指定します。
その他のオプションはRedshiftのドキュメントから確認することができます。

JSON 形式からの COPY - Amazon Redshift

今回は早く結果が確認できるように、バッファのヒントからバッファ間隔を60秒とします。

これでFirehoseの作成は完了です。

LambdaとSQS

図と同じになるように2つのLambda関数を使用する構成で作成します。
赤で囲ったSQSとLambda関数を1つのコンストラクトとして定義しています。
最初のLambda関数をLambda1、次のLambda関数をLambda2とします。

SQSとLambdaのコンストラク

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambda_evnt_src from 'aws-cdk-lib/aws-lambda-event-sources';
import * as iam from 'aws-cdk-lib/aws-iam';

export interface SqsLambdaProps {
  lambdaCodePath: string, // Lambda関数のソースコードへのパス
  lambdaReservedConcurrentExecutions: number // Lambda関数の同時実行数
  lambdaEnv?: {[key: string]: string}, // Lambda関数へ設定する環境変数
  nextQueueArn?: string, // Lambda関数からメッセージ送信するSQS ARN
  lambdaRolePolicy?: iam.PolicyStatement // Lambda実行ロールに追加するIAMポリシー
}

export class SqsLambda extends Construct {
  queueUrl: string;
  queueArn: string;
  
  constructor(scope: Construct, id: string, props: SqsLambdaProps) {
    super(scope, id);
    
    const dlq = new sqs.Queue(this, 'Dlq', {
      visibilityTimeout: cdk.Duration.seconds(30)
    });
    
    const q = new sqs.Queue(this, 'Sqs', {
      visibilityTimeout: cdk.Duration.seconds(900),
      deadLetterQueue: {
        maxReceiveCount: 5,
        queue: dlq
      }
    });
    this.queueUrl = q.queueUrl;
    this.queueArn = q.queueArn;
    
    const f = new lambda.Function(this, 'Function', {
      code: lambda.Code.fromAsset(props.lambdaCodePath),
      handler: "lambda_function.lambda_handler",
      runtime: lambda.Runtime.PYTHON_3_11,
      reservedConcurrentExecutions: props.lambdaReservedConcurrentExecutions,
      environment: props.lambdaEnv
    });
    if (props.hasOwnProperty("nextQueueArn") && props.nextQueueArn !== undefined) {
      // 次のSQSがある場合は送信用の権限を付与する
      const queueAccessPolicy = new iam.PolicyStatement();
      queueAccessPolicy.addActions("sqs:SendMessage");
      queueAccessPolicy.addResources(props.nextQueueArn);
      f.addToRolePolicy(queueAccessPolicy);
    }
    if (props.hasOwnProperty("lambdaRolePolicy") && props.lambdaRolePolicy !== undefined) {
      f.addToRolePolicy(props.lambdaRolePolicy);
    }
    
    f.addEventSource(new lambda_evnt_src.SqsEventSource(q))
  }
}

SqsLambdaを2つ分作成するスタックは次のようになります。
Firehoseの配信ストリーム名はstreaming-data-streamです。

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as path from 'path';
import * as sqsLambda from './sqs-lambda';
import * as iam from 'aws-cdk-lib/aws-iam';

const FIREHOSE_STREAM_NAME = "streaming-data-stream";

export class StreamingApp01Stack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);
    
    const firehoseAccessPolicy = new iam.PolicyStatement();
    firehoseAccessPolicy.addActions("firehose:PutRecord");
    firehoseAccessPolicy.addResources(`arn:aws:firehose:ap-northeast-1:${cdk.Stack.of(this).account}:deliverystream/${FIREHOSE_STREAM_NAME}`);

    const sqsLambda2 = new sqsLambda.SqsLambda(this, 'sqsLambda2', {
      lambdaCodePath: path.join(__dirname, "../resources/lambda2/"),
      lambdaReservedConcurrentExecutions: 3,
      lambdaEnv: {
        "FIREHOSE_STREAM_NAME": FIREHOSE_STREAM_NAME
      },
      lambdaRolePolicy: firehoseAccessPolicy
    });
    
    const sqsLambda1 = new sqsLambda.SqsLambda(this, 'sqsLambda1', {
      lambdaCodePath: path.join(__dirname, "../resources/lambda1/"),
      lambdaReservedConcurrentExecutions: 3,
      lambdaEnv: {
        "NEXT_QUEUE_URL": sqsLambda2.queueUrl
      },
      nextQueueArn: sqsLambda2.queueArn
    });
  }
}

Lambda1のソースコードは次のようになります。
受け取ったSQSメッセージのbodyをそのまま次のSQSキューへ送信します。
今回、メッセージの内容はjsonになります。

import json
import boto3
import os
import logging


def lambda_handler(event, context):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    record = event["Records"][0]
    body = json.loads(record["body"])
    logger.info("id: %d, value: %s", body["id"], body["value"])

    # 次のキューへメッセージ送信
    next_queue_url = os.environ.get("NEXT_QUEUE_URL")
    sqs = boto3.client("sqs")
    res = sqs.send_message(QueueUrl=next_queue_url, MessageBody=json.dumps(body))
    logging.info("sqs.send_message response: %s", res)

Lambda2のソースコードは次のようになります。
受け取ったSQSメッセージのbodyをFirehose配信ストリームへputします。

import json
import boto3
import os
import logging


def lambda_handler(event, context):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    record = event["Records"][0]
    body = json.loads(record["body"])
    logger.info("id: %d, value: %s", body["id"], body["value"])

    # Firehose配信ストリームへレコードを送信
    stream_name = os.environ.get("FIREHOSE_STREAM_NAME")
    firehose = boto3.client("firehose")
    res = firehose.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": json.dumps(body) + "\n"},
    )
    logger.info("firehose.put_record response: %s", res)

cdkコマンドでdeployして完了です。

$ cdk deploy --profile <profile>

動作確認

作成されたSQSキューに以下のメッセージを送信して動作確認を行います。

{
  "id": 1,
  "value": "test streaming data"
}

マネジメントコンソールから送信します。

60秒と少し時間を置いてからクエリエディタにてテーブルを確認すると、レコードが挿入されていることが確認できます。

60秒以内に複数のメッセージを送信し、中間送信先S3バケットに出力されるオブジェクトを確認することで、バッファが動作していることを確認できます。
idを2~4に変えたメッセージを60秒以内に送信した際に出力されるS3オブジェクトは次のようになります。

テーブルにも挿入されています。

もしもFirehoseで配信処理中にエラーとなった場合は、Firehoseの「送信先エラーログ」からエラーの内容を確認できます。

おわりに

今回は私が長い間使用しているSQSとLambdaを使用したアーキテクチャを取り上げてみました。
DLQ再処理について、以前は再処理用のスクリプトを書いていましたが今は「DLQ再処理の開始」ボタンがあるので、とても便利になったと感じます。
AWSにはストリーミングデータの処理に使用できるサービスが沢山用意されているので、今後取り上げていきたいです。