在20分钟内构建实时流ETL管道

最近有很多人说传统的ETL已经死了。在传统的ETL范式中,数据仓库是王者,ETL作业是批处理驱动的,所有内容都与其他内容进行通信,可伸缩性限制非常普遍。当人们咕哝着说由此造成的混乱是“做生意的成本”时,混乱的管道被勉强容忍。“人们学会了忍受这样的一团糟:

etl_mess

然而,ETL并没有死。开发人员越来越喜欢一种新的ETL范式,它具有分布式系统和事件驱动的应用程序,其中业务可以实时和大规模地处理数据。仍然需要“提取”、“转换”和“加载”,但是现在的区别是将数据作为一等公民对待。企业不再希望将数据降级为批处理,而批处理通常仅限于每天离线完成一次。它们拥有更多不同类型的数据源,并且希望消除混乱的点到点连接。我们可以将流处理直接嵌入到每个服务中,核心业务应用程序可以依赖流平台来分发和处理事件。这篇文章的重点是演示在Apache卡夫卡®如何轻松地可以实现这些流ETL管道。

etl_streaming

Kafka是一个分布式流平台,是现代企业架构的核心。它提供了在Kafka Connect框架中运行的Kafka连接器,可以从不同的数据源提取数据;提供了丰富的Kafka流API,可以从核心应用程序中执行复杂的转换和分析;您可以部署Confluent模式注册中心来集中管理模式,验证兼容性,并在数据不符合模式时提供警告。(不明白为什么需要为关键任务数据建立模式注册表?)阅读这篇博客文章。端到端参考体系结构如下:

connect_streams_ref_arch

让我们考虑一个使用Kafka Streams API进行实时有状态流处理的应用程序。我们将运行端到端参考体系结构的一个具体示例,并向您展示如何:

  • 运行Kafka源连接器从另一个系统(SQLite3数据库)读取数据,然后使用单个消息转换(SMTs)修改正在运行的数据,然后将其写入Kafka集群
  • 使用Kafka Streams API(例如count和sum)处理和丰富来自Java应用程序的数据
  • 运行Kafka接收器连接器将数据从Kafka集群写到另一个系统(AWS S3)

本例的工作流程如下:

如果您希望在您的环境中进行后续操作并进行尝试,请使用快速入门指南来设置Kafka集群并下载完整的源代码

提取数据到Kafka

首先,我们必须将数据导入客户机应用程序。要在Kafka和其他系统之间复制数据,用户可以从各种现成的连接器中选择Kafka连接器。Kafka源连接器从另一个系统导入数据到Kafka, Kafka接收器连接器从Kafka导出数据到另一个系统。

对于我们的示例,我们希望从SQLite3数据库中提取数据,该数据库保存到/usr/local/lib/retail.db。数据库有一个名为locations的表,它有三个列idnamesale,其中包含示例内容:

locations

id name sale
1 Raleigh 300
2 Dusseldorf 100
1 Raleigh 600
3 Moscow 800
4 Sydney 200
2 Dusseldorf 400
5 Chennai 400

我们希望从该表创建一个数据流,其中流中的每个消息都是K/V对。你会问,key是什么,value是什么?我们来算一下。

streams_kv_generic

为了将表数据提取到Kafka主题中,我们使用免费的Confluent开源下载附带的JDBC连接器。注意,默认情况下,JDBC连接器不会向消息添加键。由于消息键对于组织和分组消息非常有用,因此我们将使用smt设置该键。如果使用默认配置设置,数据将被写入Kafka主题,配置如下:

默认配置

配置名 配置值
源数据库 test.db
Kafka 创建的topic test-sqlite-jdbc-locations
消息键呈现 有效的空 (具有null值的JSON模式)
信息值模式 JSON
数据治理 None

相反,我们需要以下目标配置:

目标配置

配置名 配置值
源数据库 /usr/local/lib/retail.db
Kafka 创建的topic retail-locations
消息键呈现 是的,使用Kafka Connect的单个消息转换特性插入密钥
信息值模式 Avro
数据治理 是的,使用模式注册表

为了实现目标配置,我们修改JDBC源连接器属性文件source-quickstart-sqlite.properties:

然后,我们将这些配置行添加到JDBC源连接器属性文件中,以利用单个消息转换(SMT)函数,这些函数在将从表行提取的数据写到Kafka主题之前操作这些数据。我们使用ValueToKeyExtractField SMT函数将null键替换为从消息值派生的字段。

最后,我们将这些配置行添加到JDBC源连接器属性文件中,以将键转换器配置为字符串(可以像JSON或Avro那样轻松地序列化),将值转换器配置为模式的Avro。Confluent模式注册表运行在http://schemaregistry1:8081

为了简单起见,我们以Kafka Connect独立模式运行JDBC源连接器。在生产中,您应该始终使用分布式模式进行可伸缩性和容错,并使用合流控制中心进行集中管理。

此时,Kafka Connect运行JDBC连接器,并将表的每一行作为键/值对写入Kafka主题retail-locations。对该表的状态感兴趣的应用程序将从本主题读取。当将行添加到SQLite3数据库中的源表时,Kafka Connect会自动将它们作为消息写入Kafka主题,然后在KStream对象中自动提供给客户机应用程序。因此,我们实现了数据的无界、连续的实时流。这个数据流就是我们所说的“流”。

每个表行被序列化为Avro记录。我们可以从主题retail-locations查找Confluent模式注册表中消息值的模式。

使用Kafka Streams API转换数据

既然源数据已写入Kafka主题,那么任何数量的应用程序都可以从该主题读取并使用模式注册表反序列化消息值(即Avro记录)。一个简单的应用程序可以是kafka-avro-consol-consumer:

但是控制台使用者命令行工具不是我们的最终目标。我们希望演示如何在客户机应用程序中使用Kafka Streams API来处理该主题中的数据。Confluent提供了关于如何使用API开发应用程序的优秀文档

在这里,我想强调应用程序的两个部分:

  1. 从Kafka主题创建Kafka流对象
  2. 使用Kafka流处理操作数据转换

创建Kafka流对象

从Kafka主题创建Kafka流对象意味着将Kafka中的字节记录转换为客户机应用程序中的Java对象。因为消息值是Avro记录,所以我们需要匹配该模式的Java类。我们创建一个名为location.avsc的Avro模式文件,它定义了客户机对数据结构的期望。它是JSON格式的,并且有一个记录,其中包含三个字段idnamesales,这些字段对应于表列

为了让Java客户机应用程序能够反序列化在这个Avro模式中编写的消息,我们需要有一个对应的Java类(例如Location)。但是,我们不需要编写Java代码!在我们的pom.xml 中,我们使用Maven插件avro-maven-plugin,它自动生成这些类的Java代码。

现在Java源代码已经从Avro模式文件中自动创建,您的应用程序可以导入该包:

下一个关键步骤是配置streams配置,以使用适当的serialization/deserialization类并指向模式注册中心:

您现在可以创建KStream对象:

请注意,在上面的工作流中省略了一些基本的Kafka流设置,但是请记住查看Streams开发人员指南以获得详细的说明。

处理和丰富数据

在客户端应用程序的这一点上,我们有一个名为locationsStreamKStream对象,它包含一个消息流,其中消息键是<Long> id,消息值是<Location>记录,其中包含它的id、name和sale值。

现在我们可以用流处理器进行数据转换。有一个丰富的Kafka Streams API用于实时流处理,您可以在核心业务应用程序中利用它。有许多流处理器,它们一次接收一条输入记录,将其操作应用于该记录,然后可能向其下游处理器生成一条或多条输出记录。这些处理器可以是无状态的(例如一次转换一条消息,或者根据某些条件过滤掉消息),也可以是有状态的(例如跨多个消息的join, aggregate, 或 window数据)。

为了开始使用Kafka Streams API,下面是三个示例,其中包含示例代码和相应的可视化结果流数据:

  1. 将数据流转换为新类型的键/值对。例如,我们可以使用map方法将原来的KStream<Long, Location>转换为KStream<Long, Long>的key/value对,其中key是相同的key, value只是sales的值。

streams_kv_transform

  1. 计算特定键的出现次数,首先根据键对消息进行分组,然后使用Count方法计算出现次数。Kafka Streams API具有捕获流和表的对重性的本地抽象:KStream表示消息流,其中每个数据记录表示无界数据集中的自包含数据,KTable表示changelogs,其中每个数据记录表示更新。我们还可以为aKTable命名一个本地状态存储,这允许我们像查询普通表一样轻松地查询它。例如,我们可以计算每个键出现的次数。

  1. 通过首先基于键对消息进行分组,然后使用reduce方法对值进行求和,从而对特定键的值进行求和。例如,我们可以跨所有消息对给定键的销售额进行求和,并在添加新销售额时对其进行实时更新。这个例子还展示了Kafka流如何允许我们在KStream和ktable之间来回切换。

将数据加载到其他系统

在这一点上,所有丰富的数据仍然在我们的客户端应用程序中,我们可能希望将它流到另一个系统中。事实上,您可能希望在许多目标下游系统中的数据作为扇出管道的一部分。您可以使用多个Kafka接收器连接器运行Kafka Connect,以便任何数量的目标下游系统都可以接收相同的数据。Kafka接收器连接器可以与Kafka源连接器并行运行。

在我们的示例中,目标系统是AWS S3 bucket。要在那里加载数据,从Kafka获取数据和从Kafka获取数据一样容易。应用程序用这条语句将theKStream写到Kafka主题上:

然后Kafka接收器连接器处理进入下游系统的摄入。您可以将接收器连接器指向这个Kafka主题,并使用Kafka Connect以与为Kafka源连接器运行Kafka Connect类似的方式运行它。

要将数据加载到AWS S3中,可以使用适当的主题名称、S3区域和bucket配置S3连接器属性,然后运行连接器。现在我们将省略S3的精确配置(很快就会有另一篇关于此配置的博客文章),但是您可以使用以下命令启动连接器:

结论

也许在这篇博客文章中描述的工作流对于我们这个非常简单的例子来说似乎带来了很多开销。但是,考虑具有多个数据源和目标的真实场景,这些数据源和目标需要摄取数据,同时支持随时间变化的各种模式。有多步骤的实时工作流,具有复杂的转换,需要高耐久性和容错能力。Kafka为构建流ETL管道提供了一个非常灵活、可伸缩的架构。您不希望试图将其与传统的ETL范式结合在一起,因为整个过程不可避免地会变得混乱。回顾Kafka参考体系结构图,您可以看到实时的、关键业务的应用程序如何从Kafka的流化ETL功能中获益:

Kafka流媒体平台允许关键任务应用程序实时处理数据:

  • Kafka连接器在Kafka Connect框架中运行,开发人员可以从一个系统中提取数据或将数据加载到另一个系统中
  • Kafka Streams API为应用程序提供了流处理功能,可以一次转换一条消息或事件。这些转换可以包括连接多个数据源、过滤数据和在一段时间内聚合数据
  • Confluent模式注册中心使用Avro模式提供数据治理
  • 汇流控制中心提供集中管理

下载Confluent开放源代码,并从使用Kafka流、Kafka Connect、Avro、模式注册表的代码示例开始。如果你喜欢Docker,你也可以看看Kafka的音乐演示

Powered by Hexo and Hexo-theme-hiker

Copyright © 2013 - 2021 朝着牛逼的道路一路狂奔 All Rights Reserved.

访客数 : | 访问量 :