薄っぺらりん

厚くしていきたい

AWSでストリーミングデータ処理 Step Functions編

目次

はじめに

前回はSQSとLambda関数を使用したストリーミングデータ処理アーキテクチャを取り上げました。

kkmtyyz.hatenablog.com

AWSには他にも沢山のサービスが存在し、無数に組み合わせることができます。
今回は前回とはまた違ったストリーミングデータ処理アーキテクチャを取り上げてみます。

アーキテクチャについて

Step Functionsをひとつの要素とした全体的なアーキテクチャと、Step Functionsのステートマシン内のアーキテクチャの2つに分けて記載します。

全体的なアーキテクチャ

今回取り上げるのはStep Functionsを使用した次のようなアーキテクチャです。

全体的なアーキテクチャ

流れは次のようになります。

  1. SQSメッセージとしてストリーミングデータを受けます
  2. Lambda関数からStep Functionsステートマシンを実行します
  3. ステートマシンでデータ変換や分割、アラート検知などを行います
  4. 全てのデータをFirehoseでRedshiftに書き込みます

加えて、ステートマシン内でアラート検知された場合は次の出力も行います。

  • アラート検知したことをSNSからメールで通知します
  • アラート検知したデータをDynamoDBに記録します

上記アーキテクチャの特筆すべき点として、Step Functionsのワークフロータイプがあります。
Step Functionsには標準ワークフローとExpressワークフローがあり、ドキュメントにも記載されている通りストリーミングデータ処理には後者が向いています。
今回のアーキテクチャでは処理に失敗したデータの再処理という観点からも、Expressワークフローを採用する必要があり、次の特性が重要になります。

  • 実行時間が最大5分間
  • ステートマシンの同期実行が可能

再処理の仕組みは前回のアーキテクチャと同様です。
ステートマシンの実行が失敗した場合にLambda関数が異常終了することでSQSメッセージがデッドレターキューへ送られ、再処理の際には「DLQ再処理の開始ボタン」から元のキューへメッセージを送信します。
そのため、Lambda関数の最大実行時間である15分以内にステートマシンの実行が終了する必要があり、かつステートマシンの終了ステータスを受け取れるように同期実行を行う必要があります。それを実現するためにはExpressワークフローを採用する必要があります。

docs.aws.amazon.com

ステートマシンのアーキテクチャ

ステートマシンを見る前に、今回例として扱う入力データとアラート条件について簡単に説明します。
入力データは16進数で表現されているバイナリデータになります。
複数レコード分のデータが1つのデータとしてステートマシンに入力されます。
1レコード分は10バイトで、頭2バイトがID、残り8バイトがValueを表現しています。
例えば3レコード分のデータを持った入力データの場合は次の図のように解釈します。

入力データのデコード

アラート条件はValueの値が「test_2」の場合にアラートとし、メールの送信とDynamoDBへのputを行います。
入力データの説明はここまでです。

いよいよステートマシンを見ていきます。
ステートマシンのアーキテクチャは次のようになっています。

ステートマシンのアーキテクチャ

流れとしては次のようになります。

  1. 入力されたバイナリ文字列を最初のLambda関数がデコードし、jsonオブジェクトとした各レコードを配列として出力します
  2. Mapステートが配列の各レコードを並行処理します
  3. Lambda関数「Process」にて現在日時を項目「datetime」としてjsonオブジェクトに追加します(データ変換などを行う場合もここで行います)
  4. Choiceステートでアラート検出を行います。value == "test_2"の場合にアラートとしてParallelステートへ分岐します
    1. Parallelステートでは次の2つの処理を並行実行します
      • SNSトピックをpublishします
      • DynamoDBへputします
  5. PassステートでFirehoseへPutRecordBatchできる形に成形します
  6. PutRecordBatchでFirehoseへレコードを送信します

実際にデータが変換されていく様子を図に追記したものを次に示します。

入出力

Passステートへの入力がChoiceステートの分岐に関わらず同じ形になるように、Parallelステートの出力は入力をそのまま出力するようにしています。
Passでは最後のPutRecordBatchのAPIリクエストに合うように、Data項目に1レコード分のjson文字列を持たせたオブジェクトに変換しています。
PutRecordBatchのリクエスシンタックスは次のような形です。PutRecordBatchではMapの出力配列をRecordsの値として埋め込むようにします。

{
   "DeliveryStreamName": "string",
   "Records": [ 
      { 
         "Data": blob
      }
   ]
}

docs.aws.amazon.com

前回のアーキテクチャとの比較

前回のアーキテクチャと比べて次のような相違点があります。

  • データ処理の流れを把握しやすい
  • コード量を削減できる
  • 最大実行時間は5分間

ひとつずつ見ていきましょう。

データ処理の流れを把握しやすい

前回のアーキテクチャはSQSとLambda関数を数珠繋ぎにした構成でした。個々の部品としては非常に分かりやすいですが、データ処理の流れが把握しづらいという欠点があります。Lambda関数の実行順序や入出力を表現するために、全体を俯瞰する図を作成して管理していく必要があります。
一方で今回のアーキテクチャはStep Functionsを使用しているため、データ処理の流れをグラフとして確認することができます。CloudWatch Logsに実行ログが残っていれば、各ステップの入出力も合わせて確認することができます。次の画像ではPutRecordBatchステップの入力を表示しています。これは新しくチームに加わったメンバーが処理の流れを理解するのにとても役立ちます。

ステップの入出力確認

コード量を削減できる

前回のアーキテクチャは入出力の成形や各種AWSリソースのAPIリクエストなどを全てLambda関数に記述する必要がありました。もちろんLambdaレイヤーなどを使用してコードの共通化を図るため、Lambda関数が増える度に都度記述するようなことはないですが、それでも1度は記述する必要があります。
一方で今回のアーキテクチャはStep Functionsにあらかじめ用意されている便利なフローやアクションを使用することができるため、簡単な入出力の変換やAWSリソースへのAPIリクエストは開発せずに済みます。ステートマシン自体の開発は次の画像のようにWorkflow Studioを使用することで直感的に行えるため、長々としたjsonを記述する必要もありません。

Workflow Studio

今回の例で使用しているFirehoseのPutRequestBatchのように、APIリクエストのフォーマットによってはやや工夫が必要になるケースもありますが、Step Functionsに用意されている組み込み関数を使用することである程度柔軟に対応することができます。
様々な入出力の変換機能は便利ですが、あくまでもIaCの方(今回だとCDK)に記述するため、あまり手の込んだことをやり過ぎないようにしたいですね。

docs.aws.amazon.com

最大実行時間は5分間

前回のアーキテクチャは各Lambda関数が15分間実行できました。
今回のアーキテクチャはステートマシンが5分間実行できます。
どちらも数珠繋ぎにすれば柔軟に対応することができますが、5分間「も」なのか5分間「しか」なのかは用途によるかもしれません。
Expressワークフローではなく標準ワークフローであれば実行時間はほとんど考慮する必要がなくなりますが、同期実行ができないので、データの再処理方法については外部に状態を持たせるなど別な方法が必要になります。

このアーキテクチャの注意点

次のような注意点があります

  • 部分的な再処理ができない
  • Map内で外部へデータ出力を行うと中途半端な出力となる
  • ステートマシンを同期実行するためSDKの設定に注意

部分的な再処理ができない

前回のアーキテクチャは各Lambda関数の手前にSQSがあるため、どこからでも再処理を行うことができました。
今回のアーキテクチャはステートマシンの実行単位でのみ再処理が可能であるため、ステートマシンの途中から再処理を行うことができません。
例えば次の図において、Lambda_Aのみ再処理を行いたい場合、同じ入力データに関して出力Aのみならず他の全ての出力データの削除を行い、全体の再実行が必要となります。そもそもそうならないように作っておくことが大切ですが、一連の処理全体が正常終了した後に、特定の入力データに関して一部の処理のみを変更して再処理したい場合などがこれに当たります。

部分的な再処理

Map内で外部へデータ出力を行うと中途半端な出力となる

Mapステートは並行実行しているいずれかの処理が失敗した場合、Mapステート全体がすぐに失敗します。 そのためMapステート内で外部リソースへデータ出力を行っている場合、失敗するまでに処理された入力の一部だけが出力されることになります。
失敗した原因を突き止めた後、同じ入力データの再処理を行う前に、既に出力されてしまった一部のデータの削除が必要となるかもしれません。 そのような中途半端な出力を避ける方法のひとつとして、今回のPutRecordBatchのようにMap処理後にまとめて出力する方法が考えられます。
今回は動作確認の「Mapの異常終了により中途半端なデータ出力となる場合」でこのパターンを実験してみます。

ステートマシンを同期実行するためSDKの設定に注意

Expressワークフローの実行終了を待つためステートマシンを実行するプログラムはAPIタイムアウトやリトライの設定に注意が必要です。
boto3の場合はタイムアウトがデフォルトで1分なので、Expressワークフローの最大実行時間にあわせて5分以上に変更する必要があります。

botocore.amazonaws.com

構築してみる

ここまで見てきた図と同様のアーキテクチャを構築します。
入力データによってMapステート内で次のように分岐するようにします。

  • value == test_2の場合にDynamoDBへのPutと、SNSでのメール通知を行う
  • id == 5の場合に例外を投げる

最終的にデータを挿入するRedshiftのテーブル定義は次のようになります。

create table streaming_data (id int, value varchar(20), datetime timestamp);

今回もAWS CDKを使用して構築します。
前回と同様にRedshift Serverlessとそれに対応するFirehoseは現在L1コンストラクトのみ存在しているため、マネジメントコンソールから作成します。 ステートマシンをトリガーするSQSとLambda関数は前回作成したSqsLambdaコンストラクトを使用します。

CDKのコード

SqsLambdaコンストラクトは前回と同じで次のようになります。

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as lambda_evnt_src from "aws-cdk-lib/aws-lambda-event-sources";
import * as iam from "aws-cdk-lib/aws-iam";

export interface SqsLambdaProps {
  lambdaCodePath: string; // Lambda関数のソースコードへのパス
  lambdaReservedConcurrentExecutions: number; // Lambda関数の同時実行数
  lambdaEnv?: { [key: string]: string }; // Lambda関数へ設定する環境変数
  nextQueueArn?: string; // Lambda関数からメッセージ送信するSQS ARN
  lambdaRolePolicy?: iam.PolicyStatement; // Lambda実行ロールに追加するIAMポリシー
}

export class SqsLambda extends Construct {
  queueUrl: string;
  queueArn: string;

  constructor(scope: Construct, id: string, props: SqsLambdaProps) {
    super(scope, id);

    const dlq = new sqs.Queue(this, "Dlq", {
      visibilityTimeout: cdk.Duration.seconds(30),
    });

    const q = new sqs.Queue(this, "Sqs", {
      visibilityTimeout: cdk.Duration.seconds(360),
      deadLetterQueue: {
        maxReceiveCount: 1,
        queue: dlq,
      },
    });
    this.queueUrl = q.queueUrl;
    this.queueArn = q.queueArn;

    const f = new lambda.Function(this, "Function", {
      code: lambda.Code.fromAsset(props.lambdaCodePath),
      handler: "lambda_function.lambda_handler",
      runtime: lambda.Runtime.PYTHON_3_11,
      reservedConcurrentExecutions: props.lambdaReservedConcurrentExecutions,
      environment: props.lambdaEnv,
      timeout: cdk.Duration.minutes(6),
    });
    if (
      props.hasOwnProperty("nextQueueArn") &&
      props.nextQueueArn !== undefined
    ) {
      // 次のSQSがある場合は送信用の権限を付与する
      const queueAccessPolicy = new iam.PolicyStatement();
      queueAccessPolicy.addActions("sqs:SendMessage");
      queueAccessPolicy.addResources(props.nextQueueArn);
      f.addToRolePolicy(queueAccessPolicy);
    }
    if (
      props.hasOwnProperty("lambdaRolePolicy") &&
      props.lambdaRolePolicy !== undefined
    ) {
      f.addToRolePolicy(props.lambdaRolePolicy);
    }

    f.addEventSource(new lambda_evnt_src.SqsEventSource(q));
  }
}

DynamoDBテーブル、SNSトピック、Lambda関数、Step Functionsステートマシンは次のようになります。

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as path from "path";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as sns from "aws-cdk-lib/aws-sns";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as logs from "aws-cdk-lib/aws-logs";

export interface StreamingStateMachineProps {
  firehoseStreamName: string;
}

export class StreamingStateMachine extends Construct {
  stateMachineArn: string;

  constructor(scope: Construct, id: string, props: StreamingStateMachineProps) {
    super(scope, id);

    /*
     * SNS
     */
    const alertTopic = new sns.Topic(this, "AlertTopic");

    /*
     * DynamoDB
     */
    const alertTable = new dynamodb.TableV2(this, "AlertTable", {
      partitionKey: { name: "Id", type: dynamodb.AttributeType.NUMBER },
      sortKey: { name: "Datetime", type: dynamodb.AttributeType.STRING },
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    /*
     * Lambda
     */
    const binaryDecodeFunction = new lambda.Function(
      this,
      "BinaryDecodeFunction",
      {
        code: lambda.Code.fromAsset(
          path.join(__dirname, "../resources/BinaryDecodeLambda/")
        ),
        handler: "lambda_function.lambda_handler",
        runtime: lambda.Runtime.PYTHON_3_11,
        reservedConcurrentExecutions: 3,
        timeout: cdk.Duration.minutes(1),
      }
    );

    const processFunction = new lambda.Function(this, "ProcessFunction", {
      code: lambda.Code.fromAsset(
        path.join(__dirname, "../resources/ProcessLambda/")
      ),
      handler: "lambda_function.lambda_handler",
      runtime: lambda.Runtime.PYTHON_3_11,
      reservedConcurrentExecutions: 5,
      timeout: cdk.Duration.minutes(2),
    });

    /*
     * Step Functions
     */
    const binaryDecodeFunctionInvoke = new tasks.LambdaInvoke(
      this,
      "Decode Binary String",
      {
        lambdaFunction: binaryDecodeFunction,
        outputPath: "$.Payload",
      }
    );

    const processFunctionInvoke = new tasks.LambdaInvoke(this, "Process", {
      lambdaFunction: processFunction,
      outputPath: "$.Payload",
    });

    const alertTopicPublish = new tasks.SnsPublish(
      this,
      "Publish Alert Topic",
      {
        topic: alertTopic,
        subject: "streaming data alert",
        message: sfn.TaskInput.fromJsonPathAt("$"),
      }
    );

    const alertTablePutItem = new tasks.DynamoPutItem(this, "Put Alert Item", {
      table: alertTable,
      item: {
        Id: tasks.DynamoAttributeValue.numberFromString(
          sfn.JsonPath.jsonToString(sfn.JsonPath.objectAt("$.id"))
        ),
        Datetime: tasks.DynamoAttributeValue.fromString(
          sfn.JsonPath.stringAt("$.datetime")
        ),
        Value: tasks.DynamoAttributeValue.fromString(
          sfn.JsonPath.stringAt("$.value")
        ),
      },
    });

    const transformForPutRecordBatch = new sfn.Pass(
      this,
      "Transform For PutRecordBatch",
      {
        parameters: {
          Data: sfn.JsonPath.jsonToString(sfn.JsonPath.objectAt("$")),
        },
      }
    );

    const putRecordBatch = new tasks.CallAwsService(this, "PutRecordBatch", {
      service: "firehose",
      action: "putRecordBatch",
      parameters: {
        DeliveryStreamName: props.firehoseStreamName,
        Records: sfn.JsonPath.stringAt("$"),
      },
      iamResources: [
        `arn:aws:firehose:ap-northeast-1:${
          cdk.Stack.of(this).account
        }:deliverystream/${props.firehoseStreamName}`,
      ],
      iamAction: "firehose:putRecordBatch",
    });

    const map = new sfn.Map(this, "Map");
    const choice = new sfn.Choice(this, "Choice");
    const condition = sfn.Condition.stringEquals("$.value", "test_2");
    const parallel = new sfn.Parallel(this, "Parallel", {
      resultPath: sfn.JsonPath.DISCARD,
    });

    parallel
      .branch(alertTopicPublish)
      .branch(alertTablePutItem)
      .next(transformForPutRecordBatch);

    choice
      .when(condition, parallel)
      .otherwise(transformForPutRecordBatch)
      .afterwards();

    map.iterator(processFunctionInvoke.next(choice));

    const definition = binaryDecodeFunctionInvoke
      .next(map)
      .next(putRecordBatch);

    const stateMachine = new sfn.StateMachine(this, "StateMachine", {
      definitionBody: sfn.DefinitionBody.fromChainable(definition),
      stateMachineType: sfn.StateMachineType.EXPRESS,
      logs: {
        destination: new logs.LogGroup(this, "stateMachineLogGroup"),
        level: sfn.LogLevel.ALL,
        includeExecutionData: true,
      },
    });

    this.stateMachineArn = stateMachine.stateMachineArn;
  }
}

スタックは次のようになります。

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as path from "path";
import * as sqsLambda from "./sqs-lambda";
import * as iam from "aws-cdk-lib/aws-iam";
import * as stateMachine from "./streaming-state-machine";

const FIREHOSE_STREAM_NAME = "streaming-data-stream";

export class StreamingApp02Stack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const streamingStateMachine = new stateMachine.StreamingStateMachine(
      this,
      "StreamingStateMachine",
      {
        firehoseStreamName: FIREHOSE_STREAM_NAME,
      }
    );

    // ステートマシン同期実行用IAMポリシー
    const stateMachineAccessPolicy = new iam.PolicyStatement();
    stateMachineAccessPolicy.addActions("states:StartSyncExecution");
    stateMachineAccessPolicy.addResources(
      streamingStateMachine.stateMachineArn
    );

    const triggerSqsLambda = new sqsLambda.SqsLambda(this, "TriggerSqsLambda", {
      lambdaCodePath: path.join(
        __dirname,
        "../resources/StateMachineTriggerLambda/"
      ),
      lambdaReservedConcurrentExecutions: 3,
      lambdaEnv: {
        STATE_MACHINE_ARN: streamingStateMachine.stateMachineArn,
      },
      lambdaRolePolicy: stateMachineAccessPolicy,
    });
  }
}

各Lambda関数のコード

各Lambda関数のコードは次のようになります。

"""
StateMachineTriggerLambda
ステートマシンを実行するLambda関数のソースコード
"""


import json
import logging
import boto3
import os
from botocore.client import Config


def lambda_handler(event, context):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    input_data = event["Records"][0]["body"]
    logger.info("input_data=%s", input_data)

    # boto3のリクエストタイムアウトを600秒に設定
    config = Config(read_timeout=600)
    sfn = boto3.client("stepfunctions", config=config)

    output = {"hex_str": input_data}
    state_machine_arn = os.environ.get("STATE_MACHINE_ARN")
    # 同期実行
    res = sfn.start_sync_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(output),
    )
    logging.info("sfn.start_sync_execution: response=%s", res)
    if "status" in res:
        if res["status"] == "FAILED":
            logging.error("sfn.start_sync_execution failed: cause=%s", res["cause"])
            raise
"""
BinaryDecodeLambda
16進数で表現される入力データをjsonオブジェクトにデコードするLambda関数のソースコード
"""


import logging
from typing import List, Dict

HEX_RECORD_LEN = 20  # 1レコードの長さ(16進数20桁)


class Record:
    def __init__(self, id: int, value: str):
        self.id = id
        self.value = value

    def to_dict(self) -> Dict:
        return {"id": self.id, "value": self.value}


def decode(hex_str: str) -> List[Record]:
    logger = logging.getLogger()
    records: List[Record] = []
    for record in [
        hex_str[idx : idx + HEX_RECORD_LEN]
        for idx in range(0, len(hex_str), HEX_RECORD_LEN)
    ]:
        logger.info("raw_record=%s", record)
        """
        record: 10Byte
          id: 2Byte
          value: 8Byte
        """
        id = int(record[0:4], 16)
        value = bytes.fromhex(record[4:]).decode().lstrip("\0")
        logger.info("id=%d, value=%s", id, value)
        records.append(Record(id, value))

    return records


def lambda_handler(event, context):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    input_hex_str = event["hex_str"]
    logger.info("input_hex_str=%s", input_hex_str)

    records = decode(input_hex_str)
    output = [record.to_dict() for record in records]
    logger.info("output=%s", output)

    return output
"""
ProcessLambda
入力データに`datetime`項目を追加するLambda関数のソースコード
idが5の場合に例外を投げて処理を失敗させる
"""


import logging
import datetime


def lambda_handler(event, context):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    if "id" in event:
        if event["id"] == 5:
            logger.error("process failed: event=%s", event)
            raise

    output = event
    output["datetime"] = datetime.datetime.now().isoformat(" ")
    logger.info("ouptut=%s", output)

    return output

cdkコマンドでdeployして完了です。

$ cdk deploy --profile <profile>

FirehoseからRedshift Serverlessへ配信されない場合の確認点

ステートマシンでPutRecordBatchへ入力するリクエストに不備などがあると、Firehose配信ストリームからRedshift Serverlessへ配信が失敗します。その際の確認点を紹介します。

  • FirehoseのエラーログにAn internal error occurred when attempting to deliver data.と表示される場合
    • リクエストに含まれるデータに誤りがある可能性があります。copyコマンドを実行できるjsonのフォーマットになっているか確認してみるとよいかもしれません。
  • Firehoseのエラーログに何も出力されず、Redshiftにもレコードが挿入されていない場合
    • Redshift Serverlessは一部のシステムテーブルやビューをクエリできないため、stl_errorビューをクエリするとpermission deniedとなります。Redshift Serverlessでは名前空間の以下設定からいくつかの監査ログをCloudWatch Logsに出力することができます。Firehoseにより実行されたcopyコマンドはuser activity logに出力されるため、rollbackしてるかどうかなどが確認できます。rollbackの理由などはsys_load_error_detailビューをクエリすることで確認することができます。

Redshift Serverless 監査ログ設定

動作確認

以下の2つのパターンについて動作確認を行います。

  • 正常終了する場合
  • Mapの異常終了により中途半端なデータ出力となる場合

入力データを作成するプログラムは以下になります。

"""
入力データを作成するプログラム
"""


class Record:
    def __init__(self, id: int, value: str):
        self.id = id
        self.value = value

    def to_hex_str(self) -> str:
        """
        id: 2Byte
        value: 8Byte
        """
        id_hex = self.id.to_bytes(2, "big").hex()
        value = self.value.encode()[0:8].rjust(8, b"\0").hex()
        return id_hex + value


# 正常終了する場合の入力データ
input_data = (
    Record(id=1, value="test_1").to_hex_str()
    + Record(id=2, value="test_2").to_hex_str()
    + Record(id=3, value="test_3").to_hex_str()
)

# Mapの異常終了により中途半端なデータ出力となる場合
# input_data = (
#    Record(id=4, value="test_2").to_hex_str()
#    + Record(id=5, value="test_2").to_hex_str()
#    + Record(id=6, value="test_2").to_hex_str()
# )

print(input_data)

正常終了する場合

入力データは次のようになります。

00010000746573745f3100020000746573745f3200030000746573745f33

SQSキューにペーストして実行します。

ステートマシンの実行結果から成功したことが確認できます。

value == test_2のレコードがparallelの方に分岐していることが確認できます。

DynamoDBテーブルにはtest_2のレコードがputされています。

SNSトピックにサブスクライブしたメールアドレスには通知メールが届いています。

Redshiftのstreaming_dataテーブルには全てのレコードが挿入されています。

入力に含まれる3レコードが全て挿入されており、期待する動作が行われたことが確認できました。

Mapの異常終了により中途半端なデータ出力となる場合

入力データは次のようになります。

00040000746573745f3200050000746573745f3200060000746573745f32

jsonにすると次のようになります。

[
  { "id": 4, "value": "test_2" },
  { "id": 5, "value": "test_2" },
  { "id": 6, "value": "test_2" }
]

全てのレコードがtest_2なので、正常終了すればDynamoDBのテーブルには3レコード分putされ、通知メールも3レコード分送信されることになります。
しかし、2レコード目のidが5なので、Map内のProcessで例外となりステートマシンの実行が失敗します。
2レコード目の実行失敗後すぐに、並行して実行されている1レコード目と3レコード目の処理が停止するはずで、どこまで処理が進んでいるか予測できませんが、運が良ければDynamoDBへのレコードputやSNSメール通知が部分的に実行される様子を観測できるはずです。

では、SQSキューにペーストして実行します。

ステートマシンの実行が失敗したことが確認できます。

ステップ毎の実行結果を見ると、Mapで失敗しPutRecordBatchが実行されなかったことが確認できます。
また、Mapの#1id == 5の2レコード目であり、想定通りProcessで失敗していることが確認できます。
他の1レコード目と3レコード目はDynamoDBへのputのみが成功しており、SNSトピックへの通知は中断されたことが分かります。
もしも再処理を行う場合に、DynamoDBに残された1レコード目と3レコード目のitemが見過ごせない場合は、それらを削除してからデッドレターキューのメッセージを再処理することになります。

DynamoDBのテーブルには確かにidが46のレコードがputされています。

SNSトピックからの通知メールは届いていないことが確認できます。

Redshiftのstreaming_dataテーブルにはレコードが挿入されていないことが確認できます。

実行が失敗しているため、SQSデッドレターキューにメッセージが送られています。

メッセージの内容から失敗した入力データであることが確認できます。

これでMapの異常終了により中途半端なデータ出力となることが確認できました。

おわりに

今回は主にStep Functionsを使用してストリーミングデータ処理を行うアーキテクチャを紹介しました。
前回のSQSとLambda関数を使用したアーキテクチャは1回のデータ処理が少し長めのやや重たい処理に向いており、出力データの重複を避けたり、データの細かな再処理が求められるシステムに向いていました。
対して今回のアーキテクチャは1回のデータ処理がもっと軽めで、出力データの重複や少しの不整合を許容できるシステムに向いていると思います。 Step Functionsはデータ処理の流れを直感的に把握でき、コード量の削減もできるため非常に便利です。
記事執筆の検証前は今回のステートマシンがCDKで記述できるか懸念していましたが、Step Functionsのほとんどの機能に対応しており、今回はステートマシン全体を記述することができました。Step Functionsの機能で対応していないのは比較的新しい機能であるDistributed Mapくらいのようです。githubのissueを見たところDistributed Mapは簡単な問題ではないらしく1年ほど検討がなされているようです。とはいえ、CDKは頻繁にアップデートされているので、そう遠くない未来に実装されるような気もします。
AWSにはストリーミングデータ処理に使用できるサービスが沢山用意されているので、データ処理の観点以外からも今後記事を書いていきたいです。