使用 AWS Step Functions 协调依赖的文件上传
作者: Benjamin Smith,2023 年 11 月 2 日 于 AWS Step Functions Serverless永久连结 分享
主要重点
本文介绍如何使用 AWS Step Functions 管理文件上传之间的依赖关系。AWS Step Functions 可以协调无伺服器架构中对应档案的处理。利用事件驱动架构,实现不同团队独立工作的情况下的文件依赖管理。Amazon S3 是一项许多客户用于文件存储的物件存储服务。使用 Amazon S3 事件通知 或 Amazon EventBridge,客户可以构建基于 事件驱动架构EDA的工作负载。这种架构对 S3 存储桶中的物件变更事件作出反应。
EDA 涉及系统元件之间的非同步通信,这样可以将元件解耦,使每个元件都能独立运行。
然而,某些场景可能会因事件之间的依赖关系而引入架构的耦合。本文将介绍这种耦合的一个常见示例,以及如何使用 AWS Step Functions 来处理。
概览
在这个示例中,一个组织有两个分布式自治团队销售团队和仓储团队。每个团队都负责每月将数据文件上传到 S3 存储桶,供后续处理。
上传文件时会生成事件,这启动了后续处理。仓储文件的处理工作会清理数据并将其与运输团队的数据结合。销售文件的处理则将数据与合并的仓储及运输数据相关联,这使得分析人员能够进行预测并获得其他见解。
为了实现这种关联,仓储文件必须在销售文件之前处理。由于这两个团队都是独立的,因此团队之间没有协调,这意味著文件可以随时上传,无法保证仓储文件在销售文件之前被处理。
对于这样的场景,可以使用 聚合器模式。该模式会收集并存储事件,并根据组合事件触发新的事件。在上述情境中,组合事件就是已处理的仓储文件和上传的销售文件。
聚合器模式的要求包括:
关联 一种对相关事件进行分组的方式。这可以通过文件名中的唯一识别符实现。事件聚合器 用于存储事件的有状态存储。完成检查及触发 当收到组合事件后的条件和发布结果事件的方式。架构概览
该架构使用了以下 AWS 服务:
Amazon DynamoDB 作为事件聚合器。Step Functions 用于协调工作流。AWS Lambda 用于解析文件名并提取关联识别符。AWS 无伺服器应用程序模型 (AWS SAM) 用于基础架构即代码和部署。
文件上传: 销售和仓储团队将各自的文件上传至 S3。
EventBridge: 物件创建事件会发送到 EventBridge,该处有一个针对主工作流的规则。主状态机: 这个状态机协调聚合器操作和文件处理工作流,并将文件的工作流与聚合逻辑分开。文件解析与关联: 在此 Lambda 函数中执行业务逻辑以识别文件及其类型。有状态存储: DynamoDB 表存储有关文件的信息,例如文件名、类型和处理状态。状态机会从 DynamoDB 表读取和写入。同时,任务 Token 也在此表中保存。文件处理: 根据文件类型和任何前置条件,运行对应于文件类型的状态机。这些状态机包含处理特定文件的逻辑。任务令牌与回调: 当依赖文件尝试在独立文件之前进行处理时,会生成任务令牌。Step Functions 的 “等待回调” 模式在独立文件处理后继续执行依赖文件。操作指南
您需要以下前提条件:
安装好的 AWS CLI 和 AWS SAM CLI。一个 AWS 帐户。足够的权限来管理 AWS 资源。安装好的 Git。要部署示例,请遵循 GitHub 仓库 中的说明。
这个指南展示了如果依赖文件销售文件在独立文件仓储文件之前上传会发生什么情况。
工作流以将销售文件上传到专用销售 S3 存储桶开始。由于假设销售和仓储团队是分散和自主的,因此此示例使用了两个文件的独立 S3 存储桶。您可以在代码库中找到 示例文件。将文件上传至 S3 会向 EventBridge 发送事件,聚合器状态机将对其进行操作。EventBridge 规则中使用的事件模式为:
json{ detailtype [Object Created] source [awss3] detail { bucket { name [salesmfueda09092023 warehousemfueda09092023] } reason [PutObject] }}
聚合器状态机开始时调用文件解析 Lambda 函数。该函数解析文件类型并使用标识符进行文件关联。在此示例中,文件名包含文件类型和关联识别符年份月份。要使用其他方式表示文件类型和关联识别符,您可以修改此函数以解析该信息。
状态机的下一步在事件聚合器 DynamoDB 表中插入事件的记录。该表具有 复合主键,其中关联识别符是分区键,文件类型是排序键。文件的处理状态被跟踪,以便提供有关工作流程状态的反馈。
根据文件类型,状态机决定要遵循哪个分支。在示例中,运行销售分支。状态机使用关联识别符从 DynamoDB 获取依赖仓储文件的状态。根据此查询的结果,状态机将确定对应的仓储文件是否已经处理。
云梯加速器永久免费版由于仓储文件尚未处理,遂使用 waitForTaskToken 整合模式。状态机在此步骤中保持等待并创建任务令牌,外部服务将利用该令牌触发状态机继续执行。DynamoDB 表中的销售记录与任务令牌一起进行更新。

前往 S3 控制台,向仓储 S3 存储桶上传示例仓储文件。这将调用 Step Functions 工作流的新实例,该工作流将在文件类型选择步骤之后进入另一分支。在此分支中,运行仓储状态机并在 DynamoDB 中更新文件的处理状态。
当仓储文件的状态变更为 “已完成” 时,仓储状态机检查 DynamoDB 是否存在未完成的销售文件。如果存在一个,则检索任务令牌并调用 SendTaskSuccess 方法。这将触发销售状态机,让其从等待状态继续执行。销售状态机启动,处理状态也会得到更新。
结论
本文展示了如何在事件驱动架构中处理文件依赖性。您可以根据自己的需求自定义代码库中提供的示例。
此解决方案特别针对事件驱动架构中的文件依赖性。欲了解有关解决事件依赖性和聚合器更多信息,请阅读文章:无伺服器事件聚合器的事件驱动架构转换。
要了解有关事件驱动架构的更多信息,请访问 Serverless Land 的事件驱动架构部分。
标签:贡献 无伺服器
发表评论