当前位置: 首页 > news >正文

Flink介绍——实时计算核心论文之Kafka论文详解

引入

我们通过S4和Storm论文的以下文章,已经对S4和Storm有了不错的认识:

  • S4论文详解
  • S4论文总结
  • Storm论文详解
  • Storm论文总结

不过,在讲解这两篇论文的时候,我们其实没有去搞清楚对应的流式数据是从哪里来的。虽然S4里有Keyless PE,Storm里也有Spout,它们都是框架自己提供的发送流式数据的机制,但是这些框架本身并不能产生数据。我们需要有一个技术栈,能够把各种应用服务器产生的数据,传输到这些流式数据处理系统。

其实,不只是流式数据处理系统有这个需求,我们之前讲解过的GFS/MapReduce这些分布式文件系统,以及大数据批处理系统,一样面临这个“ 数据从哪里来”的问题。

这个问题,也就是我们今天要探讨的主题,就是我们应该通过一个什么样的系统,来传输数据。这个系统需要满足哪些需求,整个系统架构应该怎么设计。而对于这个问题的解答,就是开源的Kafka系统。

同样在2011年,来自LinkedIn的三位工程师,一起发表了《Kafka: a Distributed Messaging System for Log Processing》这样一篇论文,并且把论文里描述的这个系统Kafka开源。这篇论文,可以说帮我们圆上了整个大数据系统的最后一个环节,就是高性能、高可用的数据传输。

虽然后续,整个大数据领域仍然有不断的迭代更新,但是有了Kafka之后,通过Hadoop/Spark进行批数据处理,通过Hive搭建数据仓库,通过Storm进行流式数据处理,然后通过Kafka作为业务系统和大数据系统之间的消息管道,已经是一个完整而成熟的“标准方案”了。可以说,随着Kafka的发布,整个大数据领域开始迈入一个成熟的阶段。大部分公司都可以通过组合开源框架,搭建起完善的大数据系统,而不再需要自己去“造轮子”了。

Kafka:用于日志处理的分布式消息系统

摘要

日志处理已成为消费互联网公司数据管道的关键组成部分。我们推出了 Kafka,这是一个分布式消息系统,旨在以低延迟收集和传输大量日志数据。我们的系统融合了现有日志聚合器和消息系统的理念,适用于离线和在线消息消费。为使系统高效且可扩展,我们在 Kafka 中做出了不少非传统但实用的设计选择。实验结果表明,与两款流行的消息系统相比,Kafka 具有卓越的性能。我们已在生产环境中使用 Kafka 一段时间,它每天处理数百 GB 的新数据。

通用术语

管理、性能、设计、实验

关键词

消息传递、分布式、日志处理、吞吐量、在线

1. 引言

任何一家规模较大的互联网公司都会产生大量的 “日志” 数据。这些数据通常包括:(1)与登录、页面浏览量、点击量、“点赞”、分享、评论和搜索查询等对应的用户活动事件;(2)操作指标,如服务调用栈、调用延迟、错误,以及每台机器上的 CPU、内存、网络或磁盘利用率等系统指标。长期以来,日志数据一直是用于跟踪用户参与度、系统利用率和其他指标的分析数据的一部分。

然而,互联网应用的最新趋势使得活动数据成为生产数据管道的一部分,直接应用于网站功能中。这些应用包括:(1)搜索相关性;(2)推荐系统,其可能由活动流中的项目流行度或共现情况驱动;(3)广告定位与报告;(4)防范垃圾邮件或未经授权的数据抓取等滥用行为的安全应用;(5)为 “朋友” 或 “联系人” 聚合用户状态更新或操作以供阅读的动态消息功能。

日志数据在生产环境中的实时使用给数据系统带来了新的挑战,因为其数据量比 “真实” 数据大几个数量级。例如,搜索、推荐和广告通常需要计算精细的点击率,这不仅会为每次用户点击生成日志记录,还会为页面上未被点击的数十个项目生成日志记录。中国移动每天收集 5 - 8TB 的通话记录 [11],而 Facebook 每天收集近 6TB 的各种用户活动事件 [12]。

许多早期处理此类数据的系统依赖于从生产服务器上物理抓取日志文件进行分析。近年来,已经构建了一些专门的分布式日志聚合器,包括 Facebook 的 Scribe [6]、雅虎的 Data Highway [4] 和 Cloudera 的 Flume [3]。这些系统主要设计用于收集日志数据并将其加载到数据仓库或 Hadoop [8] 中以供离线使用。在领英(一个社交网站),我们发现除了传统的离线分析之外,我们还需要支持上述大多数实时应用,且延迟不超过几秒。

我们构建了一个名为 Kafka [18] 的新型日志处理消息系统,它结合了传统日志聚合器和消息系统的优点。一方面,Kafka 具有分布式和可扩展性,并提供高吞吐量。另一方面,Kafka 提供了类似于消息系统的 API,允许应用程序实时消费日志事件。Kafka 已开源,并在领英的生产环境中成功使用了 6 个多月。它极大地简化了我们的基础设施,因为我们可以使用单一软件来在线和离线消费所有类型的日志数据。

本文其余部分的组织如下。我们将在第 2 节回顾传统的消息系统和日志聚合器。在第 3 节,我们描述 Kafka 的架构及其关键设计原则。我们在第 4 节描述 Kafka 在领英的部署情况,并在第 5 节给出 Kafka 的性能结果。我们在第 6 节讨论未来的工作并得出结论。

2. 相关工作

传统的企业消息系统 [1][7][15][17] 已存在很长时间,并且作为处理异步数据流的事件总线,常常发挥着关键作用。然而,它们不太适合日志处理,原因有以下几点。

首先,企业系统所提供的功能存在不匹配的情况。这些系统往往侧重于提供丰富的投递保证。例如,IBM Websphere MQ [7] 具备事务支持,允许应用程序以原子操作的方式将消息插入多个队列。JMS [14] 规范允许在消息被消费后进行确认,甚至可能是无序确认。对于收集日志数据而言,这样的投递保证通常是过度的。比如说,偶尔丢失几个页面浏览事件绝非世界末日。这些不必要的功能往往会增加系统 API 以及底层实现的复杂性。

其次,许多系统并未将吞吐量作为首要设计约束给予足够重视。例如,JMS 没有提供 API 让生产者将多条消息显式地批量处理为单个请求。这意味着每条消息都需要完整的 TCP/IP 往返通信,对于我们领域的吞吐量需求而言,这是不可行的。

第三,这些系统在分布式支持方面较为薄弱。没有简便的方法在多台机器上对消息进行分区和存储。

最后,许多消息系统假定消息会被近乎立即消费,所以未消费消息的队列通常相当小。如果允许消息累积,就像数据仓库应用这类离线消费者的情况,它们并非持续消费,而是定期进行大规模加载,那么这些系统的性能会显著下降。

在过去几年中,已经构建了一些专门的日志聚合器。Facebook 使用一个名为 Scribe 的系统。每台前端机器可以通过套接字将日志数据发送到一组 Scribe 机器。每台 Scribe 机器聚合日志条目,并定期将它们转储到 HDFS [9] 或 NFS 设备。雅虎的数据高速公路项目有类似的数据流。一组机器从客户端聚合事件,并生成 “分钟” 文件,然后将这些文件添加到 HDFS。Flume 是 Cloudera 开发的一个相对较新的日志聚合器。它支持可扩展的 “管道” 和 “接收器”,使流式日志数据的处理非常灵活。它还具备更完善的分布式支持。然而,这些系统大多是为离线消费日志数据而构建的,并且常常不必要地向消费者暴露实现细节(例如 “分钟文件”)。此外,它们中的大多数使用 “推送” 模型,即代理将数据转发给消费者。在领英,我们发现 “拉取” 模型更适合我们的应用,因为每个消费者可以以其能够承受的最大速率检索消息,避免被推送速度超过其处理能力的消息淹没。拉取模型还便于消费者回滚,我们将在 3.2 节末尾讨论这一优点。

最近,雅虎研究院开发了一个名为 HedWig [13] 的新型分布式发布 / 订阅系统。HedWig 具有高度的可扩展性和可用性,并提供强大的持久性保证。然而,它主要用于存储数据存储的提交日志。

3. Kafka 架构与设计原则

由于现有系统存在局限性,我们开发了一种基于消息传递的新型日志聚合器 Kafka。我们首先介绍 Kafka 中的基本概念。特定类型的消息流由一个主题(topic)定义。生产者(producer)可以将消息发布到某个主题。发布的消息随后存储在一组称为代理(broker)的服务器上。消费者(consumer)可以从代理订阅一个或多个主题,并通过从代理拉取数据来消费已订阅的消息。

消息传递在概念上很简单,我们也努力使 Kafka API 同样简洁以体现这一点。我们不展示确切的 API,而是给出一些示例代码来说明如何使用该 API。下面是生产者的示例代码。一条消息被定义为仅包含字节负载。用户可以选择自己喜欢的序列化方法对消息进行编码。为提高效率,生产者可以在单个发布请求中发送一组消息。

生产者示例代码

producer = new Producer(…);
message = new Message(“test message str”.getBytes());
set = new MessageSet(message);
producer.send(“topic1”, set);

要订阅一个主题,消费者首先为该主题创建一个或多个消息流。发布到该主题的消息将平均分配到这些子流中。关于 Kafka 如何分配消息的详细信息将在 3.2 节中介绍。每个消息流为持续生成的消息流提供一个迭代器接口。然后,消费者遍历流中的每条消息并处理消息的负载。与传统迭代器不同,消息流迭代器永远不会终止。如果当前没有更多消息可消费,迭代器将阻塞,直到有新消息发布到该主题。我们既支持点对点(point - to - point)传递模型,即多个消费者共同消费一个主题中所有消息的单个副本,也支持发布 / 订阅(publish/subscribe)模型,即多个消费者各自检索一个主题的自己的副本。

消费者示例代码

streams[] = Consumer.createMessageStreams(“topic1”, 1)
for (message : streams[0]) {bytes = message.payload();// 对字节进行处理
}

Kafka 的整体架构如图 1 所示。由于 Kafka 本质上是分布式的,Kafka 集群通常由多个代理组成。为平衡负载,一个主题被划分为多个分区(partition),每个代理存储其中一个或多个分区。多个生产者和消费者可以同时发布和检索消息。在 3.1 节中,我们描述单个代理上单个分区的布局,以及为实现高效访问分区而做出的一些设计选择。在 3.2 节中,我们描述在分布式环境中生产者和消费者如何与多个代理进行交互。我们将在 3.3 节讨论 Kafka 的投递保证。

图 1. Kafka 架构 

3.1 单个分区的效率优化

为了让 Kafka 系统高效运行,我们做了一些设计决策。

  1. 简单存储:Kafka 的存储布局非常简单。每个主题的分区对应一个逻辑日志。在物理上,日志被实现为一组大小大致相同(例如 1GB)的段文件。每次生产者向分区发布消息时,代理只是将消息追加到最后一个段文件中。为了提高性能,我们仅在发布了可配置数量的消息或经过一定时间后,才将段文件刷新到磁盘。消息只有在刷新后才会对消费者可见。
    与典型的消息系统不同,Kafka 中存储的消息没有显式的消息 ID。相反,每个消息由其在日志中的逻辑偏移量来定位。这避免了维护辅助的、随机访问密集型索引结构的开销,这些结构通常用于将消息 ID 映射到实际的消息位置。请注意,我们的消息 ID 是递增的,但不连续。要计算下一条消息的 ID,我们必须将当前消息的长度加到其 ID 上。从现在起,我们将交替使用消息 ID 和偏移量这两个术语。
    消费者总是按顺序从特定分区消费消息。如果消费者确认了某个特定的消息偏移量,这意味着消费者已经接收到该分区中该偏移量之前的所有消息。实际上,消费者向代理发出异步拉取请求,以便为应用程序准备好数据缓冲区以供消费。每个拉取请求包含开始消费的消息偏移量和可接受的获取字节数。每个代理在内存中维护一个已排序的偏移量列表,其中包括每个段文件中第一条消息的偏移量。代理通过搜索偏移量列表来定位请求消息所在的段文件,然后将数据发送回消费者。消费者接收到消息后,计算下一条要消费的消息的偏移量,并在下一个拉取请求中使用它。Kafka 日志和内存索引的布局如图 2 所示。每个方框表示一条消息的偏移量。
  2. 高效传输:我们在 Kafka 数据的传入和传出方面非常谨慎。前面我们已经提到,生产者可以在单个发送请求中提交一组消息。虽然最终消费者 API 一次迭代一条消息,但实际上,消费者的每个拉取请求也会检索多个消息,直到达到一定大小,通常是几百 KB。
    我们做出的另一个非传统选择是避免在 Kafka 层显式地在内存中缓存消息。相反,我们依赖底层文件系统的页面缓存。这主要有避免双重缓冲的好处 —— 消息只在页面缓存中缓存。这还有一个额外的好处,即使代理进程重启,热缓存也能保留。由于 Kafka 根本不在进程中缓存消息,因此其在垃圾回收内存方面的开销非常小,使得在基于虚拟机的语言中实现高效成为可能。最后,由于生产者和消费者都按顺序访问段文件,并且消费者通常仅略微滞后于生产者,正常的操作系统缓存策略非常有效(特别是直写式缓存和预读)。我们发现,无论是生产还是消费,其性能与数据大小都呈一致的线性关系,即使数据量达到数 TB 也是如此。
    此外,我们还优化了消费者的网络访问。Kafka 是一个多订阅者系统,一条消息可能会被不同的消费者应用多次消费。将字节从本地文件发送到远程套接字的典型方法涉及以下步骤:(1)将数据从存储介质读取到操作系统的页面缓存中;(2)将页面缓存中的数据复制到应用程序缓冲区;(3)将应用程序缓冲区的数据复制到另一个内核缓冲区;(4)将内核缓冲区的数据发送到套接字。这包括 4 次数据复制和 2 次系统调用。在 Linux 和其他 Unix 操作系统上,存在一个sendfile API [5],它可以直接将字节从文件通道传输到套接字通道。这通常可以避免步骤(2)和(3)中引入的 2 次复制和 1 次系统调用。Kafka 利用sendfile API 将日志段文件中的字节高效地从代理传递给消费者。
  3. 无状态代理:与大多数其他消息系统不同,在 Kafka 中,关于每个消费者已消费多少信息不是由代理维护,而是由消费者自身维护。这样的设计减少了代理的很多复杂性和开销。然而,这使得删除消息变得棘手,因为代理不知道所有订阅者是否都已消费了该消息。Kafka 通过使用基于简单时间的服务级别协议(SLA)来解决这个问题。如果一条消息在代理中保留的时间超过特定时间段(通常是 7 天),它将被自动删除。这个解决方案在实践中效果良好。大多数消费者,包括离线消费者,要么每天、每小时完成消费,要么实时完成消费。Kafka 的性能不会随着数据量的增大而下降,这使得长时间保留数据成为可能。
    这种设计还有一个重要的附带好处。消费者可以故意回退到旧的偏移量并重新消费数据。这违反了队列的常见约定,但事实证明这对许多消费者来说是一个基本功能。例如,当消费者的应用逻辑出现错误时,应用程序在错误修复后可以重新播放某些消息。这对于将 ETL 数据加载到我们的数据仓库或 Hadoop 系统中尤为重要。再例如,已消费的数据可能仅定期刷新到持久存储(例如全文索引器)。如果消费者崩溃,未刷新的数据就会丢失。在这种情况下,消费者可以检查点未刷新消息的最小偏移量,并在重启时从该偏移量重新消费。我们注意到,在拉取模型中支持消费者回退比在推送模型中要容易得多。

图 2. Kafka 日志 

3.2 分布式协调

我们现在描述生产者和消费者在分布式环境中的行为。每个生产者可以将消息发布到随机选择的分区,或者发布到由分区键和分区函数语义确定的分区。我们将重点关注消费者与代理之间的交互。
Kafka 有消费者组的概念。每个消费者组由一个或多个消费者组成,它们共同消费一组订阅的主题,即每条消息仅被传递给组内的一个消费者。不同的消费者组各自独立地消费完整的订阅消息集,不同消费者组之间无需协调。同一组内的消费者可以在不同的进程或不同的机器上。我们的目标是在不引入过多协调开销的情况下,将存储在代理中的消息平均分配给消费者。

我们的第一个决策是将主题内的分区作为最小的并行单元。这意味着在任何给定时间,每个消费者组内,来自一个分区的所有消息仅由单个消费者消费。如果我们允许多个消费者同时消费单个分区,它们就必须协调谁消费哪些消息,这就需要锁和状态维护开销。相比之下,在我们的设计中,只有在消费者进行负载重新平衡时,消费进程才需要协调,而这是一个不频繁发生的事件。为了实现真正的负载平衡,我们要求一个主题中的分区数量比每个组中的消费者数量多得多。我们可以通过对主题进行过度分区轻松实现这一点。

我们做出的第二个决策是不设置中央 “主” 节点,而是让消费者以去中心化的方式相互协调。添加主节点会使系统复杂化,因为我们还得进一步担心主节点故障。为了便于协调,我们采用了高度可用的共识服务 Zookeeper [10]。Zookeeper 有一个非常简单的类似文件系统的 API。用户可以创建路径、设置路径的值、读取路径的值、删除路径以及列出路径的子节点。它还能做一些更有趣的事情:(a)可以在路径上注册一个监听器,并在路径的子节点或路径的值发生变化时收到通知;(b)路径可以创建为临时的(与持久的相对),这意味着如果创建客户端消失,Zookeeper 服务器会自动删除该路径;(c)Zookeeper 将其数据复制到多个服务器,这使得数据具有高度的可靠性和可用性。

Kafka 使用 Zookeeper 执行以下任务:(1)检测代理和消费者的添加和移除;(2)当上述事件发生时,触发每个消费者的重新平衡过程;(3)维护消费关系并跟踪每个分区的已消费偏移量。
具体来说,每个代理或消费者启动时,它会将自身信息存储在 Zookeeper 的代理注册表或消费者注册表中。代理注册表包含代理的主机名和端口,以及存储在其上的主题和分区集。消费者注册表包括消费者所属的消费者组以及它订阅的主题集。每个消费者组在 Zookeeper 中与一个所有权注册表和一个偏移量注册表相关联。所有权注册表为每个订阅分区都有一个路径,路径值是当前从该分区消费的消费者的 ID(我们使用 “消费者拥有此分区” 这一术语)。偏移量注册表为每个订阅分区存储该分区中最后消费的消息的偏移量。

在 Zookeeper 中创建的路径,对于代理注册表、消费者注册表和所有权注册表是临时的,对于偏移量注册表是持久的。如果一个代理发生故障,其上的所有分区将自动从代理注册表中删除。消费者的故障会导致它在消费者注册表中的条目以及它在所有权注册表中拥有的所有分区丢失。每个消费者在代理注册表和消费者注册表上都注册一个 Zookeeper 监听器,每当代理集或消费者组发生变化时,都会收到通知。

在消费者初始启动或通过监听器收到代理 / 消费者变化通知时,消费者会启动一个重新平衡过程,以确定它应该从哪些新的分区子集进行消费。该过程如算法 1 所述。通过从 Zookeeper 读取代理和消费者注册表,消费者首先计算每个订阅主题 T 可用的分区集(PT)以及订阅 T 的消费者集(CT)。然后,它将 PT 范围划分为 | CT | 个块,并确定性地选择一个块来 “拥有”。对于消费者选择的每个分区,它将自己写入所有权注册表,作为该分区的新所有者。最后,消费者启动一个线程,从每个 “拥有” 的分区拉取数据,从偏移量注册表中存储的偏移量开始。当从分区拉取消息时,消费者会定期更新偏移量注册表中最新消费的偏移量。

算法 1:组 G 中消费者 Ci 的再平衡过程
对于消费者 Ci 订阅的每个主题 T {
        从所有权注册表中移除消费者 Ci 所拥有的分区
        从 Zookeeper 读取代理和消费者注册表
        计算 PT = 主题 T 下所有代理中可用的分区
        计算 CT = 组 G 中订阅主题 T 的所有消费者
        对 PT 和 CT 进行排序
        设 j 为消费者 Ci 在 CT 中的索引位置,且设 N = |PT|/|CT|
        将 PT 中从 j * N 到 (j + 1) * N - 1 的分区分配给消费者 Ci
        对于每个分配到的分区 p {
                在所有权注册表中将 p 的所有者设置为 Ci
                设 Op 为偏移量注册表中存储的分区 p 的偏移量
                启动一个线程从偏移量 Op 开始拉取分区 p 中的数据
        }
}

当一个组内有多个消费者时,每个消费者都会收到代理或消费者变化的通知。然而,通知可能在不同消费者处稍有延迟。因此,有可能一个消费者试图获取仍被另一个消费者拥有的分区的所有权。发生这种情况时,第一个消费者只需释放它当前拥有的所有分区,稍作等待,然后重试重新平衡过程。在实践中,重新平衡过程通常经过几次重试后就会稳定下来。
当创建一个新的消费者组时,偏移量注册表中没有可用的偏移量。在这种情况下,消费者将从每个订阅分区上可用的最小或最大偏移量(取决于配置)开始,使用我们在代理上提供的 API。

3.3 投递保证

一般来说,Kafka 仅保证至少一次投递。恰好一次投递通常需要两阶段提交,而这对我们的应用程序来说并非必要。大多数情况下,一条消息会恰好一次投递到每个消费者组。然而,如果一个消费者进程在未正常关闭的情况下崩溃,接管故障消费者所拥有分区的消费者进程可能会收到一些重复的消息,这些消息是在成功提交到 Zookeeper 的最后偏移量之后的。如果应用程序在意重复消息,它必须添加自己的去重逻辑,可以使用我们返回给消费者的偏移量,或者消息内的一些唯一键。这通常比使用两阶段提交更具成本效益。

Kafka 保证来自单个分区的消息按顺序投递到消费者。但是,对于来自不同分区的消息的顺序没有保证。

为了避免日志损坏,Kafka 在日志中为每个消息存储一个 CRC 校验和。如果代理上发生任何 I/O 错误,Kafka 会运行一个恢复过程,删除那些 CRC 不一致的消息。在消息级别拥有 CRC 还允许我们在消息生产或消费后检查网络错误。

如果一个代理宕机,存储在其上尚未消费的任何消息将变得不可用。如果代理上的存储系统永久损坏,任何未消费的消息将永远丢失。未来,我们计划在 Kafka 中添加内置复制功能,以便在多个代理上冗余存储每个消息。

4. Kafka 在领英的应用

在本节中,我们将描述领英是如何使用 Kafka 的。图 3 展示了我们部署的简化版本。在每个运行面向用户服务的数据中心,我们都有一个与之共处一地的 Kafka 集群。前端服务生成各种日志数据,并批量发布到本地的 Kafka 代理。我们依靠硬件负载均衡器将发布请求均匀地分发到一组 Kafka 代理上。Kafka 的在线消费者运行在同一数据中心的服务中。

图 3. Kafka 部署情况 

我们还在一个单独的数据中心部署了一个 Kafka 集群用于离线分析,该数据中心在地理位置上靠近我们的 Hadoop 集群和其他数据仓库基础设施。这个 Kafka 实例运行一组嵌入式消费者,从生产数据中心的 Kafka 实例中拉取数据。然后,我们运行数据加载作业,将数据从这个 Kafka 副本集群拉取到 Hadoop 和我们的数据仓库中,在那里我们对数据运行各种报告作业和分析流程。我们还使用这个 Kafka 集群进行原型设计,并且能够针对原始事件流运行简单脚本进行临时查询。无需太多调整,整个数据管道的端到端延迟平均约为 10 秒,足以满足我们的需求。

目前,Kafka 每天积累数百 GB 的数据和近 10 亿条消息,随着我们完成对遗留系统的转换以利用 Kafka 的优势,预计这一数据量将显著增长。未来还会添加更多类型的消息。当运维人员为软件或硬件维护启动或停止代理时,再平衡过程能够自动重新定向消息消费。

我们的跟踪还包括一个审计系统,以验证整个数据管道中没有数据丢失。为了便于实现这一点,每条消息在生成时都会携带时间戳和服务器名称。我们对每个生产者进行了设置,使其定期生成一个监控事件,该事件记录了在固定时间窗口内该生产者为每个主题发布的消息数量。生产者将监控事件发布到 Kafka 的一个单独主题中。然后,消费者可以统计从给定主题接收到的消息数量,并与监控事件中的计数进行验证,以确保数据的正确性。

将数据加载到 Hadoop 集群是通过实现一种特殊的 Kafka 输入格式来完成的,这种格式允许 MapReduce 作业直接从 Kafka 读取数据。一个 MapReduce 作业加载原始数据,然后对其进行分组和压缩,以便将来进行高效处理。无状态代理和客户端对消息偏移量的存储在这里再次发挥作用,使得 MapReduce 任务管理(允许任务失败并重新启动)能够以自然的方式处理数据加载,在任务重启时不会重复或丢失消息。只有在作业成功完成时,数据和偏移量才会存储在 HDFS 中。

我们选择使用 Avro [2] 作为我们的序列化协议,因为它效率高且支持模式演进。对于每条消息,我们在负载中存储其 Avro 模式的 ID 和序列化后的字节。这种模式使我们能够强制实施一种契约,以确保数据生产者和消费者之间的兼容性。我们使用一个轻量级的模式注册服务将模式 ID 映射到实际模式。当消费者获取到一条消息时,它会在模式注册中心查找以检索模式,该模式用于将字节解码为对象(由于值是不可变的,每个模式只需进行一次这种查找)。

5. 实验结果

我们进行了一项实验研究,将 Kafka 的性能与 Apache ActiveMQ v5.4 [1](一种流行的 JMS 开源实现)和 RabbitMQ v2.4 [16](一个以性能著称的消息系统)进行比较。我们使用了 ActiveMQ 的默认持久化消息存储 KahaDB。虽然此处未展示,但我们也测试了另一种 AMQ 消息存储,发现其性能与 KahaDB 非常相似。只要有可能,我们都尽量在所有系统中使用可比较的设置。

我们在两台 Linux 机器上进行实验,每台机器有 8 个 2GHz 核心、16GB 内存、6 块组成 RAID 10 的磁盘。两台机器通过 1Gb 网络链路连接。其中一台机器用作代理,另一台用作生产者或消费者。

  1. 生产者测试:我们将所有系统中的代理配置为异步将消息刷新到其持久化存储。对于每个系统,我们运行单个生产者总共发布 1000 万条消息,每条消息大小为 200 字节。我们将 Kafka 生产者配置为以大小为 1 和 50 的批次发送消息。ActiveMQ 和 RabbitMQ 似乎没有简单的方法来批量发送消息,我们假设它们使用的批次大小为 1。结果如图 4 所示。x 轴表示随时间发送到代理的数据量(以 MB 为单位),y 轴表示生产者每秒发布的消息吞吐量。平均而言,Kafka 在批次大小为 1 和 50 时,分别可以每秒发布 50,000 条和 400,000 条消息。这些数字比 ActiveMQ 高出几个数量级,并且至少比 RabbitMQ 高出 2 倍。
    Kafka 性能更好有几个原因。首先,Kafka 生产者目前不等待代理的确认,而是以代理能够处理的最快速度发送消息。这显著提高了发布者的吞吐量。当批次大小为 50 时,单个 Kafka 生产者几乎使生产者和代理之间的 1Gb 链路达到饱和。对于日志聚合的情况,这是一种有效的优化,因为数据必须异步发送,以避免给实时流量服务引入任何延迟。我们注意到,如果不确认生产者,就不能保证每个发布的消息都实际被代理接收。对于许多类型的日志数据,只要丢失的消息数量相对较少,用持久性换取吞吐量是可取的。不过,我们确实计划在未来针对更关键的数据解决持久性问题。其次,Kafka 具有更高效的存储格式。平均而言,Kafka 中每条消息的开销为 9 字节,而 ActiveMQ 中为 144 字节。这意味着 ActiveMQ 存储同样的 1000 万条消息比 Kafka 多使用 70% 的空间。ActiveMQ 的一部分开销来自 JMS 要求的庞大消息头。另一部分开销是维护各种索引结构的成本。我们观察到,ActiveMQ 中最繁忙的一个线程大部分时间都在访问 B 树以维护消息元数据和状态。最后,通过分摊 RPC 开销,批量发送极大地提高了吞吐量。在 Kafka 中,批次大小为 50 条消息时,吞吐量几乎提高了一个数量级。
  2. 消费者测试:在第二个实验中,我们测试了消费者的性能。同样,对于所有系统,我们使用单个消费者检索总共 1000 万条消息。我们对所有系统进行配置,使每个拉取请求大约预取相同数量的数据 - 最多 1000 条消息或约 200KB。对于 ActiveMQ 和 RabbitMQ,我们将消费者确认模式设置为自动。由于所有消息都能放入内存,所有系统都从底层文件系统的页面缓存或一些内存缓冲区提供数据服务。结果如图 5 所示。
    平均而言,Kafka 每秒消费 22,000 条消息,是 ActiveMQ 和 RabbitMQ 的 4 倍多。我们可以想到几个原因。首先,由于 Kafka 具有更高效的存储格式,在 Kafka 中从代理传输到消费者的字节数更少。其次,ActiveMQ 和 RabbitMQ 中的代理都必须维护每条消息的投递状态。我们观察到,在这个测试过程中,ActiveMQ 的一个线程忙于将 KahaDB 页面写入磁盘。相比之下,Kafka 代理上没有磁盘写入活动。最后,通过使用 sendfile API,Kafka 降低了传输开销。

图 4. 生产者性能

 

图 5. 消费者性能

我们在本节结尾指出,实验的目的不是要表明其他消息系统不如 Kafka。毕竟,ActiveMQ 和 RabbitMQ 都比 Kafka 具有更多的功能。主要目的是说明一个专门的系统能够实现的潜在性能提升。

6. 结论与未来工作

我们提出了一个名为 Kafka 的新颖系统,用于处理大量的日志数据流。与消息系统类似,Kafka 采用基于拉取的消费模型,允许应用程序按照自己的速率消费数据,并在需要时回退消费。通过专注于日志处理应用,Kafka 实现了比传统消息系统高得多的吞吐量。它还提供了集成的分布式支持并且可以横向扩展。我们已经在领英成功地将 Kafka 用于离线和在线应用。

未来我们希望从多个方向继续探索。首先,我们计划在多个代理之间添加消息的内置复制功能,即使在机器发生不可恢复的故障时,也能保证数据的持久性和可用性。我们希望同时支持异步和同步复制模型,以便在生产者延迟和提供的保证强度之间进行权衡。应用程序可以根据其对持久性、可用性和吞吐量的要求选择合适的冗余级别。其次,我们希望在 Kafka 中添加一些流处理能力。从 Kafka 检索消息后,实时应用程序通常会执行类似的操作,例如基于窗口的计数,以及将每条消息与二级存储中的记录或另一个流中的消息进行连接。在最低层面上,这可以通过在发布期间根据连接键对消息进行语义分区来支持,这样所有带有特定键的消息都会进入同一个分区,从而到达单个消费者进程。这为在一组消费者机器上处理分布式流提供了基础。在此基础上,我们认为一个有用的流实用工具库,例如不同的窗口函数或连接技术,将对这类应用有益。

7. 参考文献

[1] http://activemq.apache.org/
[2] http://avro.apache.org/
[3] Cloudera’s Flume, https://github.com/cloudera/flume
[4] http://developer.yahoo.com/blogs/hadoop/posts/2010/06/ena
bling_hadoop_batch_processi_1/
[5] Efficient data transfer through zero copy:
https://www.ibm.com/developerworks/linux/library/jzerocopy/
[6] Facebook’s Scribe,
http://www.facebook.com/note.php?note_id=32008268919
[7] IBM Websphere MQ: http://www01.ibm.com/software/integration/wmq/
[8] http://hadoop.apache.org/
[9] http://hadoop.apache.org/hdfs/
[10] http://hadoop.apache.org/zookeeper/
[11] http://www.slideshare.net/cloudera/hw09-hadoop-baseddata-mining-platform-for-the-telecom-industry
[12] http://www.slideshare.net/prasadc/hive-percona-2009
[13] https://issues.apache.org/jira/browse/ZOOKEEPER-775
[14] JAVA Message Service:
http://download.oracle.com/javaee/1.3/jms/tutorial/1_3_1-
fcs/doc/jms_tutorialTOC.html.
[15] Oracle Enterprise Messaging Service: http://www.oracle.com/technetwork/middleware/ias/index093455.html
[16] http://www.rabbitmq.com/
[17] TIBCO Enterprise Message Service: http://www.tibco.com/products/soa/messaging/
[18] Kafka, http://sna-projects.com/kafka/

相关文章:

  • MCP认证难题破解
  • 文件上传Ⅰ
  • pgsql中使用jsonb的mybatis-plus和jps的配置
  • 微信小程序调用yolo目标检测模型
  • 仿腾讯会议项目开发——网络嵌入
  • AWS Elastic Beanstalk的部署Python Flask后端服务(Hello,World)
  • Hadoop的三大结构及其作用?
  • 计算机基础 | 常见进制与单位简介 / 表示 / 描述
  • 医疗行业如何构建合成数据平台?——技术、合规与实践全景
  • 数据结构-Map和Set
  • 第 8 期:条件生成 DDPM:让模型“听话”地画图!
  • 元宇宙概念兴起,B 端数字孪生迎来哪些新机遇?
  • 考研408第一章计算机系统概述——1.1-1.2操作系统的基本概念与发展历程
  • java基础从入门到上手(九):Java - List、Set、Map
  • Java并发编程高频面试题(已整理Java面试宝典PDF完整版)
  • 笔记整理五
  • Scrapeless Scraping Browser: A high-concurrency automation solution for AI
  • 【C++深入系列】:模版详解(上)
  • 群辉默认docker数据存储路径
  • Webpack基础
  • 交警不在就闯红灯?上海公安用科技手段查处非机动车违法
  • 民生访谈|公共数据如何既开放又安全?政务领域如何适度运用人工智能?
  • 高糖高脂食物可能让你 “迷路”
  • 专访倪军:人要有终身学习能力,一张文凭无法像以往支撑那么多年
  • “2025上海西九文化周”启动,香港顶尖文艺6月齐聚申城
  • 神舟二十号载人飞船成功飞天,上海航天有何贡献?