第十一章:流处理

第十一章:流处理
复杂的系统都是从简单的系统演进而来的,从另一个角度来说,开始就很复杂的系统是根本不会被设计出来的。
— 约翰·加尔, Systemantics (1975)
在第十章中我们讨论了批处理-它是将一组文件作为输出然后在产出一组新的文件的技术。输出是派生数据(derived data)的一种形式;运行在批处理流程中的数据在必要的时候是可以再次生成的。我们了解到了这一简单却强大的理念是如何被应用于构建搜索索引、推荐系统、分析工具等领域。
但是,在第十章中我们始终假设的一个命题是:输入是有边界的-即已知且有限的大小-所以批处理程序知道它何时完成了输入信息的读取。例如MapReduce的核心排序操作必须完整的读取所有的输入后才能开始输出结果:这就有可能会发生一个很小的键值数据记录很晚才被输入,因为这个小键值记录需要我们第一个输出,所以我们的输出不能很早进行(要等待这个最小键值输入后才能开始输出操作)。
实际上,很多数据之所以无边界是因为它们是随着时间的推移慢慢到达的:你的用户昨天和今天产生了数据,它明天还会继续产生更多的数据。除非你歇业,否者这个过程永远不会停止,所以在一定意义上,数据集从来不会有“完成(complete)”态[1]。因此,批处理程序必须人工的将数据划分为固定区间的数据块:例如:在每天结束的时候处理一天的有效数据,或者在每个小时结束后处理这一小时的有效数据。
以天为维度的批处理问题在于输入数据的变化只有在一天后才会对输出的结果有影响,这对很多急性子的用户来说太慢了。为了减少延迟,我们可以更频繁的执行批处理程序-例如每一秒都执行一次来处理上一秒的有效数据,甚至于说连续的,完全不划分固定区间,每个事件来临就立即处理。这便是「流处理」(stream processing)背后的思想。
通常,“流”指的是随着时间推移而逐渐可用的数据。这个概念出现在很多地方:在Unix的标准输入(stdin)和标准输出(stdout)中,编程语言(lazy lists)[],文件系统API(例如Java的FileInputStream),TCP连接,通过互联网传输的音频和视频等等。
本章,我们将把事件流(event streams)视为一种数据管理机制:一种对应于我们上一章所介绍的,一种无界的,持续增量处理的批数据处理方式。我们首先会讨论流在网络中怎样表示、存储和传输。在“数据库与流”中我们会讨论流和数据库之间的联系。最后,在“流处理”中,我们会继续探讨处理这些流的方法和工具,以及它们用来构建应用的方式。
传输事件流
在批处理的世界,任务的输入输出一般是文件(可能是分布式的文件系统)。那么流又是怎样的呢?
当输入是文件(字节序列)时,首先要做的处理步骤通常是把它解析成为记录序列。在流处理的上下文中,一条记录通常会被称为事件(event),本质上它们是同一件事物:一种小的、独立的、不可变的含有事物发生时间节点信息的对象。一个事件通常会包含一个表明在一天中发生的时钟节点一致的时间戳(详见单调和实时时钟“Monotonic Versus Time-of-Day Clocks”)。
举个例子,这个发生的事件既可能是来自于用户执行的一个操作,如浏览网页或者支付一笔账款。也可能是来自于机器,如对温度进行的周期性测量或者是CPU的利用率指标。在使用Unix工具的批处理这个例子中,web服务器的每一行日志便是一个事件。
事件的编码方式可能是字符串,或者是JSON,也可能是我们在第五章中讨论过的二进制表格。怎样编码决定了你可以怎样存储你的事件,例如你可以把它追加到文件中,也可以把它插入到关系型数据库表,又或者是把它写入到文档数据库中。另外你还可以通过网络把事件发送给其他节点来处理。
在批处理中,一次写入文件可能会被多个任务读取。类似的,在流处理的语境下,一个事件也是由生产者一次生成(也可称为发布者或发送者),然后可能被多个消费者消费(订阅者或者是收件人)[3]。在文件系统中,文件名用以标识一组相关记录;在「流处理」系统中,相关的事件通常会组合到主题(topic)或者数据流(stream)中。
原则上,文件或数据库足以连接生产者和消费者:生产者将它生产的每个事件写入到数据存储,然后每个消费者定期的从数据存储中轮询数据来检查从上次拉取后产生的新事件。实际上这正是批处理在每天结束时处理这一天有效数据的过程。
但是,若数据存储不是专门为这种用途而设计的,要在低延迟的情况下实现持续处理数据,轮询的代价是非常大的。你轮询操作越频繁,返回新事件的请求比例越低,系统开销也就越大。因此,当新事件产生时,我们最好主动通知消费者(而不是消费者轮询)。
传统数据库通常不能很好的支持这种通知机制:关系型数据库一般使用触发器(triggers),来承接这种变化(比如:往表里插入了一条数据),但是它们的能力非常有限,这更像是数据库事后的补偿设计[4]。所以,它是一种味了支撑事件通知而开发的特殊工具。
消息系统
向消费者通知新事件的通用方法就是使用消息系统(messaging system):生产者发送一条包含事件的消息,然后推送给消费者。在前面我们在“基于消息传递数据流”page 136介绍过,现在我们会进一步详细的介绍。
像Unix管道或者TCP链接这种建立在生产者和消费者之间的直接通信通道是实现消息系统的最简单方式。其实,大部分消息系统都是在基于这个模型演进的。不同的是,Unix管道和TCP链接是一个发送者对应一个接收者,而消息系统允许多个生产者节点往同一个topic发送消息且允许多个消费者节点从一个topic中接收消息。
在这种发布/订阅模型中,不同的系统采用不同的方法,并没有一个标准的答案可以同时满足不同的目的。想要区分这些系统,接下来的这两个问题将会对我们很有帮助:
当生产者发送消息过快超过了消费者的接收范围会发生什么?一般来说,有三种选择:丢弃消息,把消息缓存到队列,或者使用背压(也就是我们通常所说的流量控制-flow control;比如阻塞生产者来防止生产更多的消息)。例如,Unix管道和TCP链接就是使用了背压:他们使用了一个固定大小的缓冲区,当缓冲区满的时候,发送者会被阻塞直到有接收者从缓冲区消费数据(参见:网络拥塞与排队 第282页)。
如果消息被缓存到队列,那么了解队列增长时会发生什么将会很重要。如果消息队列长度达到内存最大容量时系统会不回崩溃,或者消息是否需要写入到磁盘?如果写磁盘,那么磁盘访问会对消息系统[6]的性能产生什么影响?
如果节点崩溃或者离线,是否会发生消息丢失?
和数据库一样,持久化可能需要写磁盘/复制这些组合操作(详见:复制与持久化),这些操作都是有成本的。如果你可以容忍偶发的消息丢失,那么在相同的硬件配置下你或许可以获得更高的吞吐量以及更低的延时。
消息丢失是否可以被接受在很大程度上取决于应用。例如,像传感器的读数和统计这种周期性的传输场景,偶尔丢失消息是没有那么重要的,因为不管怎样,下一个新的数据很快就会再次发送出来。但是,需要特别注意的是如果大量数据丢失,而这些丢失的数据可能不会被立即觉察到,那么就会导致统计的错误[7].如果你是在对事件计数,那么交付的可靠性将会变得非常重要,因为在每次丢失消息之后意味着你后面的统计数都是错误的。
在第十章批处理系统中有一个很好的特性是它提供了强可靠性的保证:失败的任务是可以自动重试的,并且失败任务中输出部分也会自动被丢弃。这也就是说没有失败的情况下输出内容都是相同的,这利于我们简化程序模型。稍后在本章中我们会探讨如何在流处理情景下怎么来实现类似的保证。
生产者消费者之间的直接消息传递
一部分消息系统在消费者和生产者之间采用无中间节点的直接网络通信:
UDP 组播广泛的应用于像股票市场这种需要低延迟要求的金融行业[8]。尽管UDP本身是不可靠的,但应用层协议可以重新请求丢失的包(生产者必须记录它所发送包以便于在需要的时候可以重新发送)。
像[ZeroMQ[4]无代理的消息库采用的方法类似,使用TCP或IP多播来实现消息的发布/订阅。
如果消费者在网络上暴漏服务,那么生产者可以直接发送HTTP或RPC请求(参见:基于服务的数据流:REST和RPC)来推送消息给消费者。这也正是webhook背后的思想,一个服务的回调URL被注册到另一个服务中,并且每当事件发生时都会向该URL发送请求。
尽管这些直接消息系统在适配当前设计的目标下可以很好的工作,它们通常需要应用代码能意识到消息丢失。缺点是它们容错十分有限:尽管这个协议能够感知到网络中包的丢失并重新转发,前提是它们始终认为生产者和消费者一直处于在线状态。
如果有消费者离线,它可能会丢失掉发送出去还未到达的消息。一些协议允许生产者对失败的消息重新投送,但是当生产者崩溃是,这种方法可能就会失效了,这会导致本来需要重新投送的消息缓冲区丢失。
消息代理
一种广泛的做法是使用消息代理(也称之为消息队列)来发送消息,本质上是一种优化消息流[13]处理方式的一类数据库。它作为服务器运行,生产者和消费者作为客户端与它连接。生产者写消息到代理服务器,消费者通过从代理服务器读取的方式来接收消息。
通过把数据集中在代理中的方式,这些系统可以从容的应对客户端的变化(连接、断开和故障),并且持久化的问题也同样的转移到了代理服务器上。某些消息代理仅仅将消息放到内存中,而另一些(取决于配置)会写入到磁盘,以此来保证代理服务器故障的时候不会丢失消息。对于速度比较慢的消费者,它们一般允许无限队列(而不是丢弃消息或者背压),当然这种选择也取决于配置。
排队的结果通常也使得消费者通常采用异步消费(asynchronous):当生产者发送一条消息,通常只需要等待代理服务器确认消息已经进入到缓冲区,而不需要关心消费者处理消息的过程。向消费者交付通常发生在未来的某一个不确定的时间点——一般在几分之一秒内,但如果遇到队列积压,有时也会出现明显的延迟。
消息代理和数据库对比
一些消息代理服务有时会使用XA或者JTA来实现两阶段提交(two-phase)协议(参见:分布式事务实践)。这个特性使得它本质上与数据库非常相似,尽管消息代理与数据库之间仍然存在着明显的差异:
数据库通常会保留数据直到主动的删除,而大多数消息代理在把消息投递给消费者后就会自动的删除数据。这样的消息代理服务是不适合做数据的长期存储的。
因为消息会被很快删除的原因,可以认定大部分消息代理服务器的工作集相当的小——也就是说队列很短。如果是因为消费者速度变慢而代理服务需要缓存很多消息数据的话(可能没有足够的内存,那么多出来的消息会被写到磁盘),那么每个独立的消息被处理的时间都会拉长,从而导致整体的吞吐量下降[6]。
数据库通常支持二级索引和多种查找数据的方式,而消息代理通常支持订阅适配了某些正则表达式的topic数据子集。这些机制虽然不同,但本质上都是为客户端选择它们需要的部分数据。
当查询数据库时,结果集通常是基于数据的某一个时间点的快照;如果其他的客户端随后又写入一些数据到数据库改变了之前的查询结果集,第一个客户端是发现不了它的部分结果是过期数据的(除非它重新查询或者轮询查看更改内容)。相比之下,消息代理虽不支持随时查询,但是当数据变化时(即有新的有效消息)它会通知客户端;
这是消息代理服务的传统观念,像JMS[14] 和 AMQP [15]标准中也有所体现,并且像RabbitMQ,ActiveMQ, HornetQ, Qpid, TIBCO Enterprise Message Service, IBM MQ, Azure Service Bus, and Google Cloud Pub/Sub [16]等系统中都有实现.
多消费者
当多个消费者从同一个主题中读取消息时,就会出现像 图11-1表示的那样,有两种主流的消息投递模式会被用到:
负载均衡模式 每个消息投递给其中一个消费者,从而消费者可以共享主题下的消息处理任务。消息代理可以任意的指定消费消息的消费者。这种模式适合消息处理代价比较高的场景,并且你可以通过增加消费者来水平扩展消息的处理能力。(在 AMQP 中,你可以通过让多个客户端从同一个队列中消费来实现负载均衡,在 JMS 中则被称为共享订阅。)
分发模式(扇出) 每条消息都会投递给所有的消费者。分发模式允许多个独立的消费者各自收听到同一个消息的广播,而不会相互影响——流处理等价于几个不同的批处理任务读取同一个输入文件。(JMS的主题订阅和AMQP的交换机绑定都提供了这种特性。)

图 11-1. (a)负载均衡:在消费者中间共享同一个主题下的消费任务;(b)分发:每个消息都投递给多个消费者。
这两种模式可以组合:比入两组相互独立的消费者组可能都订阅了同一个topic,这样每组可以共同接收所有的消息,但是在组内每条消息只能有一个节点接收。
确认和重传
消费者可能会随时崩溃,所以可能你会发生代理投递一个消息给消费者,但消费者从来没有处理它,或者在奔溃时只处理了一部分。为了确保消息不会丢失,消息代理使用确认(acknowledgments):当客户端处理完一条消息后必须明确的告知代理服务,然后代理服务就可以把它从队列中移除。
如果接收确认消息时和客户端的连接关闭或超时,那么代理服务会认为消息未被处理,然后会把这条消息再次投递给另一个消费者。(要注意,消息实际上可能完全处理完成,但是确认消息在网络传输中丢失了。为了处理这种场景就需要一个原子性的提交协议,正如“分布式事务实践”。)
当与负载均衡结合时,重新投递的行为在对消息的排序中会产生一个有趣的影响。在 [图11-2]中,消费者通常是根据生产者发送的顺序来处理消息。但是,当消费者1处理消息m4的同时,消费者 2处理消息m3发生故障。那么未被确认的消息m3随后被投递给了消费者1,结果就是消费者1按照m4,m3,m5的顺序来处理消息。因此,m3和m不是按照生产者1的发送顺序来处理的。

图 11-2. 消费者2在处理消息m3的时候发生了故障,随后消息被投递给了消费者1。
尽管消息代理尝试保持消息的顺序(正如JMS和AMQP协议要求的那样),但在组合负载均衡的时候,消息投递不可避免的导致消息的重排序。为了解决这个问题,你可以让每个消费者单独的使用一个消息队列(即不使用负载均衡特性)。如果消息之间是完全相互独立那么消息重排序就不算是一个问题,但是消息顺序对消息之间存在依赖关系的情况下是非常重要的,我们会在本章后面继续来讨论。
分区日志
通过网络发送的数据包或者发送给网络服务的请求通常是不会持久化操作痕迹的瞬时操作。尽管是可以永久记录(使用抓包和日志),我们通常不会考虑使用这种方式。尽管消息代理服务会把消息持久化写入到磁盘,但当消息投递给消费者后又会很快的被删除,因为消息代理是构建在瞬时消息思维模式之上的。
数据库和文件系统则采用完全相反的方式:任何写入数据库或文件的数据通常都是期望被永久保存,至少在显式的删除之前是这样。
这种思维方式上差异对于如何创建派生数据有很大影响。在第十章讨论中,批处理的一个关键特性就是你可以重复的执行并尝试处理步骤,但是并不会有损害输入数据的风险(因为输入是只读的)。这并不是AMQP/JMS风格的消息:接收了一个确认的消息对数据是有损坏性的,因为被消费者确认过的消息会被消息代理删除,所以你不能通过重复消费者操作来获得相同的数据。
如果你在消息系统中增加一个新的消费者,它通常只能接收到从它注册的时间点往后发送的消息;所有之前的消息都已经丢失并且不能被回复。相对于文件和数据库,则可以随时增加客户端,并且可以读取到以前任意写入的数据(是要应用程序没有明确的覆盖或删除数据)。
那为什么我们不能结合数据库的持久化和消息数据的低延迟混合起来使用呢?这正是消息日志代理背后的思想。
基于日志的消息存储
日志仅仅是磁盘上的一个支持追加的记录序列。我们之前在第三章的日志结构存储引擎和预写日志(WAL技术:write-ahead logs)以及第五张的副本中讨论过日志。
我们可以使用同样的结构来实现消息代理:生产者通过将消息追加到日志的末尾来发送消息,消费者通过依次读取日志来接收消息。如果消费者读取到了日志的末尾,那么它将会等待消息追加的通知。UNIX的tail -f 工具,正是基于这种工作思路来实现监听是否有数据追加到文件。
为了打破单个磁盘所能提供的高吞吐量的上限,日志支持分区(第六章的内容)。不同的分区可以分布在不同的机器上,每一个分区的日志都是隔离的,从而可以实现独立于其他分区的读取和写入。一个主题可以定义成带有相同类型数据的一组分区。如图11-3所示:
在每个分区中,代理服务为每条消息会分配一个单调递增的序列号或偏移量-offset(在图11-3,方框中的数字便是偏移量)。

图 11-3.生产者通过追加到主题分区文件的方式发送消息,然后消费者依次读取。
Apache Kafka[17,18],Amazon Kinesis Steams[19],和Twitter的DistributedLog[20,21] 是这种基于日志的消息代理实现方式。Google Cloud Pub/Sub 的架构也是类似,但是它是通过暴漏JMS风格的API的方式而非通过抽象日志的方式[16].尽管这些消息代理把所有的消息都写入磁盘,但是他们依然能够通过多服务器分区的方式实现百万级别的消息吞吐量,并且可以通过消息复制来实现容错性[22,23].
日志与传统消息系统对比
基于日志的方式可以很自然的支持消息的分发/扇出(fan-out),因为多个消费者可以做到不相互影响独立的读取日志-读取消息不回将它从日志中删除。为了在一组消费者之间实现负载均衡,代理可以将整个分区分配给消费者组的节点,而不是单独的将消息分配给消费者客户端。
每个客户端都会消费到分配给它的分区的所有消息。通常,当消费者被指定一个日志分区后,它将直接以单线程的方式依次从分区中读取消息。这种粗粒度的负载均衡方式有以下缺陷:
一个主题的消费的节点数最多等于该主题下分区数,因为同一个分区内的消息被投递给了同一个节点。[^注i]
如果单个消息处理缓慢,它会阻塞分区内后续的消息(这是一种对头阻塞的方式;参见:第13页的性能描述)。
注i: 可以创建一个负载均衡方案,在这个方案中,两个消费者通过读取全部消息来共享处理分区的工作,单其中一个只考虑偶数偏移量的消息,而另一个处理奇数偏移量的消息。或者,可以将消息处理扩展到线程池,但这种方法会使消费者偏移管理变得复杂。通常,单线程处理分区是优选的,并且可以通过使用更多的分区来增加并行性。
因此,在消息处理成本高昂,你希望在消息一条一条的基础上并行处理,并且消息没有有序性要求的场景中,JMS/AMQP类型消息代理会更合适。另一方面,当需要高吞吐量,每条消息都需要快速处理并且要保证消息顺序的场景下,基于日志的消息存储更适合。
消费者偏移
顺序的消费分区中的消息可以很容易的知道哪些消息是已经被处理的:所有位移小于消费者当前便宜量的消息都是已经被处理过的消息,所有大于消费者当前便宜量的消息都是还未处理的。所以消息代理不再需要为每条消息再标记确认信息——只需要周期性的记录消费者的偏移量即可。记录开销的减少以及批量处理和流水线操作有助于提高基于日志的系统的吞吐量。
偏移量实际上和主从复制数据库中常见的日志序列号非常相似,我们在第五章中配置新的从节点讨论过这种这种场景。在数据库复制中,日志序列号可以能够让追随者在和领导者断连后再次接入,而不会中断这断连期间的任何写入数据。同样这里也使用了相同的原则:消息代理的行为就像数据库领导者一样,消费者就像追随者。
如果消费者节点故障,那么这个故障的消费分区将会分配给消费者组中的另外一个节点,它会接着最后一次记录的偏移量继续消费。如果消费者消费了部分数据后还未来得及记录消费偏移量,那么这些消息在重新启动后将会在一次被消费处理。我们会在本章的后续部分来讨论这个问题的处理方式。
磁盘空间利用
如果你只是往日志里面追加写入,那么最后会造成磁盘空间溢出。为了回收磁盘空间,日志实际上是分段的,老的日志段会不定期的被删除或者被存档。(稍后我们会讨论更多更复杂的释放磁盘空间的方法。)
这就意味着如果一个很慢的消费者不能够维持消息消费的速率,那么它的消费偏移量将远远落后甚至偏移位置指向了一个删除后的日志片段,这就会导致一些消息的丢失。实际上,日志实现了一个有限大小的缓冲区也就是我们所熟知的圆形缓冲区或环形缓冲区,当它满的时候就会把旧的消息丢弃(覆盖掉)。但是,因为这个缓冲区在磁盘上,所有它的空间可能会非常大。
我们做一个粗略的计算。在撰写本文的时候,典型的硬盘容量一般是6TB,顺序写的吞吐量是150M/s。如果按照磁盘最快的速率写入消息,写满硬盘大约花费11个小时。所以,磁盘可以保留11个小时内的有效数据,接着后续写入的新消息便会覆盖旧消息。保即使你加再多的磁盘和机器,这个比率仍然不会发生改变。但实际中,很少能够达以磁盘的满带宽速度写入,所以日志通常可以保留几天甚至几周的有效数据。
无论你保留的时间有多长,日志的吞吐量基本保持恒定,因为所有的消息最终都会写入到磁盘[18]。这种操作方式和默认把消息写入缓存,只有当消息队列增长过快时才会将它们写入磁盘的方式形成鲜明的对比:当消息队列很短时系统很快,当队列过长以致于开始写磁盘时速度就会变的很慢,所以吞吐量取决于要保留历史数据的大小。
消费者跟不上生产者时
在本章开始的消息系统部分,我们讨论了当消费者消费的速率跟不上生产者生产的速率时的三种选择:丢弃消息,把消息缓存到队列或者背压。在这中分类方式下,基于日志的方式其实是一种针对数据庞大但是大小固定(取决于硬盘容量)的一种缓冲区的实现方式。
如果一个消费者落后太多,以致于它要请求的消息太旧已经不在磁盘上了(被新消息覆盖),那么它将无法读取到这些消息——实际上代理服务器会丢弃掉比缓冲区所能容纳的最大容量更早的消息。你可以监测消费者距离日志起点的落后状况,并在它显著落后的时候发起告警。如果缓冲区够大,那么将会有足够的时间来让手工处理慢消费者,并在它开始丢失消息之前追赶上来。
尽管某个消费者由于落后太多而开始丢失数据,但也只对这个消费者有影响;它不会影响到其它消费者服务。这实际上有很多的操作便利:你可以真实的去消费生产日志来用于开发,测试或用以调试而不必担心它会影响生产服务。当一个消费者关闭或故障而停止消费——唯一需要保留下来的就只有它的消费位移;
这种方式与传统的消息代理相比,你需要关心的是删除掉那些关闭了的消费者的队列——不然他们会持续的增加一些不必要的消息,并会和还在运行的消费者抢夺内存。
消息重放
我们注意到之前在使用AMQP和JMS风格的消息代理时,处理和消息确认是一个破坏性的操作,因为它会导致消息从代理服务中被删除。但是,在基于日志的消息代理中,消费消息更像是从文件中读取数据:它是一个不会更改日志的只读操作。
除了消费者的输出外,处理数据过程的唯一影响只是消费者的位移向前移动而已。因为位移是由消费者控制的,所以在必要的时候它可以很容易的操作:例如,为了重新处理昨天的有效数据,你可以拷贝一份从消费者昨天开始时间位移处的数据,将它输出到不同的地方。伴随着你处理程序的变化,你可以无数次的重复这样的操作。
在这种通过重复的转换处理将输入数据和输出数据明显分隔开的方面,使得基于日志消息发送更像是上一章的批处理,它可以允许我们进行更多测验并很容易的从错误或者bug中恢复,同样也使得它成为在组织内集成数据流的一个很好的工具24。
数据库与流
我们已经做了很多消息代理和数据库之间的比较。尽管他们在传统上属于两种不同的工具种类,但是我们看到基于日志的消息代理已经成功将数据库的思想应用到了消息投送上。也可以反过来说:从消息投送和流处理的思想应用于数据库。
我们之前说过一个事件就是对某件事情发生时间点的记录。这个事件的产生可能是用户的一个动作(例如:输入查询操作),或者是一个传感器的数据读取,但也是需要写入到数据库中。实际上写入数据库中的是一个可以被获取、存储和处理的事件。这一现象表明,数据库与数据流之间的联系比日志在磁盘上的存储(这个是基础)更加紧密。
事实上,日志复制(日志复制实现)便是一种由领导者在事务处理过程中写数据库的事件流。追随者将这些写入流应用到自己数据库的独立副本中,因此最终会得到一个相同数据的精确副本。复制日志中会记录描述产生数据变化的事件。
在全序广播中,我们也发现状态机复制原则:如果每一个事件都代表对数据库的写入,并且所有的副本都以相同的顺序处理相同的事件,那么所有副本最终都会达到一个相同的状态。(事件的处理是一个确定性的操作。)这又是另外一个事件流的经典案例!
这一部分我们首先来看在各种数据系统中出现的一个问题,然后探索如何通过将事件流的思想引入到数据库来解决这个问题。
保持系统同步
通过本书我们可以看到,并没有一个完美的系统可以同时满足所有的数据存储,查询和处理需求。实践中,多数优秀的应用都需要通过结合多种技术来满足需求:例如,OLTP数据库被用作用户请求的服务,使用缓存来增加请求速度,使用全文索引来处理搜索请求,数据仓库用作分析等等。他们中的每一个都拥有独立的数据副本,并以他们所做的任务性能更好的方式来存储。
当相同或相关的数据出现在多个不同的地方时,他们相互之间需要保证同步:如果数据库中的一条数据发生改变,那么缓存,搜素引擎以及数据仓库中的数据同样需要更新。对于数据仓库通常使用ETL执行来实现(详见:数据仓库),做法是先全量复制数据,进行数据转换,然后将他们批量加载到数据仓库——说白了就是批处理。同样,我们在第411页的批处理工作流输出中看到,如何使用批处理创建搜索索引、推荐系统和其他派生数据系统。
如果周期性的全量数据库转储太慢,有时候双写也会被用作一个备选方案来使用,当数据发生变更时应用代码会显式的写入到各个系统中:比如,首先写数据库,接着更新搜素索引,然后失效缓存数据(或者并发的执行这些操作)。
但是双写有一些致命的问题,其中一个便是图11-4中所示的竞态条件。在这个例子中,两个客户端同时更新X:client-1想要设置它为A,client-2想把它设置成B。两个客户端都是把新值先写入到数据库,然后再写入到搜索索引。由于时机不佳,请求是交叉的:数据库首先看到的是client-1设置的值A,然后client-2写入把该值设置为B,所以数据库里最终的值是B。搜索索引首先看到的是client-2的写入,然后是client-1,所以搜索索引最终看到的值是A。尽管没有错误发生,但是这两个系统之间发生了永久不一致性。

图 11-4. 数据库中,X首先被设置为了A,然后设置成B,在搜索索引中写入的顺序是相反的。
除非你额外的增加一些并发监测机制,比如我们在检测并发写入中讨论过的版本向量,你可以不用关心并发写入的发生——一个值可以简单直接的覆盖掉另外一个值。
双写的另外一个问题在于当其中一个写成功后另外一个可能写入失败。这是一个容错问题而非并发问题,但是它仍能对两个系统相互之间不一致产生影响。保证他们要么同时成功要么同时失败是原子性提交问题的一个案例,它解决起来通常成本较高(参见:原子性提交和两阶段提交(2PC))。
如果只有一个只包含领导者节点的数据库副本,那么领导者将决定写入操作的顺序,因此状态机复制机制适用于数据库副本间的协同工作。但是,在图图11-4中,不是单领导者:数据库和搜索索引可能有他们单独的领导者,他们既不互相追随,也不相互影响(参加:多领导者副本)。
如果确实只有一个领导者的情况还好——比如让数据库作为领导者——如果我们可以把搜索索引作为数据库的一个追随者。但是这在实践中可行吗?
变更数据捕获
大多数数据库复制日志的问题是他们长期以来注重的是数据库内部的实现细节,而非公共的API。客户端应该通过数据模型和查询语句来从数据库中查询数据,而不是去解析复制日志再把数据从中提取出来。
几十年来,许多数据库就连写入数据的变更日志文档都没有。基于这个原因就很难拿到数据库所有数据的变动并把它们复制到像搜索索引,缓存或者数据仓库这类不同技术栈的存储组件中去。
最近,人们对变更数据捕获(change data capture-CDC)兴趣日益浓厚,它通过观察所有写入数据库数据的变化过程,然后以某种可以被复制到其他系统的方式提取数据。如果数据是即时写入的且它们的变化是可以流式操作的,那么CDC尤其值得我们去关注。
例如,你可以捕获数据库中数据的变更,并且持续的把同样的变更应用到搜索索引中。如果日志的变更顺序相同,那么你可以认为搜索索引中的数据和数据库中的数据是一致的。搜索索引和其他任何派生数据系统一样都是流变更的消费者而已,如图图11-5所示:

图 11-5. 将数据按序写入数据库,然后以相同的顺序把数据应用到其他的系统中。
数据变更捕获实现
流处理
流处理的适用场景
小结
引用
[1] Tyler Akidau, Robert Bradshaw, Craig Chambers, et al.: “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale,Unbounded, Out-of-Order Data Processing,” Proceedings of the VLDB Endowment,volume 8, number 12, pages 1792–1803, August 2015. doi:10.14778/2824032.2824076
[2] Harold Abelson, Gerald Jay Sussman, and Julie Sussman: Structure and Interpretation of Computer Programs, 2nd edition. MIT Press, 1996. ISBN:978-0-262-51087-5, available online at mitpress.mit.edu
[3] Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec: “The Many Faces of Publish/Subscribe,” ACM Computing Surveys, volume 35, number 2, pages 114–131, June 2003. doi:10.1145/857076.857078
[4] Joseph M. Hellerstein and Michael Stonebraker: Readings in Database Systems,4th edition. MIT Press, 2005. ISBN: 978-0-262-69314-1, available online at redbook.cs.berkeley.edu
[5] Don Carney, Uğur Çetintemel, Mitch Cherniack, et al.: “Monitoring Streams – A New Class of Data Management Applications,” at 28th International Conference on Very Large Data Bases (VLDB), August 2002.
[6] Matthew Sackman: “Pushing Back,” lshift.net, May 5, 2016.
[7] Vicent Martí: “Brubeck, a statsd-Compatible Metrics Aggregator,” githubengineering.com, June 15, 2015.
[8] Seth Lowenberger: “MoldUDP64 Protocol Specification V 1.00,” nasdaqtrader.com, July 2009.
[9] Pieter Hintjens: ZeroMQ – The Guide. O’Reilly Media, 2013. ISBN: 978-1-449-33404-8
[10] Ian Malpass: “Measure Anything, Measure Everything,” codeascraft.com, February 15, 2011.
[11] Dieter Plaetinck: “25 Graphite, Grafana and statsd Gotchas,” blog.raintank.io,March 3, 2016.
[12] Jeff Lindsay: “Web Hooks to Revolutionize the Web,” progrium.com, May 3,2007.
[13] Jim N. Gray: “Queues Are Databases,” Microsoft Research Technical Report MSR-TR-95-56, December 1995.
[14] Mark Hapner, Rich Burridge, Rahul Sharma, et al.: “JSR-343 Java Message Service (JMS) 2.0 Specification,” jms-spec.java.net, March 2013.
[15] Sanjay Aiyagari, Matthew Arrott, Mark Atwell, et al.: “AMQP: Advanced Message Queuing Protocol Specification,” Version 0-9-1, November 2008.
[16] “Google Cloud Pub/Sub: A Google-Scale Messaging Service,” cloud.google.com,2016.
[17] “Apache Kafka 0.9 Documentation,” kafka.apache.org, November 2015.
[18] Jay Kreps, Neha Narkhede, and Jun Rao: “Kafka: A Distributed Messaging System for Log Processing,” at 6th International Workshop on Networking Meets Databases (NetDB), June 2011.
[19] “Amazon Kinesis Streams Developer Guide,” docs.aws.amazon.com, April 2016.
[20] Leigh Stewart and Sijie Guo: “Building DistributedLog: Twitter’s HighPerformance Replicated Log Service,” blog.twitter.com, September 16, 2015.
[21] “DistributedLog Documentation,” Twitter, Inc., distributedlog.io, May 2016.
[22] Jay Kreps: “Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines),” engineering.linkedin.com, April 27, 2014.
[23] Kartik Paramasivam: “How We’re Improving and Advancing Kafka at LinkedIn,” engineering.linkedin.com, September 2, 2015.
[24] Jay Kreps: “The Log: What Every Software Engineer Should Know About RealTime Data’s Unifying Abstraction,” engineering.linkedin.com, December 16, 2013.
[25] Shirshanka Das, Chavdar Botev, Kapil Surlaker, et al.: “All Aboard the Databus!,” at 3rd ACM Symposium on Cloud Computing (SoCC), October 2012.
[26] Yogeshwer Sharma, Philippe Ajoux, Petchean Ang, et al.: “Wormhole: Reliable Pub-Sub to Support Geo-Replicated Internet Services,” at 12th USENIX Symposium on Networked Systems Design and Implementation (NSDI), May 2015.
[27] P. P. S. Narayan: “Sherpa Update,” developer.yahoo.com, June 8, .
[28] Martin Kleppmann: “Bottled Water: Real-Time Integration of PostgreSQL and Kafka,” martin.kleppmann.com, April 23, 2015.
[29] Ben Osheroff: “Introducing Maxwell, a mysql-to-kafka Binlog Processor,” developer.zendesk.com, August 20, 2015.
[30] Randall Hauch: “Debezium 0.2.1 Released,” debezium.io, June 10, 2016.
[31] Prem Santosh Udaya Shankar: “Streaming MySQL Tables in Real-Time to Kafka,” engineeringblog.yelp.com, August 1, 2016.
[32] “Mongoriver,” Stripe, Inc., github.com, September 2014.
[33] Dan Harvey: “Change Data Capture with Mongo + Kafka,” at Hadoop Users Group UK, August 2015.
[34] “Oracle GoldenGate 12c: Real-Time Access to Real-Time Information,” Oracle White Paper, March 2015.
[35] “Oracle GoldenGate Fundamentals: How Oracle GoldenGate Works,” Oracle Corporation, youtube.com, November 2012.
[36] Slava Akhmechet: “Advancing the Realtime Web,” rethinkdb.com, January 27, 2015.
[37] “Firebase Realtime Database Documentation,” Google, Inc., firebase.google.com,May 2016.
[38] “Apache CouchDB 1.6 Documentation,” docs.couchdb.org, 2014.
[39] Matt DeBergalis: “Meteor 0.7.0: Scalable Database Queries Using MongoDB Oplog Instead of Poll-and-Diff,” info.meteor.com, December 17, 2013.
[40] “Chapter 15. Importing and Exporting Live Data,” VoltDB 6.4 User Manual,docs.voltdb.com, June 2016.
[41] Neha Narkhede: “Announcing Kafka Connect: Building Large-Scale LowLatency Data Pipelines,” confluent.io, February 18, 2016.
[42] Greg Young: “CQRS and Event Sourcing,” at Code on the Beach, August 2014.
[43] Martin Fowler: “Event Sourcing,” martinfowler.com, December 12, 2005.
[44] Vaughn Vernon: Implementing Domain-Driven Design. Addison-Wesley Professional, 2013. ISBN: 978-0-321-83457-7
[45] H. V. Jagadish, Inderpal Singh Mumick, and Abraham Silberschatz: “View Maintenance Issues for the Chronicle Data Model,” at 14th ACM SIGACT-SIGMODSIGART Symposium on Principles of Database Systems (PODS), May 1995. doi:10.1145/212433.220201
[46] “Event Store 3.5.0 Documentation,” Event Store LLP, docs.geteventstore.com, February 2016.
[47] Martin Kleppmann: Making Sense of Stream Processing. Report, O’Reilly Media, May 2016.
[48] Sander Mak: “Event-Sourced Architectures with Akka,” at JavaOne, September 2014.
[49] Julian Hyde: personal communication, June 2016.
[50] Ashish Gupta and Inderpal Singh Mumick: Materialized Views: Techniques,Implementations, and Applications. MIT Press, 1999. ISBN: 978-0-262-57122-7
[51] Timothy Griffin and Leonid Libkin: “Incremental Maintenance of Views with Duplicates,” at ACM International Conference on Management of Data (SIGMOD),May 1995. doi:10.1145/223784.223849
[52] Pat Helland: “Immutability Changes Everything,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.
[53] Martin Kleppmann: “Accounting for Computer Scientists,” martin.kleppmann.com, March 7, 2011.
[54] Pat Helland: “Accountants Don’t Use Erasers,” blogs.msdn.com, June 14, 2007.
[55] Fangjin Yang: “Dogfooding with Druid, Samza, and Kafka: Metametrics at Metamarkets,” metamarkets.com, June 3, 2015.
[56] Gavin Li, Jianqiu Lv, and Hang Qi: “Pistachio: Co-Locate the Data and Compute for Fastest Cloud Compute,” yahoohadoop.tumblr.com, April 13, 2015.
[57] Kartik Paramasivam: “Stream Processing Hard Problems – Part 1: Killing Lambda,” engineering.linkedin.com, June 27, 2016.
[58] Martin Fowler: “CQRS,” martinfowler.com, July 14, 2011.
[59] Greg Young: “CQRS Documents,” cqrs.files.wordpress.com, November 2010.
[60] Baron Schwartz: “Immutability, MVCC, and Garbage Collection,” xaprb.com,December 28, 2013.
[61] Daniel Eloff, Slava Akhmechet, Jay Kreps, et al.: “Re: Turning the Database Inside-out with Apache Samza,” Hacker News discussion, news.ycombinator.com,March 4, 2015.
[62] “Datomic Development Resources: Excision,” Cognitect, Inc., docs.datomic.com.
[63] “Fossil Documentation: Deleting Content from Fossil,” fossil-scm.org, 2016.
[64] Jay Kreps: “The irony of distributed systems is that data loss is really easy but deleting data is surprisingly hard,” twitter.com, March 30, 2015.
[65] David C. Luckham: “What’s the Difference Between ESP and CEP?,” complexevents.com, August 1, 2006.
[66] Srinath Perera: “How Is Stream Processing and Complex Event Processing (CEP) Different?,” quora.com, December 3, 2015.
[67] Arvind Arasu, Shivnath Babu, and Jennifer Widom: “The CQL Continuous Query Language: Semantic Foundations and Query Execution,” The VLDB Journal,volume 15, number 2, pages 121–142, June 2006. doi:10.1007/s00778-004-0147-z
[68] Julian Hyde: “Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch,” ACM Queue, volume 7, number 11, December 2009. doi:10.1145/1661785.1667562
[69] “Esper Reference, Version 5.4.0,” EsperTech, Inc., espertech.com, April 2016.
[70] Zubair Nabi, Eric Bouillet, Andrew Bainbridge, and Chris Thomas: “Of Streams and Storms,” IBM technical report, developer.ibm.com, April 2014.
[71] Milinda Pathirage, Julian Hyde, Yi Pan, and Beth Plale: “SamzaSQL: Scalable Fast Data Management with Streaming SQL,” at IEEE International Workshop on High-Performance Big Data Computing (HPBDC), May 2016. doi:10.1109/IPDPSW.2016.141
[72] Philippe Flajolet, Éric Fusy, Olivier Gandouet, and Frédéric Meunier: “HyperLog Log: The Analysis of a Near-Optimal Cardinality Estimation Algorithm,” at Conference on Analysis of Algorithms (AofA), June 2007.
[73] Jay Kreps: “Questioning the Lambda Architecture,” oreilly.com, July 2, 2014.
[74] Ian Hellström: “An Overview of Apache Streaming Technologies,” databaseline.wordpress.com, March 12, 2016.
[75] Jay Kreps: “Why Local State Is a Fundamental Primitive in Stream Processing,”oreilly.com, July 31, 2014.
[76] Shay Banon: “Percolator,” elastic.co, February 8, 2011.
[77] Alan Woodward and Martin Kleppmann: “Real-Time Full-Text Search with Luwak and Samza,” martin.kleppmann.com, April 13, 2015.
[78] “Apache Storm 1.0.1 Documentation,” storm.apache.org, May 2016.
[79] Tyler Akidau: “The World Beyond Batch: Streaming 102,” oreilly.com, January 20, 2016.
[80] Stephan Ewen: “Streaming Analytics with Apache Flink,” at Kafka Summit, April 2016.
[81] Tyler Akidau, Alex Balikov, Kaya Bekiroğlu, et al.: “MillWheel: Fault-Tolerant Stream Processing at Internet Scale,” at 39th International Conference on Very Large Data Bases (VLDB), August 2013.
[82] Alex Dean: “Improving Snowplow’s Understanding of Time,” snowplowanalytics.com, September 15, 2015.
[83] “Windowing (Azure Stream Analytics),” Microsoft Azure Reference,msdn.microsoft.com, April 2016.
[84] “State Management,” Apache Samza 0.10 Documentation, samza.apache.org,December 2015.
[85] Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, et al.: “Photon:Fault-Tolerant and Scalable Joining of Continuous Data Streams,” at ACM International Conference on Management of Data (SIGMOD), June 2013. doi:10.1145/2463676.2465272
[86] Martin Kleppmann: “Samza Newsfeed Demo,” github.com, September 2014.
[87] Ben Kirwin: “Doing the Impossible: Exactly-Once Messaging Patterns in Kafka,”ben.kirw.in, November 28, 2014.
[88] Pat Helland: “Data on the Outside Versus Data on the Inside,” at 2nd Biennial Conference on Innovative Data Systems Research (CIDR), January 2005.
[89] Ralph Kimball and Margy Ross: The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling, 3rd edition. John Wiley & Sons, 2013. ISBN: 978-1-118-53080-1
[90] Viktor Klang: “I’m coining the phrase ‘effectively-once’ for message processing with at-least-once + idempotent operations,” twitter.com, October 20, 2016.
[91] Matei Zaharia, Tathagata Das, Haoyuan Li, et al.: “Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters,” at 4th USENIX Conference in Hot Topics in Cloud Computing (HotCloud), June 2012.
[92] Kostas Tzoumas, Stephan Ewen, and Robert Metzger: “High-Throughput, LowLatency, and Exactly-Once Stream Processing with Apache Flink,” data-artisans.com,August 5, 2015.
[93] Paris Carbone, Gyula Fóra, Stephan Ewen, et al.: “Lightweight Asynchronous Snapshots for Distributed Dataflows,” arXiv:1506.08603 [cs.DC], June 29, 2015.
[94] Ryan Betts and John Hugg: Fast Data: Smart and at Scale. Report, O’Reilly Media, October 2015.
[95] Flavio Junqueira: “Making Sense of Exactly-Once Semantics,” at Strata+Hadoop World London, June 2016.
[96] Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram Subramanian, and Guozhang Wang: “KIP-98 – Exactly Once Delivery and Transactional Messaging,”cwiki.apache.org, November 2016.
[97] Pat Helland: “Idempotence Is Not a Medical Condition,” Communications of the ACM, volume 55, number 5, page 56, May 2012. doi:10.1145/2160718.2160734
[98] Jay Kreps: “Re: Trying to Achieve Deterministic Behavior on Recovery/Rewind,”email to samza-dev mailing list, September 9, 2014.
[99] E. N. (Mootaz) Elnozahy, Lorenzo Alvisi, Yi-Min Wang, and David B. Johnson:“A Survey of Rollback-Recovery Protocols in Message-Passing Systems,” ACM Computing Surveys, volume 34, number 3, pages 375–408, September 2002. doi:10.1145/568522.568525
[100] Adam Warski: “Kafka Streams – How Does It Fit the Stream Processing Landscape?,” softwaremill.com, June 1, 2016.
Last updated