Hello World, Kafka连接器 + Kafka流计算

重要提示:本文中的信息已经过时。通过最近的Kafka版本,Kafka Connect和Kafka流以及KSQL之间的集成变得更加简单和容易。请参阅 在20分钟内构建实时流ETL管道KSQL实战:从Oracle事务数据中实时流化ETL

本文由客籍博客Michal Haris与Neha Narkhede、Apache Kafka committer、Confluent联合创始人和CTO合作撰写。Michal在数字媒体行业工作,是一名专业从事实时大数据基础设施和流处理应用概念验证实现的技术架构师。

在过去的几年里,随着Apache Kafka的广泛采用,流处理已经走到了前台。最近,出现了几个与Kafka集成的流处理系统。其中一个系统Apache Samza有一个特别有趣的“hello world”教程,用于开始使用该系统;Hello Samza使用维基百科在其IRC频道上发布的实时更新。在本文中,我们将学习相同的教程,但是使用Apache Kafka项目伞下的两个新模块构建,一个最近发布,另一个计划在即将发布的版本中发布。

kafka_connect

Kafka Connect是Apache Kafka 0.9的一部分,它是Apache Kafka和其他数据系统之间可伸缩和可靠流数据的工具。它提供了API、运行时和REST服务,使开发人员能够快速定义连接器,从而将大型数据集移入和移出Kafka。它具有容错性和弹性,我们已经习惯了,并产生于整个一代摄食和卸载工具的经验。Kafka Connect可以摄取整个数据库,或者从所有应用服务器收集度量数据到Kafka主题中,从而使数据能够以较低的延迟进行流处理。这篇文章发布了该版本,您可以在其中找到链接和更多信息。

另一个特性是Kafka Streams——用于创建流处理应用程序的轻量级库。就部署而言,它是轻量级的,实际上在这方面它是完全“失重”的,不像大多数(如果不是全部的话)它的前辈和同时代人。这个库也是轻量级的,因为它构建在Kafka中针对流处理应用程序需要处理的问题(容错性、分区、可伸缩性、排序和负载平衡)本地构建的原语之上。对于我们这些重视软件简洁性的人来说,它是重量级冠军,用相对较少的资源实现了如此多的成就。更多关于背景和架构:介绍Kafka流

为了创建我们的Hello Kafka流程序,我们需要连接Wikipedia IRC通道,将它们转换成一个分区主题,我们可以从这个主题构建处理器拓扑。目前Kafka Connect还没有公开嵌入式API,尽管所有必要的构建块都已经就绪(在connect-runtime模块下)。但是,这项工作已经计划在将来的版本中进行,因此我们将通过在ConnectEmbedde.java中创建自己的临时扩展来加快进度。

在深入讨论之前,值得注意的是,有两种方法可以运行到Kafka的连接器:

  1. 作为一个连接器在集群进程上,运行Kafka Connect。
  2. 作为应用程序流程中的嵌入式实体。

您可能想知道,在另一个进程中嵌入连接器与在Kafka连接集群中作为长时间运行的进程相比,什么时候更好。

连接器更好地作为长期运行的Kafka连接流程来操作,以便在共享和标准数据系统之间支持流ETL流。例如,使用MySQL源连接器和HDFS接收器连接器从MySQL数据库中提取数据并将其加载到Hadoop中。另一方面,只与应用程序相关的连接器更方便地作为应用程序进程中的嵌入式实体运行。例如,本文中描述的一个演示应用程序。在这里,我们不希望为了运行一个简单的演示应用程序而单独运行服务和管理连接器,这会增加复杂性。有趣的是,在实际应用程序中常常是这样,而不仅仅是演示。在不更改体系结构的情况下降低开发的可伸缩性是可伸缩性的一个非常容易被忽略的方面——通常我们只考虑将可伸缩性提高到更大的数据量或更高的吞吐量。

好了,废话不多说,让我们直接进入构建Hello Kafka流。

如果您熟悉这些概念和api,可以跳过下一节,只查看GitHub上的整个项目

注意:由于本演示构建在即将发布的0.10.x版本的特性之上,因此您要么需要与Kafka流或Apache Kafka主干或本地安装的任何0.10.x版本的技术预览相结合的平台

维基百科卡夫卡流演示 —— 步骤

步骤

上面是连接器、主题和处理器的集成拓扑的概述图。

首先,我们需要介绍通用的IRC连接器。它在这里由3个类实现。主类IRCFeedConnector接受IRC主机和端口配置、通道列表和输出主题。在Kafka Connect设计中,连接器是序列化无关的。这使得它们可以在不同的上下文中使用,例如,如果您运行Kafka Connect服务并使用Avro序列化器进行配置,那么您可以像这样配置IRC连接器,无需修改即可重用它:

1
2
3
4
5
6
7
name=wikipedia-irc-source
connector.class=io.amient.kafka.connect.irc.IRCFeedConnector
tasks.max=10
irc.host=rc.wikimedia.org
irc.port=6667
irc.channels=#en.wikipedia,#en.wiktionary,#en.wikinews
topic=wikipedia-raw

在我们的主类WikipediaStreamDemo中,我们用.createWikipediaFeedConnectInstance()方法以编程方式提供这个配置。结果是一样的——不管运行多少个应用程序实例,我们最多只能得到3个任务,每个通道一个任务。因为我们正在启动一个嵌入式实例,所以我们还必须提供辅助配置、转换器等。为了保持演示的简洁,我们使用的org.apache.kafka.connect.json.JsonConverter附带Kafka,由于底层设计,它可以自动为任何源模式工作。

createWikipediaFeedConnectInstance

连接实例配置,剩下是调用方法.start() ,我们有效地集成连接器为我们的应用程序,我们现在有一个协调组运行在它自己的线程池,确保主题wikipedia-raw将一连串的json表示的通用IRCMessage维基百科服务器。

现在我们可以看一下.main(…)方法中定义的集成拓扑,它实际上是程序的唯一例程,其余的都是声明性的。

main

在connect实例启动之后,流拓扑实例化并启动。该方法的其余部分只是确保在关闭时这两个服务都被干净地停止。最后一段代码是.createWikipediaStreamsInstance()方法,它声明了流拓扑。

createWikipediaStreamsInstance

在创建json serde helper实例之后,我们声明第一个KStream<JsonNode, JsonNode> wikipediaRaw。除了从连接器离开的地方开始。

在下一段代码中,我们使用Java 8的方法引用来声明这个wikipedia-raw消息json流上的转换,以保持简洁。
方法.parseIRC()是从Hello Samza的作者那里借来的,用于提取和解析原始irc消息到WikipediaMessage。此方法还将转换后的流的键更改为提交编辑的用户的字符串用户名,这对于正确的每个用户聚合是必要的。
过滤掉任何表示解析器无法理解的消息的空值,剩下的就是一个<String,WikipediaMessage>流。
使用KStream的.through(..)方法将这个流持久化到主题wikipedia-parsed中——使用一个通用的jsonPOJOSerde<WikipediaMessage>,我们说这个主题的内容也将是json序列化的。
用于分区的键将是前面转换声明的作者的用户名。

最后一个KTable<String, Long>是通过过滤和聚合使用.countByKey()解析的wikipedia消息流来声明的——这也是我们的最终输出,它使用低级.process()方法连续打印更新。

Key Takeaways

因此,我们有一个完整的流处理应用程序,它从公共IRC服务器提取数据并计算一些使用分析。如果重新启动它,您会注意到它还记得每个用户的计数的最后一种状态——KTable保存在一个更改日志主题中,下次将从该主题中恢复。

我们将说明关于前面一点的降级:我们现在可以在终端中运行hello-kafka流的一个实例,在IDE中运行第二个实例,以调试实例在作为组的一部分时的行为——这在像YARN这样的框架中长期难以实现。

由于不涉及部署框架,且所有实例都是相同的(包括配置,在本例中,配置是直接烧入主类的),因此所有部署选项都是可用的;无论您选择将流处理应用程序打包到Docker中并在Mesos上运行,还是从Chef更改为Puppet进行配置管理,流处理应用程序都将保持不变。

需要注意的一点是,当应用程序被关闭时,IRC通道没有任何处理,这将导致数据丢失,因为没有计算编辑。每当连接器组中的工作发生重新平衡时,都会发生相同的数据丢失,但这只是IRC通道的一个特性—如果脱机,就会丢失消息,并且其中没有补偿的概念。如果你想要更多的无损的这个演示最好的变体,你所能做的就是减少应用程序停机时间是另一篇文章的主题也许——或者使用一个数据源有偏移的概念,至少一个时间戳,卡夫卡连接将愉快地做管理的工作最后一个可靠地消耗补偿。

至于Kafka流和Kafka Connect的集成,有一种情况是两者之间的一级集成,这样连接器就可以直接映射到KStream,从而允许在连接器的输出上直接应用任何流转换。例如,在这个wikipedia演示中,我们可以消除wiki-raw主题,并立即按用户名应用解析和分区,在不牺牲应用程序任何有用特性的情况下改善延迟和存储占用。

首先让我们看一种可能的实现。这里,我们重用了大部分现有的代码,并在流构建器上引入了一个新的重载方法.stream(…),该方法没有将主题名作为最后一个参数,而是采用一个与ConnectEmbedded类似的实例,只不过我们将它称为EmbeddedConnectSource:

createWikipediaStreamsInstance

上面,方法方法createConnectSource()使用与我们的工作演示解决方案的方法createWikipediaFeedConnectInstance()相同的配置。我们使用JsonConverter将源IRCMessage转换为序列化的json。但是请注意,我们立即告诉流构建器将这些序列化的数据解析回JsonNode,而JsonNode又被解构为WikipediaMessage。如果分析这个实现,您可能会发现这个转换、解析和解构占用了整个程序的大部分CPU周期。

如果只给流构建器连接器配置(而不是工作配置)并返回SchemaAndValue对象流,则可以避免转换为json并随后解析到JsonNode的更有效版本。这将需要公开一些连接器内部,而且在Kafka端实现起来比较困难,但是可以节省周期,而且在应用程序的初始化代码方面也比较少噪音:

createWikipediaStreamsInstance

不管怎样,集成API很快就会出现。请继续关注!

结论

  • 建在正确的基础上: Apache Kafka中的基本原语已经被证明是成功的,并且为实现流处理操作提供了正确的构建块,无论是实时摄取还是流处理。由此产生的简单性在基础设施工具方面大有裨益。替代方案包括为许多不同的系统构建和操作插件,因此与在数据中心中心拥有一个平台相比,它们的吸引力要小得多。

  • 易于接近和开发人员友好: 使用Connect和Streams(包括嵌入式运行时扩展)编写演示程序花费的时间(一天)与理解、打包和在真实集群上部署Hello Samza所用的时间差不多

  • 你所需要的: 将Kafka、Kafka Connect和Kafka流一起使用不仅是可能的,而且它还提供了编写流处理应用程序的完整工具。

Powered by Hexo and Hexo-theme-hiker

Copyright © 2013 - 2021 朝着牛逼的道路一路狂奔 All Rights Reserved.

访客数 : | 访问量 :