池州市减裂仙境150号 +13594780367 chastened@mac.com

经典案例

  • Home
  • 最佳实践:使用 Amazon Redshift Streaming Ingestion 和 Amaz

最佳实践:使用 Amazon Redshift Streaming Ingestion 和 Amaz

2026-01-27 14:25:58 23

使用亚马逊Redshift实现近实时分析的最佳实践

关键要点

在本篇文章中,我们讨论了在使用亚马逊MSK的情况下,如何通过亚马逊Redshift的流式数据摄取功能实现近实时分析的最佳实践,包括: 如何配置一个基于MSK主题的数据流式管道 使用JSON点表示法来解嵌JSON数据 实施增量数据加载策略 监控流式摄取过程中的失败情况

亚马逊红移Amazon Redshift是一个完全托管的、可扩展的云数据仓库,可以快速、直接、安全地进行大规模分析,使客户在分析数百PB数据时提升洞察力。借助 Amazon Redshift 流式摄取功能,您可以近实时更新分析数据库,简化数据管道。本文将介绍通过MSK实现数据流的最佳实践和实际操作步骤。

解决方案概述

我们将通过一个从MSK主题摄取数据到亚马逊Redshift的示例管道进行说明,同时展示如何在亚马逊Redshift中使用点表示法解嵌JSON数据。以下图示展示了解决方案架构:

数据处理流程包括以下步骤:

在Redshift集群中创建流式物化视图,以从MSK主题中消费实时流数据。使用存储过程在摄取的MSK主题上实现变更数据捕获CDC,根据Kafka分区和Kafka偏移量的唯一组合进行记录级别的捕获。在Redshift集群中创建用户可用的表,使用点表示法将流式物化视图中的JSON文档解嵌为表的数据列。您可以定期调用存储过程来持续加载新数据。建立连接,将Amazon QuickSight仪表板与Amazon Redshift连接,以提供可视化和洞察。

此外,我们还将讨论以下主题:

配置跨帐户从Amazon MSK到Amazon Redshift的流式摄取的步骤实现流式物化视图优化性能的最佳实践监控流式摄取失败的技术

前提条件

您需要具备以下条件:

一个AWS账户。根据您的使用案例,选择以下其中一项资源:如果使用Amazon Redshift Provisioned,则需要一个Redshift集群。有关说明,请参见创建示例Amazon Redshift集群。如果使用Amazon Redshift Serverless,则需要一个Redshift工作组。有关说明,请参见使用命名空间创建工作组。一个MSK集群。有关说明,请参见创建亚马逊MSK集群。在MSK集群中创建一个主题,供数据生产者发布数据。一个数据生产者,用于向MSK集群中的主题写入数据。

设置MSK主题时的注意事项

配置MSK主题时,请考虑以下注意事项:

云梯加速器破解版确保MSK主题名称不超过128个字符。目前,包含压缩数据的MSK记录无法在亚马逊Reidshift中直接查询。亚马逊Redshift不支持客户端压缩数据的任何本机解压缩方法。设置MSK集群时,请遵循最佳实践。检查流式摄取的限制,以了解其他注意事项。

设置流式摄取

要设置流式摄取,请完成以下步骤:

设置AWS身份和访问管理 (IAM) 角色和信任策略,以便进行流式摄取。有关说明,请参见为Kafka设置IAM和执行流式摄取。使用Amazon CloudWatch 指标确保数据流向您的MSK主题例如,使用BytesOutPerSec指标。从Amazon Redshift控制台启动查询编辑器v2,或使用您喜欢的SQL客户端连接到Redshift集群,以执行后续步骤。以下步骤在查询编辑器v2中运行。

创建一个外部架构以映射到MSK集群。在以下语句中替换您的IAM角色ARN和MSK集群ARN:

sqlCREATE EXTERNAL SCHEMA custschemaFROM MSKIAMROLE iamrolearn AUTHENTICATION { none iam }CLUSTERARN mskclusterarn

如果您的主题名称区分大小写,您需要启用 enablecasesensitiveidentifier 以便在亚马逊Redshift中访问它们。要使用区分大小写的标识符,请将 enablecasesensitiveidentifier 设置为 true,适用于会话、用户或集群级别:

sqlSET ENABLECASESENSITIVEIDENTIFIER TO TRUE

创建物化视图以从MSK主题消费流式数据:

sqlCREATE MATERIALIZED VIEW OrdersStreamMV ASSELECT kafkapartition kafkaoffset refreshtime JSONPARSE(kafkavalue) as DataFROM custschemaORDERTOPICWHERE CANJSONPARSE(kafkavalue)

在Amazon Redshift中,来自亚马逊MSK的元数据列 kafkavalue 存储为VARBYTE格式。 在本篇文章中,您使用了JSONPARSE函数将 kafkavalue 转换为SUPER数据类型。您还在过滤条件中使用了CANJSONPARSE函数,以跳过无效的JSON记录并防止因为JSON解析失败而出现的错误。我们在文章后面讨论如何存储无效数据以供将来调试。

刷新流式物化视图,这会触发亚马逊Redshift从MSK主题读取并加载数据到物化视图中:

sqlREFRESH MATERIALIZED VIEW OrdersStreamMV

您还可以将流式物化视图设置为使用自动刷新功能。这将使您的物化视图在数据到达流时自动刷新。有关创建具有自动刷新的物化视图的说明,请参见CREATE MATERIALIZED VIEW。

解嵌JSON文档

以下是从MSK主题摄取到流式物化视图 OrdersStreamMV 中的SUPER类型数据列的JSON文档示例:

json{ EventTypeOrders OrderID103 CustomerIDC104 CustomerNameDavid Smith OrderDate20230902 StoreNameStore103 ProductIDP004 ProductNameWidgetX003 Quatity5 Price2500 OrderStatusInitiated}

使用以下代码示例中的点表示法解嵌您的JSON有效负载:

sqlSELECT dataOrderIDINT4 as OrderID dataProductIDVARCHAR(36) as ProductID dataProductNameVARCHAR(36) as ProductName dataCustomerIDVARCHAR(36) as CustomerID dataCustomerNameVARCHAR(36) as CustomerName dataStoreNameVARCHAR(36) as StoreName dataOrderDateTIMESTAMPTZ as OrderDate dataQuatityINT4 as Quatity dataPriceDOUBLE PRECISION as Price dataOrderStatusVARCHAR(36) as OrderStatus kafkapartitionBIGINT kafkaoffsetBIGINTFROM ordersstreammv

以下截图展示了解嵌后的结果:

如果您在JSON文档中有数组,请考虑使用亚马逊Redshift中的PartiQL语句解嵌您的数据。有关更多信息,请参阅帖子中“解嵌JSON文档”部分:使用亚马逊Redshift流式摄取与亚马逊Kinesis数据流和亚马逊DynamoDB进行近实时分析。

增量数据加载策略

请完成以下步骤以实施增量数据加载:

在亚马逊Redshift中创建一个名为Orders的表,供最终用户进行可视化和业务分析:

sqlCREATE TABLE publicOrders ( orderid integer ENCODE az64 productid character varying(36) ENCODE lzo productname character varying(36) ENCODE lzo customerid character varying(36) ENCODE lzo customername character varying(36) ENCODE lzo storename character varying(36) ENCODE lzo orderdate timestamp with time zone ENCODE az64 quatity integer ENCODE az64 price double precision ENCODE raw orderstatus character varying(36) ENCODE lzo) DISTSTYLE AUTO

接下来,您创建一个名为 SPOrdersLoad 的存储过程,以从流式物化视图实施CDC并加载到最终的 Orders 表中。您可以利用流式物化视图中的 KafkaPartition 和 KafkaOffset 的组合来实施CDC,这两列在每个MSK主题中始终是唯一的,确保在过程中不会遗漏任何记录。存储过程包括以下组件:

如需使用区分大小写的标识符,请将 enablecasesensitiveidentifier 设置为true,适用于会话、用户或集群级别。手动刷新流式物化视图如果未启用自动刷新。如果不存在,则创建一个名为 OrdersStreamingAudit 的审计表,以跟踪上一次运行存储过程时加载到Orders表中的每个分区的最后偏移量。解嵌并仅插入新或更改的数据到名为 OrdersStagingTable 的临时表中,该表读取来自流式物化视图 OrdersStreamMV 的数据,其中 KafkaOffset 大于审计表中为被处理 KafkaPartition 记录的上一次处理的 KafkaOffset。在首次使用此存储过程进行加载时,OrdersStreamingAudit 表中没有数据,此时会将所有数据从 OrdersStreamMV 加载到Orders表中。从临时表 OrdersStagingTable 选择并插入仅与业务相关的列到用户可用的 Orders 表中。将加载每一个 KafkaPartition 的最大 KafkaOffset 插入审计表 OrdersStreamingAudit。

我们在此解决方案中添加了中间临时表 OrdersStagingTable,以帮助调试意外失败并进行追踪。跳过临时步骤并直接从 OrdersStreamMV 加载到最终表中,可能会根据使用案例提供更低的延迟。

最佳实践:使用 Amazon Redshift Streaming Ingestion 和 Amaz创建如下代码的存储过程:

sqlCREATE OR REPLACE PROCEDURE SPOrdersLoad()AS BEGIN

SET ENABLECASESENSITIVEIDENTIFIER TO TRUEREFRESH MATERIALIZED VIEW OrdersStreamMV

如果不存在,则创建一个审计表以跟踪加载到Orders表的每个分区的最大偏移CREATE TABLE IF NOT EXISTS OrdersStreamingAudit( kafkapartition BIGINT kafkaoffset BIGINT)SORTKEY(kafkapartition kafkaoffset)

DROP TABLE IF EXISTS OrdersStagingTable

根据新/现有分区的最大偏移,从流式视图插入到临时表中仅新可用的数据在首次加载时,即OrdersStreamingAudit表中没有数据时,将从流式视图加载所有数据CREATE TABLE OrdersStagingTable AS SELECT dataOrderIDNINT4 AS OrderID dataProductIDSVARCHAR(36) AS ProductID dataProductNameSVARCHAR(36) AS ProductName dataCustomerIDSVARCHAR(36) AS CustomerID dataCustomerNameSVARCHAR(36) AS CustomerName dataStoreNameSVARCHAR(36) AS StoreName dataOrderDateSTIMESTAMPTZ AS OrderDate dataQuatityNINT4 AS Quatity dataPriceNDOUBLE PRECISION AS Price dataOrderStatusSVARCHAR(36) AS OrderStatus skafkapartitionBIGINT skafkaoffsetBIGINTFROM OrdersStreamMV sLEFT JOIN ( SELECT kafkapartition MAX(kafkaoffset) AS kafkaoffset FROM OrdersStreamingAudit GROUP BY kafkapartition) AS m ON NVL(skafkapartition0) = NVL(mkafkapartition0)WHERE mkafkaoffset IS NULL OR skafkaoffset gt mkafkaoffset

仅从临时表选择与业务相关的列并插入到最终表中INSERT INTO Orders SELECT OrderID ProductID ProductName CustomerID CustomerName StoreName OrderDate Quatity Price OrderStatusFROM OrdersStagingTable

将每个加载的Kafka分区的最大kafkaoffset 插入审计表 INSERT INTO OrdersStreamingAuditSELECT kafkapartition MAX(kafkaoffset)FROM OrdersStagingTableGROUP BY kafkapartition

END LANGUAGE plpgsql

运行存储过程将数据加载到 Orders 表中:

sqlCALL SPOrdersLoad()

验证Orders表中的数据。

建立跨帐户流式摄取

如果您的MSK集群属于不同的帐户,请完成以下步骤以创建IAM角色以设置跨帐户流式摄取。假定Redshift集群在帐户A中,而MSK集群在帐户B中,如下图所示。

完成以下步骤:

在帐户B中创建一个名为 MyRedshiftMSKRole 的IAM角色,该角色允许亚马逊Redshift帐户A与名为 MyTestCluster 的MSK集群帐户B进行通信。根据MSK集群是否使用IAM身份验证或匿名访问进行连接,您需要创建具有以下其中一种策略的IAM角色:

用于未认证访问的IAM 策略:

json{ Version 20121017 Statement [ { Sid RedshiftMSKPolicy Effect Allow

发表评论