实时无服务器数据摄取:使用 Kafka Connect 将您的 Kafka 集群数据导入 Amazo

  • 2026-01-27 13:58:34

实时无服务器的数据摄取:将 Kafka 集群数据导入 Amazon Timestream

作者:Kayalvizhi Kandasamy 和 Norbert Funke发布日期:2024年1月16日类别: 高级 (300) Amazon 管理型 Apache Kafka 流服务 (Amazon MSK) Amazon Timestream 无服务器 技术指引

关键要点

了解如何通过 Timestream Sink Connector 实时将 Amazon MSK 中的数据流转存入 Amazon Timestream。无需管理基础设施,使用 Amazon MSK 实现高效且安全的数据流处理。探索实时数据流的应用案例,包括物联网设备监控、在线游戏和广告技术。

为了获得及时见解并迅速做出反应,组织需要具备收集和分析大量数据的系统和机制。流处理数据技术使组织能够在数据生成时进行摄取、处理和分析。本文展示了如何利用 Timestream Sink Connector 将来自 Amazon 管理型流服务 (Amazon MSK) 的事件实时流入 Amazon Timestream 表。

通过 Amazon MSK,这种全托管服务使数百、数千个 Kafka 集群的数据流平台可以安全地扩展。您可以轻松迁移数据至下游系统,如数据库、数据湖、文件系统和搜索索引,以进行进一步的数据分析和处理。

Amazon Timestream 是一个快速、可扩展且无服务器的时间序列数据库服务,旨在简化每日电子流事件的存储与分析,适用于数百万个物联网设备、工业设备、游戏会话、流媒体视频会话等多种用例。

接下来,我们将讨论从 MSK 集群实时流入 Timestream 数据库表的架构模式:

AWS Lambda:AWS Lambda 是一个无服务器、事件驱动的计算服务,可以通过多种 AWS 事件触发,包括来自 Amazon MSK 的事件。Kafka Connect:这是一个开源框架,用于将 Kafka 集群与外部系统连接,使开发者更便于地在 Apache Kafka 集群间流动数据。

本文深入探讨 Kafka Connect 模式,演示如何利用 Timestream Sink Connector 将事件从 MSK 集群流入 Timestream 表。您可以在 GitHub 仓库 中找到连接器的源代码。

该连接器能够帮助您对从 MSK 集群流入时间序列事件的数据进行实时分析,适用范围广泛,包括实时竞价、广告技术服务中的报告和跟踪、在线游戏中的玩家行为分析,以及在移动和物流服务中的航班、公交和出租车的实时跟踪。

解决方案概述

在本节中,我们将讨论构建一个端到端管道的解决方案架构,起始于一个生成数据的 Amazon EC2 实例,该数据将摄取至目标 Timestream 表,以便后续分析。

在 MSK Connect 中部署 Timestream Sink Connector 后,它会从配置的 Amazon S3 存储桶中加载目标 Timestream 表的架构定义。连接器使用架构定义来验证来自 Kafka 主题的消息,然后将其作为记录插入到 Timestream 表中。

数据流从一台 Amazon EC2 Kafka 生产者实例开始,该实例将记录写入 Kafka 主题。随着数据的到达,Apache Kafka 的 Timestream Sink Connector 实例会将数据写入 Timestream 表,如下图所示。

连接器通过 VPC 端点由 AWS PrivateLink 提供安全地连接到 Timestream 和 S3,以确保 VPC 与连接服务之间的流量不离开 Amazon 网络,如下图所示。

魔方加速器最新版

本文将带您完成以下步骤:

使用 AWS CloudFormation 模板快速配置 AWS 资源,以便以自动化和安全的方式管理基础设施资源。配置已创建的 EC2 实例作为 Kafka 生产者客户端。配置 MSK Connect 部署 Timestream Sink Connector。向 Kafka 主题发布消息。验证数据是否已写入 Timestream 表。

我们将使用的 AWS 资源

借助 CloudFormation 模板,您可以配置解决方案所需的资源,包括 VPC、子网、跨越三个 可用区 的 MSK 集群、EC2 实例、VPC 端点、Timestream 数据库和表等。请选择 启动堆栈,在可用 Timestream 的 AWS 区域中配置资源。

注意: 此解决方案会创建可能导致费用的 AWS 资源,完成后请确保删除堆栈。

大约需要 1520 分钟完成堆栈配置,之后您可以在 AWS CloudFormation 控制台的 输出 标签中查看以下信息:

资源名称说明KafkaPublisherEC2InstanceId作为 Kafka 客户端和发布者的 EC2 实例 IDMSKClusterArn创建的 MSK 集群的 ARNMSKConnectCWLogGroupArn为 Timestream Sink Connector 创建的 CloudWatch 日志组MSKConnectIAMRoleArn为 Timestream Sink Connector 创建的 IAM 角色MSKKafkaVersionMSK 集群的 Kafka 版本S3BucketName存储 MSK Connect 自定义插件和工件的 S3 存储桶名称S3KeyCSVData实际数据的 S3 键,以便将消息发布到 MSK 主题S3KeyJMXTestPlanJMeter 文件的 S3 键,用于向 MSK 主题发布消息S3URIPlugin自定义插件代码 JAR 文件的 S3 URITimestreamDatabaseNameTimestream 数据库名称TimestreamIngestionVpcEndpointTimestream 摄取的 VPC 端点TimestreamTableName存储摄取数据的 Timestream 表

II 配置 Kafka 生产者

接下来,让我们配置作为 Kafka 生产者的 EC2 实例。我们将使用该实例创建主题,然后通过 Apache JMeter 向主题发布消息,JMeter 是一种开源应用,用于负载测试、功能行为测试、性能测量等。

要连接到 EC2 实例,我们使用 Session Manager,这是 AWS Systems Manager 的一个功能。有关详细信息,参阅 使用 AWS Systems Manager SessionManager 连接到您的 Linux 实例。

在 Amazon EC2 控制台的导航窗格中选择 实例。选择在控制台输出中提到的 KafkaPublisherEC2InstanceId 实例。选择 连接。选择 会话管理器 作为连接方式,然后选择 连接。

创建 Kafka 主题

在此步骤中,我们在 MSK 集群中创建一个 Kafka 主题,该集群由 MSKClusterArn 指定,版本由 MSKKafkaVersion 指定。

在基于浏览器的 SSH shell 中,运行以下命令:

bash sudo u ec2user i

在已配置的 MSK 集群中 创建一个主题。

配置 JMeter

在本节中,我们将使用 Apache JMeter 向我们创建的 Kafka 主题发布数千条消息。

下载并解压 jmeter 应用程序:

bash cd wget https//dlcdnapacheorg//jmeter/binaries/apachejmeter562tgz tar xf apachejmeter562tgz

验证 jmeter 是否已安装:

bash cd apachejmeter562/bin/ /jmeter v

输出应类似于以下截图。

下载 kafkaclientsjar 文件,该文件包含用于处理 Kafka 的类,并将其放置在 jmeter 的 /lib/ext 目录中:

bash cd cd apachejmeter562/lib/ext wget https//repo1mavenorg/maven2/org/apache/kafka/kafkaclients/{MSKKafkaVersion}/kafkaclients{MSKKafkaVersion}jar

下载 mskiamauthalljar 文件,该文件包含为使用 IAM 身份验证配置的 Amazon MSK 提供服务的类,并将其放置在 jmeter 的 /lib/ext 目录中:

bash cd cd apachejmeter562/lib/ext wget https//githubcom/aws/awsmskiamauth/releases/download/v119/awsmskiamauth119alljar

III 部署 Timestream Sink Connector

接下来,让我们展示如何在 MSK Connect 中部署 Timestream Sink Connector。

配置 Amazon MSK 自定义插件

在此步骤中,我们创建一个 自定义插件,该 AWS 资源包含定义连接器逻辑的代码。请注意,代码作为名为 kafkaconnectortimestream10SNAPSHOTjarwithdependenciesjar 的 JAR 文件打包,并上传到 S3BucketName 指定的 S3 存储桶中。

实时无服务器数据摄取:使用 Kafka Connect 将您的 Kafka 集群数据导入 Amazo在 Amazon MSK 控制台中,选择导航窗格中的 自定义插件。选择 创建自定义插件。对于 S3 URI,输入连接器文件的 S3 URI参考自 S3URIPlugin。给它起个名字,然后选择 创建自定义插件。

配置 Amazon MSK 工作配置

工作是运行连接器逻辑的 Java 虚拟机 (JVM) 进程。接下来,我们创建一个 自定义工作配置,使用基于 JSON 的转换器,而不是默认工作配置所自带的基于字符串的转换器。

在 Amazon MSK 控制台中,选择导航窗格中的 工作配置。选择 创建工作配置。在 工作配置 中,粘贴以下配置:

bash keyconverter=orgapachekafkaconnectjsonJsonConverter valueconverter=orgapachekafkaconnectjsonJsonConverter keyconverterschemasenable=false valueconverterschemasenable=false

给它起个名字,然后选择 创建工作配置。

配置 Timestream Sink Connector

现在是时候使用之前创建的自定义插件配置连接器了。

在 Amazon MSK 控制台中,选择导航窗格中的 连接器。选择 创建连接器。选择您之前创建的插件,然后选择 下一步。输入您选择的名称例如,可以用 msktimestreamconnector。选择已配置的 MSK 集群。在配置连接器时,替换值并输入以下配置:

bash #Kafka connect configurations connectorclass=softwareamazontimestreamTimestreamSinkConnector tasksmax=2 topics=lt您创建的主题gt awsregion=ltAWS 区域gt timestreamingestionendpoint=ltTimestreamIngestionVpcEndpointgt timestreamschemas3bucketname=ltS3BucketNamegt timestreamschemas3key=purchasehistoryjson timestreamdatabasename=ltTimestreamDatabaseNamegt timestreamtablename=ltTimestreamTableNamegt

在 工作配置 下,选择 使用自定义配置,选择您在前一步创建的自定义工作配置。在 访问权限 中,选择已配置的 IAM 角色参考自 MSKConnectIAMRoleArn,然后选择 下一步。在 日志 部分,选择 交付到 Amazon CloudWatch Logs 并更新 日志组 Arn 为 MSKConnectCWLogGroupArn 的值,然后选择 下一步。审核所有设置并选择 创建连接器。连接器创建需要 510 分钟完成。当状态变为 运行 时,管道准备就绪,如下图所示。

IV 发布消息

现在让我们向创建的 Kafka 主题发布消息。

使用 Systems Manager 连接到 Kafka 发布者 EC2 实例。在基于浏览器的 SSH shell 中,运行以下命令:

bash sudo u ec2user i

设置启动服务器 URL 和创建的主题名称为环境变量,运行以下命令:

bash export BOOTSTRAPSERVER=ltBootstrapServerStringgt export TOPIC=ltCreated Topicgt

有关获取启动经纪人信息的详细内容,请参见 获取启动经纪人。

重要提示: 您将获得每个代理的三个端点,您只需一个代理端点,如示例命令所示。使用包含多个服务器的 MSK 启动服务器连接字符串将导致连接错误。

bashexport BOOTSTRAPSERVER=b3mskcluster1234569470qvc4kafkaeuwest1amazonawscom9098export TOPIC=purchasehistory

从 S3BucketName 所指定的 S3 存储桶中下载带有键名 S3KeyCSVData 的示例数据文件和用于发布消息的 JMeter 测试计划文件 S3KeyJMXTestPlan:

bash cd cd apachejmeter562/bin/examples mkdir timeseries cd timeseries aws s3api getobject bucket key aws s3api getobject bucket key ltS3Key