目次
はじめに
最近はもうすっかりデジタル社会になり、今こうしている間にも膨大なデータが生み出されてインターネット上を流れています。
スマホや時計などあらゆるものが複数のセンサーを積むことが当たり前になり、必要なデータから不要なデータまであらゆるデータを収集してBIシステムやExcelや人間がそこから有益な情報を取り出して、生活を便利にしたり楽しくするのに役立てています。
今回はそんなストリーミングデータを受信するシステムの話です。
AWSでストリーミングデータ処理
AWSにはストリーミングデータを処理する方法が沢山用意されています。
今回取り上げるのはSQSとLambdaを使用した次のようなアーキテクチャです。
データはSQSメッセージとして受けます。他のパターンについては今後記事にしたいと思います。
流れは次のようになります。
- SQSメッセージとしてストリーミングデータを受けます
- Lambdaイベントソースマッピングでメッセージを変換し、次のLambdaに渡すためSQSメッセージを送信します
- Lambdaイベントソースマッピングでメッセージを変換し、データをRedshiftへ保存するためFirehoseへ書き込みます
- FirehoseがRedshiftへデータを書き込みます
Redshiftは大量のinsertが苦手なので、Firehoseを手前に置きます。
Firehoseを使用すると複数データをひとつのS3オブジェクトにまとめ、バッファ設定に従ってRedshiftへCOPYコマンドで書き込むことができます。
バッファ設定は流量やデータサイズから設定しましょう。
このアーキテクチャの良いところ
次のような良い点があります。
- 流量の変化にLambdaの同時実行で対応できる
- データをSQSで滞留させられる
- メッセージの再処理が簡単
- SQSとLambdaを沢山繋げられる
ひとつずつ見ていきましょう。
流量の変化にLambdaの同時実行で対応できる
ストリーミングデータの流量変化に、Lambdaの同時実行で対応することができます。
パフォーマンステスト時にLambdaのメモリや同時実行数を大まかに決めることになりますが、あとあと変更が必要になった場合も簡単に対応できます。
予約可能なLambdaの同時実行数はアカウント当たり1000までなので、規模が大きくなる場合は上限緩和をリクエストしましょう。
データをSQSで滞留させられる
個人的にこれが一番好きな点です。
Lambda関数を更新する場合や、エラー調査のためにデータ処理を停止したい場合にSQSでメッセージを滞留させることができます。
手順としては、マネジメントコンソールからLambda関数を開き、トリガー設定からDisabledにするだけです。
メッセージの再処理が簡単
メッセージ削除前にLambdaが異常終了した場合、SQSに設定した可視性タイムアウトに従ってメッセージを再処理することができます。
Lambdaはタイムアウトが15分なので、可視性タイムアウトはそれ以上の値にしておくと安心です。
再処理回数はデッドレターキューの設定のところにある最大受信数から設定できます。
最大受信数を超えたメッセージはデッドレターキューに入りますが、デッドレターキューのマネジメントコンソール右上にある「DLQ再処理の開始」ボタンから簡単に再処理することができます。
SQSとLambdaを沢山繋げられる
アーキテクチャ図ではLambda関数が2つでしたが、同じ要領でいくつもLambda関数を増やすことができます。
データ分割を行ったり、実行する処理の種類によって関数を分けたりすることができます。
SQSのひとつのメッセージに対して複数の異なるLambda関数を適用したいときは、SNSを使用したファンアウトパターンが良いかもしれません。
このアーキテクチャの注意点
次のような注意点があります
- SQSメッセージは最大14日間しか保留できないので、LambdaでS3やDynamoDBなどのストレージにメッセージを保存したり、ログに出しておかない限り14日より前のメッセージが失われてしまいます。CloudWatchアラームを使用してメッセージがデッドレターキューに入ったことを見落とさないようにする必要があります。
- SQSメッセージは最大サイズが256KiBなので、これより大きなデータを扱う場合はS3にデータを置いてSQSメッセージのサイズを減らしましょう。S3イベント通知を使用してもよいですが、その場合はイベント通知の重複を考慮する必要があります。
- もしも流量が大きく変化しない場合はLambdaよりもEC2の方が安く済む場合があるので、事前に見積もりをして確認しましょう。
構築してみる
実際に上の図と同じアーキテクチャをAWS上に構築してみましょう。
今回はLambda関数とSQSをAWS CDKで構築します。
RedshiftはRedshift Serverlessを使用します。FirehoseとRedshift Serverlessは現在L1コンストラクトのみ存在しているため、マネジメントコンソールから作成します。
S3やVPCなどは既存のものを使用します。
RedshiftとFirehose
まずRedshift Serverlessでデータベースを作成します。
設定はカスタマイズを選択します。
関連付けられたIAMロールは「ロールを作成」から作成することができます。
その際にS3バケットを指定することで自動的にCOPYコマンドの実行に必要なポリシーを作成してくれます。
Firehose配信ストリームの配信先バケットとするS3バケットを選択しましょう。
FirehoseからRedshift Serverlessへアクセスするためにはパブリックアクセスが必要なので、ネットワーク設定ではパブリックサブネットを選択します。
RPUなどの設定を行い、設定を完了します(Redshiftは高価なのでRPUに注意しましょう)。
ワークグループが作成されたら、セキュリティグループの設定を開き、Firehoseからのアクセスを許可するインバウンドルールを追加します。
リージョン毎のFirehoseのIPアドレスは以下ドキュメントから確認できます。
Controlling Access with Amazon Kinesis Data Firehose - Amazon Kinesis Data Firehose
この時点では13.113.196.224/27
です。
ワークグループの「ネットワークとセキュリティ」から「パブリックにアクセス可能」設定をオンにして、ネットワークの設定は完了です。
次にクエリエディタを開き、テーブルを作成します。
今回はid
列とvalue
列を持ったstreaming_data
テーブルを作成します。
これでRedshiftの用意は完了です。
次にFirehose配信ストリームを作成します。
今回はLambdaからレコードを送信するので、ソースはDirect PUT
を選択します。
送信先はAmazon Redshift
です。
配信ストリーム名はstreaming-data-stream
とします。この名前は後ほどLambda関数作成時に使用します。
送信先タイプは「Serverlessワークグループ」を選択し、以降のワークグループやデータベースなどを埋めます。
S3の中間送信先として、Redshift作成時に選択したバケットを指定します。
今回はLambda関数からjsonデータをFirehoseに向けて送信するので、COPYコマンドにはjson 'auto'
オプションを指定します。
その他のオプションはRedshiftのドキュメントから確認することができます。
JSON 形式からの COPY - Amazon Redshift
今回は早く結果が確認できるように、バッファのヒントからバッファ間隔を60秒とします。
これでFirehoseの作成は完了です。
LambdaとSQS
図と同じになるように2つのLambda関数を使用する構成で作成します。
赤で囲ったSQSとLambda関数を1つのコンストラクトとして定義しています。
最初のLambda関数をLambda1、次のLambda関数をLambda2とします。
import * as cdk from 'aws-cdk-lib'; import { Construct } from 'constructs'; import * as sqs from 'aws-cdk-lib/aws-sqs'; import * as lambda from 'aws-cdk-lib/aws-lambda'; import * as lambda_evnt_src from 'aws-cdk-lib/aws-lambda-event-sources'; import * as iam from 'aws-cdk-lib/aws-iam'; export interface SqsLambdaProps { lambdaCodePath: string, // Lambda関数のソースコードへのパス lambdaReservedConcurrentExecutions: number // Lambda関数の同時実行数 lambdaEnv?: {[key: string]: string}, // Lambda関数へ設定する環境変数 nextQueueArn?: string, // Lambda関数からメッセージ送信するSQS ARN lambdaRolePolicy?: iam.PolicyStatement // Lambda実行ロールに追加するIAMポリシー } export class SqsLambda extends Construct { queueUrl: string; queueArn: string; constructor(scope: Construct, id: string, props: SqsLambdaProps) { super(scope, id); const dlq = new sqs.Queue(this, 'Dlq', { visibilityTimeout: cdk.Duration.seconds(30) }); const q = new sqs.Queue(this, 'Sqs', { visibilityTimeout: cdk.Duration.seconds(900), deadLetterQueue: { maxReceiveCount: 5, queue: dlq } }); this.queueUrl = q.queueUrl; this.queueArn = q.queueArn; const f = new lambda.Function(this, 'Function', { code: lambda.Code.fromAsset(props.lambdaCodePath), handler: "lambda_function.lambda_handler", runtime: lambda.Runtime.PYTHON_3_11, reservedConcurrentExecutions: props.lambdaReservedConcurrentExecutions, environment: props.lambdaEnv }); if (props.hasOwnProperty("nextQueueArn") && props.nextQueueArn !== undefined) { // 次のSQSがある場合は送信用の権限を付与する const queueAccessPolicy = new iam.PolicyStatement(); queueAccessPolicy.addActions("sqs:SendMessage"); queueAccessPolicy.addResources(props.nextQueueArn); f.addToRolePolicy(queueAccessPolicy); } if (props.hasOwnProperty("lambdaRolePolicy") && props.lambdaRolePolicy !== undefined) { f.addToRolePolicy(props.lambdaRolePolicy); } f.addEventSource(new lambda_evnt_src.SqsEventSource(q)) } }
SqsLambda
を2つ分作成するスタックは次のようになります。
Firehoseの配信ストリーム名はstreaming-data-stream
です。
import * as cdk from 'aws-cdk-lib'; import { Construct } from 'constructs'; import * as path from 'path'; import * as sqsLambda from './sqs-lambda'; import * as iam from 'aws-cdk-lib/aws-iam'; const FIREHOSE_STREAM_NAME = "streaming-data-stream"; export class StreamingApp01Stack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); const firehoseAccessPolicy = new iam.PolicyStatement(); firehoseAccessPolicy.addActions("firehose:PutRecord"); firehoseAccessPolicy.addResources(`arn:aws:firehose:ap-northeast-1:${cdk.Stack.of(this).account}:deliverystream/${FIREHOSE_STREAM_NAME}`); const sqsLambda2 = new sqsLambda.SqsLambda(this, 'sqsLambda2', { lambdaCodePath: path.join(__dirname, "../resources/lambda2/"), lambdaReservedConcurrentExecutions: 3, lambdaEnv: { "FIREHOSE_STREAM_NAME": FIREHOSE_STREAM_NAME }, lambdaRolePolicy: firehoseAccessPolicy }); const sqsLambda1 = new sqsLambda.SqsLambda(this, 'sqsLambda1', { lambdaCodePath: path.join(__dirname, "../resources/lambda1/"), lambdaReservedConcurrentExecutions: 3, lambdaEnv: { "NEXT_QUEUE_URL": sqsLambda2.queueUrl }, nextQueueArn: sqsLambda2.queueArn }); } }
Lambda1のソースコードは次のようになります。
受け取ったSQSメッセージのbodyをそのまま次のSQSキューへ送信します。
今回、メッセージの内容はjsonになります。
import json import boto3 import os import logging def lambda_handler(event, context): logger = logging.getLogger() logger.setLevel(logging.INFO) record = event["Records"][0] body = json.loads(record["body"]) logger.info("id: %d, value: %s", body["id"], body["value"]) # 次のキューへメッセージ送信 next_queue_url = os.environ.get("NEXT_QUEUE_URL") sqs = boto3.client("sqs") res = sqs.send_message(QueueUrl=next_queue_url, MessageBody=json.dumps(body)) logging.info("sqs.send_message response: %s", res)
Lambda2のソースコードは次のようになります。
受け取ったSQSメッセージのbodyをFirehose配信ストリームへputします。
import json import boto3 import os import logging def lambda_handler(event, context): logger = logging.getLogger() logger.setLevel(logging.INFO) record = event["Records"][0] body = json.loads(record["body"]) logger.info("id: %d, value: %s", body["id"], body["value"]) # Firehose配信ストリームへレコードを送信 stream_name = os.environ.get("FIREHOSE_STREAM_NAME") firehose = boto3.client("firehose") res = firehose.put_record( DeliveryStreamName=stream_name, Record={"Data": json.dumps(body) + "\n"}, ) logger.info("firehose.put_record response: %s", res)
cdkコマンドでdeployして完了です。
$ cdk deploy --profile <profile>
動作確認
作成されたSQSキューに以下のメッセージを送信して動作確認を行います。
{ "id": 1, "value": "test streaming data" }
マネジメントコンソールから送信します。
60秒と少し時間を置いてからクエリエディタにてテーブルを確認すると、レコードが挿入されていることが確認できます。
60秒以内に複数のメッセージを送信し、中間送信先S3バケットに出力されるオブジェクトを確認することで、バッファが動作していることを確認できます。
id
を2~4に変えたメッセージを60秒以内に送信した際に出力されるS3オブジェクトは次のようになります。
テーブルにも挿入されています。
もしもFirehoseで配信処理中にエラーとなった場合は、Firehoseの「送信先エラーログ」からエラーの内容を確認できます。
おわりに
今回は私が長い間使用しているSQSとLambdaを使用したアーキテクチャを取り上げてみました。
DLQ再処理について、以前は再処理用のスクリプトを書いていましたが今は「DLQ再処理の開始」ボタンがあるので、とても便利になったと感じます。
AWSにはストリーミングデータの処理に使用できるサービスが沢山用意されているので、今後取り上げていきたいです。