构建一个可扩展的流数据平台,实现对电动车的实时和批量分析,运行在 AWS 上 大数据博客
构建可扩展的电动车流数据平台,实现实时和批量分析
关键要点
本文介绍了如何在 AWS 上构建一个可扩展的电动车流数据平台,以便有效地实现实时及批量分析。利用 Amazon OpenSearch Ingestion、Amazon Managed Streaming for Apache Kafka (MSK) 和 Amazon S3,用户能够轻松地收集、处理和存储大量车辆数据,以优化性能、提高安全性并实现节能。
随着电动车EV在汽车行业的广泛应用,整个行业经历了显著的变革。电动车以其可持续性和环保特性,开启了交通的新纪元。随着环保意识的增强及绿色技术的推广,电动车的采用迅速增长,承诺重塑我们的出行方式。
电动车的激增带来了对数据获取和分析的迫切需求,以优化其性能、可靠性和效率。在迅速发展的电动车行业中,有效地获取、处理并从电动车生成的大量数据中提取见解,已成为制造商、服务提供商和研究人员的必备能力。
随着电动车市场的不断扩大,众多新兴和现有企业争相进入,使得车辆性能成为关键的差异化因素。
现代电动车配备了各种传感器和系统,持续监控其运营的多个方面,包括电压、温度、振动、速度等参数。从电池管理到电机性能,这些数据信息的传递,若能有效捕捉和分析,将极大地促进车辆设计,提升安全性,并优化能耗。数据可用于预测性维护、设备异常检测、实时客户警报、远程设备管理和监控。
然而,管理这庞大的数据涌流并非没有挑战。随着电动车的快速普及,强大的数据管道变得愈加重要,它们能够采集、存储以及处理来自不断增长的电动车数量的数据。此外,每辆车生成的数据粒度显著增加,迫切需要高效处理日益增加的数据点。面临的挑战涉及数据管理的技术复杂性,以及与数据安全、隐私和遵循不断变化的法规相关的问题。
在本文中,我们深入探讨了如何构建可靠的数据分析管道,以便扩展到百万辆电动车,且每秒生成数百个指标,使用 Amazon OpenSearch Ingestion。我们还提供了指南和示例配置,帮助您顺利实现解决方案。
前提条件
在实施解决方案之前,您需要以下内容:
前提条件描述IOT 主题规则配置 AWS IoT Core 的主题规则Amazon MSK 集群使用简单身份验证和安全层SASL/SCRAMAmazon OpenSearch Service 域创建用于数据存储的 OpenSearch 域解决方案概述
下图展示了一个可扩展的、完全托管的现代数据流平台架构。该架构使用 Amazon OpenSearch Ingestion 将数据流入 OpenSearch Service,并通过 Amazon Simple Storage Service (Amazon S3) 存储数据。OpenSearch 中的数据用于驱动实时仪表板,且可以用于通知客户车辆故障参见 在 Amazon OpenSearch Service 中配置警报。存储在 Amazon S3 的数据用于业务智能和长期存储。
在接下来的部分中,我们深入探讨架构中的三个关键组成部分:
从 Amazon MSK 到 OpenSearch 的数据流管道OpenSearch Ingestion 管道到 OpenSearch ServiceOpenSearch Ingestion 到 Amazon S3解决方案工作流程
步骤 1:从 MSK 到 Amazon OpenSearch 的数据流管道
每辆电动车通过 AWS IoT Core 向 Amazon MSK 集群传输大量数据,因此理解这些数据流是至关重要的。OpenSearch Ingestion 提供了一个完全托管的无服务器集成,能够有效利用这些数据流。
在 OpenSearch Ingestion 中的 MSK 源使用 Kafka 的消费者 API 从一个或多个 MSK 主题 中读取记录。OpenSearch Ingestion 中的 MSK 源与 MSK 无缝连接,将流数据引入其处理管道。
以下代码片段展示了从 MSK 集群引入数据的 OpenSearch Ingestion 管道配置。
在创建 OpenSearch Ingestion 管道时,在管道配置部分添加以下代码段。
yamlversion 2mskpipeline source kafka acknowledgments true topics name evdevicetopic groupid opensearchconsumer serdeformat json aws # 提供访问 MSK 的角色 ARN。此角色应与 osispipelinesamazonawscom 建立信任关系 stsrolearn arnawsiam ltltaccountidgtgtrole/opensearchpipelineRole # 提供域的区域。 region ltltregiongtgt msk # 提供 MSK ARN。 arn arnawskafkaltltregiongtgtltltaccountidgtgtcluster/ltltnamegtgt/ltltidgtgt
在配置 Amazon MSK 和 OpenSearch Ingestion 时,确保 Kafka 主题中的分区数量与分配给管道的 OpenSearch 计算单元OCUs之间建立最佳关系。这种优化配置可确保数据处理的效率并最大化吞吐量。有关更多信息,您可以查看 为 Amazon MSK 管道配置推荐的计算单元OCUs。
步骤 2:OpenSearch Ingestion 管道到 OpenSearch Service
OpenSearch Ingestion 提供了一种直接将电动车数据流转到 OpenSearch 的方法。OpenSearch 的接收插件将多个源的数据直接通道传输到 OpenSearch 域。您无需手动配置管道,而是使用 OCUs 定义管道的容量。每个 OCU 提供 6GB 的内存和两个虚拟 CPU。为了使 OpenSearch Ingestion 的自动扩展最优化,基于被处理的主题中的分区数量配置管道的最大 OCUs是至关重要的。如果某个主题的分区数量较多例如,超过 96 个,即每个管道的最大 OCUs,建议将管道配置为最大 196 个 OCUs。这使得管道根据需求在此范围内自动扩展或缩减。如果主题的分区数量较少例如,少于 96,建议将最大 OCUs 设置为等于分区数量。此方式确保每个分区由专用 OCU 处理,能够实现并行处理和最佳性能。在管道从多个主题获取数据的场景下,应采用分区数量最多的主题作为配置最大 OCUs 的基准。此外,若需要更高的吞吐量,可以为同一主题和消费者组创建另一个管道,配置新的 OCUs 集合,以实现接近线性的可扩展性。
魔方加速器最新版OpenSearch Ingestion 提供多个预定义的配置蓝图,帮助您快速在 AWS 上构建数据流管道。
以下代码片段展示了一个使用 OpenSearch 作为接收端,并带有 Amazon S3 死信队列DLQ的 OpenSearch Ingestion 管道的配置。当管道遇到写入错误时,会在配置的 S3 桶中创建 DLQ 对象。DLQ 对象以 JSON 文件存在,作为失败事件的数组。
yamlsink opensearch # 提供一个 AWS OpenSearch Service 域的端点 hosts [ https//ltltdomainnamegtgtltltregiongtgtesamazonawscom ] aws # 提供一个有权访问域的角色 ARN。此角色应与 osispipelinesamazonawscom 建立信任关系 stsrolearn arnawsiamltltaccountidgtgtrole/ltltrolenamegtgt # 提供域的区域。 region ltltregiongtgt # 如果接收端为 Amazon OpenSearch Serverless 集合,请启用 serverless 标志 # 索引名称可以根据主题名称自动生成 index indexevpipe{yyyyMMdd} # 如果 AWS OpenSearch Service 域为 Elasticsearch 6x 版本,则启用 distributionversion 设置 #distributionversion es6 # 启用 S3 DLQ,以捕获 Ohan S3 桶中的任何失败请求 dlq s3 # 提供一个 S3 桶 bucket ltltbucketnamegtgt # 提供失败请求的路径前缀 keypathprefix osspipelineerrors/dlq # 提供桶的区域。 region ltltregiongtgt # 提供一个有权访问桶的角色 ARN。此角色应与 osispipelinesamazonawscom 建立信任关系 stsrolearn arnawsiam ltltaccountidgtgtrole/ltltrolenamegtgt
步骤 3:从 OpenSearch Ingestion 到 Amazon S3
OpenSearch Ingestion 提供了一个内置接收端,以直接将流数据加载到 S3。该服务可以压缩、分区并优化数据,以便在 Amazon S3 上实现具成本效益的存储和分析。加载到 S3 的数据可以根据车辆 ID、日期、地区或其他维度进行分区,以便于查询隔离和生命周期管理。
以下代码片段展示了我们如何在 Amazon S3 中分区和存储电动车数据。
yaml s3 aws # 提供一个有权访问桶的角色 ARN。此角色应与 osispipelinesamazonawscom 建立信任关系 stsrolearn arnawsiamltltaccountidgtgtrole/ltltrolenamegtgt # 提供域的区域。 region ltltregiongtgt # 用于发送日志的桶 bucket evbucket objectkey # 可选的 s3 对象路径前缀 pathprefix indexevpipe/year={yyyy}/month={MM}/day={dd}/hour={HH} threshold eventcollecttimeout 60s codec parquet autoschema true
您可以按照 创建 Amazon OpenSearch Ingestion 管道 中的步骤创建管道。
以下是完整的管道配置,结合了前三个步骤的配置。根据需要更新 Amazon 资源名称ARNs、AWS 区域、OpenSearch Service 域端点和 S3 名称。
整个 OpenSearch Ingestion 管道配置可以直接复制到 AWS 管理控制台的“管道配置”字段中。
yamlversion 2mskpipeline source kafka acknowledgments true # 默认值为 false topics name ltltmsktopicnamegtgt groupid opensearchconsumer serdeformat json aws # 提供访问 MSK 的角色 ARN。此角色应与 osispipelinesamazonawscom 建立信任关系 stsrolearn arnawsiamltltaccountidgtgtrole/ltltrolenamegtgt # 提供域的区域。 region ltltregiongtgt msk # 提供 MSK ARN。 arn arnawskafkauseast1ltltaccountidgtgtcluster/ltltclusternamegtgt/ltltclusteridgtgt processor parsejson sink opensearch # 提供一个 AWS OpenSearch Service 域的端点 hosts [ https//ltltopensearchservicedomainendpointgtgtuseast1esamazonawscom ] aws # 提供一个有权访问域的角色 ARN。此角色应与 osispipelinesamazonawscom 建立信任关系 stsrolearn arnawsiamltltaccountidgtgtrole/ltltrolenamegtgt # 提供域的区域。 region ltltregiongtgt # 如果接收端为 Amazon OpenSearch Serverless 集合,请启用 serverless 标志 # 索引名称可以根据主题名称自动生成 index indexevpipe{yyyyMMdd} # 如果 AWS OpenSearch Service 域为 Elasticsearch 6x 版本,则启用 distributionversion 设置 #distributionversion es6 # 启用 S3 DLQ,以捕获 Ohan S3 桶中的任何失败请求 dlq s3 # 提供一个 S3 桶 bucket ltltbucketnamegtgt # 提供失败请求的路径前缀 keypathprefix osspipelineerrors/dlq # 提供桶的区域。 region ltltregiongtgt # 提供一个有权访问桶的角色 ARN。此角色应与 osispipelinesamazonawscom 建立信任关系 stsrolearn arnawsiamltltaccountidgtgtrole/ltltrolenamegtgt s3 aws # 提供一个有权访问桶的角色 ARN。此角色应与 osispipelinesamazonawscom 建立信任关系 stsrolearn arnawsiamltltaccountidgtgtrole/ltltrolenamegtgt # 提供域的区域。 region ltltregiongtgt # 用于发送日志的桶 bucket ltltbucketnamegtgt objectkey # 可选的 s3 对象路径前缀 pathprefix indexevpipe/year={yyyy}/month={MM}/day={dd}/hour={HH} threshold eventcollecttimeout 60s codec parquet autoschema true
实时分析
数据在 OpenSearch Service 中可用后,您可以构建实时监控和通知系统。OpenSearch Service 强大的多种通知渠道支持,允许您通过 Slack、Chime、自定义 webhook、Microsoft Teams、电子邮件以及 Amazon Simple Notification Service (SNS) 等服务接收警报。
下图展示了 OpenSearch Service 支持的通知渠道。
OpenSearch Service 中的通知功能允许您创建监控器,以观察某些条件或数据变化,然后发出警报,例如监控车辆遥测数据,并在电池降级或异常能耗等问题上启动警报。例如,您可以创建一个监控器,分析电池容量随时间的变化,并在容量显著下降至预期降减曲线以下的多辆车辆中,通知值班团队通过 Slack。这可能指示需要调查的潜在制造缺陷。
除了通知之外,OpenSearch Service 还能够轻松构建实时仪表板,以可视化持续追踪您的车辆数据。您可以摄取车辆遥测数据,如位置、速度、燃料消耗等,并在地图、图表和仪表中进行可视化。仪表板可以提供对车辆健康和性能的实时可见性。
以下截图展示了在 OpenSearch Service 创建的示例仪表板。
OpenSearch Service 的一个主要优势是其能够处理高持续的摄取和查询速率,且响应延迟仅为毫秒级。它将传入的车辆数据分发到数据节点中进行并行处理。这使得 OpenSearch 能够扩展以支持极大的车队,同时仍然提供运营所需的实时性能和警报能力。
批量分析
数据在 Amazon S3 中可用后,您可以构建一个安全的数据湖,为各种分析用例提供强大见解。作为一个不可变的存储,新数据不断地存储在 S3 中,而现有数据则保持不变。这为下游分析提供了单一事实来源。

用于业务智能和报表的分析可以识别趋势、洞察并创造丰富的数据湖可视化。您可以使用 [Amazon QuickSight](https