apache kafka 速度快的原理

kafka采用页缓存技术、顺序写入磁盘等技术来提升性能。在顺序读写的情况下,磁盘的顺序读写速度和内存相差无几,PageCache是系统级别的缓存,它把尽可能多的空闲内存当作磁盘缓存使用来进一步提高IO效率;所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手

apache kafka 采用页缓存技术、顺序写入磁盘等技术来提升性能。在顺序读写的情况下,磁盘的顺序读写速度和内存相差无几,PageCache是系统级别的缓存,它把尽可能多的空闲内存当作磁盘缓存使用来进一步提高IO效率;所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手

预备知识一: 页缓存(Page Cache)

缓存能提高I/O性能是基于以下2个重要的原理:

  1. CPU访问内存的速度远远大于访问磁盘的速度(访问速度差距不是一般的大,差好几个数量级)
  2. 数据一旦被访问,就有可能在短期内再次被访问(临时局部原理)
  • 文件一般存放在硬盘(机械硬盘或固态硬盘)中,CPU 并不能直接访问硬盘中的数据,而是需要先将硬盘中的数据读入到内存中,然后才能被 CPU 访问
  • 读写硬盘的速度比读写内存要慢很多,为了避免每次读写文件时,都需要对硬盘进行读写操作,Linux 内核使用 页缓存(Page Cache) 机制来对文件中的数据进行缓存。
  • 为了提升对文件的读写效率,Linux 内核会以页大小(4KB)为单位,将文件划分为多数据块。当用户对文件中的某个数据块进行读写操作时,内核首先会申请一个内存页(称为 页缓存 )与文件中的数据块进行绑定。
  • 页缓存,也称为磁盘缓存,是计算机随机存取存储器(RAM)的一个区域,用于保存并可能修改存储在硬盘或其他永久存储设备上的数据。
  • 页缓存,也称为磁盘缓存,是计算机随机存取存储器(RAM)的一个区域,用于保存并可能修改存储在硬盘或其他永久存储设备上的数据。
    1. 当从文件中读取数据时,如果要读取的数据所在的页缓存已经存在,那么就直接把页缓存的数据拷贝给用户即可。否则,内核首先会申请一个空闲的内存页(页缓存),然后从文件中读取数据到页缓存,并且把页缓存的数据拷贝给用户。
    2. 当向文件中写入数据时,如果要写入的数据所在的页缓存已经存在,那么直接把新数据写入到页缓存即可。否则,内核首先会申请一个空闲的内存页(页缓存),然后从文件中读取数据到页缓存,并且把新数据写入到页缓存中。对于被修改的页缓存,内核会定时把这些页缓存刷新到文件中。

预备知识二:「写缓存」常见的有3种策略

  1. 不缓存(nowrite) :: 也就是不缓存写操作,当对缓存中的数据进行写操作时,直接写入磁盘,同时使此数据的缓存失效
  2. 写透缓存(write-through) :: 写数据时同时更新磁盘和缓存
  3. 回写(copy-write or write-behind) :: 写数据时直接写到缓存,由另外的进程(回写进程)在合适的时候将数据同步到磁盘

预备知识三:通用页缓存流程

# DMA direct memory access:直接存储器访问,也就是直接访问RAM,不需要依赖CPU的负载
# CPU :中央核心处理器,主要用于计算,如果用于拷贝就太浪费资源
磁盘文件 ==DMAcopy=> 页缓存 ==CPUcopy=> 用户空间缓存 ==CPUcopy=> Socket缓存 ==DMAcopy=>> 网卡

页缓存减少了连续读写磁盘文件的次数,操作系统自动控制文件块的缓存与回收生命周期,用访问RAM的缓存代替访问磁盘区域的机制,增强查询效率。

Linux操作系统中的vm.dirty_background_ratio参数用来指定当脏页数量达到系统内存的百分之多少之后就会触发pdflush/flush/kdmflush等后台回写进程的运行来处理脏页,一般设置为小于10%的值即可,但不建议设置为0.与这个参数对应的还一个vm.dirty_ratio参数,它用来指定当脏页数量达到系统内存的百分之多少之后就不得不开始对脏页进行处理,在此过程中,新的I/O请求会被阻挡直至所有脏页被冲刷到磁盘中。

kafka页缓存技术如何实现

Kafka中大量使用了页缓存,

  • 消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务
  • 在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,可以通过log.flush.interval.message、log.flush.interval.ms等参数来控制。
  • 同步刷盘可以提高 消息的可行性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。
  • 一般不建议做同步刷盘,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障

MMFile (Memory Mapped File):

  • (简称 mmap)也被翻译成内存映射文件 ,在 64 位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。
  • 完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
  • 通过 mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小,有虚拟内存为我们兜底。

kafka的零拷贝是什么意思?

场景描述:把磁盘中的某个文件内容发送到远程服务器上,那么他必须经过几个拷贝过程

  1. 从磁盘中去读取目标文件的内容拷贝到内核缓冲区中
  2. 把内核缓冲区的数据拷贝到用户空间的缓冲区中
  3. 在应用程序中调用write()方法把用户空间缓冲区的数据拷贝到内核空间的socket Buffer中
  4. 把在内核模式下的socket Buffer中的数据赋值到网卡缓冲区,
  5. 最后网卡缓冲区再把数据传输到目标服务器上。

在这个过程中我们发现数据从磁盘到最终发送出去要经历4次拷贝,而在这4次拷贝过程中,有两次拷贝是浪费的,

1. 从内核空间拷贝到用户空间
2. 从用户空间再次拷贝到内核空间。

所谓的零拷贝就是把这两次多余的拷贝忽略掉。应用程序可以直接把磁盘中的数据从内核中直接传输到socket.

零拷贝是一种为了解决数据从内核缓存到用户缓存的CPU拷贝产生的性能消耗的技术。

原理:当数据从磁盘经过DMA copy到页缓存(内核缓存)后,为了减少CPU拷贝的性能损耗,操作系统会将该内核缓存与用户层进行共享,减少一次CPU copy过程,同时用户层的读写也会直接访问该共享存储,本身由用户层到Socket缓存的数据拷贝过程也变成了从 内核到内核的CPU拷贝过程,更加的快速。

磁盘文件 ==DMAcopy=> 【页缓存并共享作为用户空间缓存】 ==CPUcopy=> Socket缓存 ==DMAcopy=>> 网卡

kafka的顺序读写

  • 硬盘是机械结构,每次读写都会“寻址”,其中寻址是一个“机械动作”,是最耗时的;顺序I/O比随机I/O快,为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O
  • 在顺序读写的情况下,磁盘的顺序读写速度和内存相差无几;

顺序写磁盘就是在一个磁道连续的写入,数据都排在一起,分布在连续的磁盘扇区,主需要一次寻址就能找到对应的数据,而kafka本身的数据是不需要删除数据的,是已追加的方式写到磁盘,所以这样就能保证磁盘数据连续紧凑,同时kafka是以segment log flie进行分段存储的,每次访问磁盘文件的时候只需要寻址最后一个segment file的磁盘空间,能够保证写入和读取的效率。

kafka 基础面试题

Apache Kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由LinkedIn公司开发,使用Scala语言编写,目前是Apache的开源项目

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

what’s kafka?

1、kafka的消费者是pull(拉)还是push(推)模式,这种模式有什么好处?

Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从broker 拉取消息。

优点:pull模式消费者自主决定是否批量从broker拉取数据,而push模式在无法知道消费者消费能力情况下,不易控制推送速度,太快可能造成消费者奔溃,太慢又可能造成浪费。

缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到到达。为了避免这点,Kafka 有个参数可以让 consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)。

2、kafka维护消息状态的跟踪方法

Kafka中的Topic 被分成了若干分区,每个分区在同一时间只被一个 consumer 消费。然后再通过offset进行消息位置标记,通过位置偏移来跟踪消费状态。相比其他一些消息队列使用“一个消息被分发到consumer 后 broker 就马上进行标记或者等待 customer 的通知后进行标记”的优点是,避免了通信消息发送后,可能出现的程序奔溃而出现消息丢失或者重复消费的情况。同时也无需维护消息的状态,不用加锁,提高了吞吐量。

3、zookeeper对于kafka的作用是什么?

Zookeeper 主要用于在集群中不同节点之间进行通信,在 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

4、kafka判断一个节点还活着的有那两个条件?

1. 节点必须维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
2. 如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久

5、讲一讲 kafka 的 ack 的三种机制

request.required.acks 有三个值 0 1 -1(all),具体如下:

  • 0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。
  • 1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。
  • -1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的ack,这样数据不会丢失。

6、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?

Kafka 中发送 1 条消息的时候,可以指定(topic, partition, key) 3 个参数,partiton 和 key 是可选的。

Kafka 分布式的单位是 partition,同一个 partition 用一个 write ahead log 组织,所以可以保证FIFO 的顺序。不同 partition 之间不能保证顺序。因此你可以指定 partition,将相应的消息发往同 1个 partition,并且在消费端,Kafka 保证1 个 partition 只能被1 个 consumer 消费,就可以实现这些消息的顺序消费。

另外,你也可以指定 key(比如 order id),具有同 1 个 key 的所有消息,会发往同 1 个partition,那这样也实现了消息的顺序消息。

7、kafka 如何不消费重复数据?比如扣款,我们不能重复的扣。

这个问题换种问法,就是kafka如何保证消息的幂等性。对于消息队列来说,出现重复消息的概率还是挺大的,不能完全依赖消息队列,而是应该在业务层进行数据的一致性幂等校验。

比如你处理的数据要写库(mysql,redis等),你先根据主键查一下,如果这数据都有了,你就别插入了,进行一些消息登记或者update等其他操作。另外,数据库层面也可以设置唯一健,确保数据不要重复插入等 。一般这里要求生产者在发送消息的时候,携带全局的唯一id。

8、讲一下kafka集群的组成?

  • Broker(代理):
    Kafka集群通常由多个代理组成以保持负载平衡。 Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。 一个Kafka代理实例可以每秒处理数十万次读取和写入,每个Broker可以处理TB的消息,而没有性能影响。 Kafka经纪人领导选举可以由ZooKeeper完成。

  • ZooKeeper:
    ZooKeeper用于管理和协调Kafka代理。 ZooKeeper服务主要用于通知生产者和消费者Kafka系统中存在任何新代理或Kafka系统中代理失败。 根据Zookeeper接收到关于代理的存在或失败的通知,然后生产者和消费者采取决定并开始与某些其他代理协调他们的任务。

  • Producers(生产者):
    生产者将数据推送给经纪人。 当新代理启动时,所有生产者搜索它并自动向该新代理发送消息。 Kafka生产者不等待来自代理的确认,并且发送消息的速度与代理可以处理的一样快。

  • Consumers(消费者):
    因为Kafka代理是无状态的,这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息。 如果消费者确认特定的消息偏移,则意味着消费者已经消费了所有先前的消息。 消费者向代理发出异步拉取请求,以具有准备好消耗的字节缓冲区。 消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。 消费者偏移值由ZooKeeper通知。

9、kafka是什么?

  • Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由LinkedIn公司开发,使用Scala语言编写,目前是Apache的开源项目。

  • broker: Kafka服务器,负责消息存储和转发

  • topic:消息类别,Kafka按照topic来分类消息

  • partition: topic的分区,一个topic可以包含多个partition, topic 消息保存在各个partition上4.

  • offset:消息在日志中的位置,可以理解是消息在partition上的偏移量,也是代表该消息的唯一序号

  • Producer:消息生产者

  • Consumer:消息消费者

  • Consumer Group:消费者分组,每个Consumer必须属于一个group

  • Zookeeper:保存着集群 broker、 topic、 partition等meta 数据;另外,还负责broker故障发现, partition leader选举,负载均衡等功能

10、partition的数据文件(offffset,MessageSize,data)

partition中的每条Message包含了以下三个属性: offset,MessageSize,data,其中offset表示Message在这个partition中的偏移量,offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message,可以认为offset是partition中Message的 id; MessageSize表示消息内容data的大小;data为Message的具体内容。

11、kafka如何实现数据的高效读取?(顺序读写、分段命令、二分查找)

Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为index。 index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。

12、 Kafka 消费者端的 Rebalance 操作什么时候发生?

同一个 consumer 消费者组 group.id 中,新增了消费者进来,会执行 Rebalance 操作
消费者离开当期所属的 consumer group组。比如宕机
分区数量发生变化时(即 topic 的分区数量发生变化时)
消费者主动取消订阅

Rebalance的过程如下:

  • 第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
  • 第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。

所以对于Rebalance来说,Coordinator起着至关重要的作用

13、Kafka 中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?

kafka中与leader副本保持一定同步程度的副本(包括leader)组成ISR。
与leader滞后太多的副本组成OSR。
分区中所有的副本通称为AR。

  • ISR : 速率和leader相差低于10秒的follower的集合
  • OSR : 速率和leader相差大于10秒的follower
  • AR : 全部分区的follower

14、Kafka 中的HW、LEO等分别代表什么?

  • HW:高水位,指消费者只能拉取到这个offset之前的数据
  • LEO:标识当前日志文件中下一条待写入的消息的offset,大小等于当前日志文件最后一条消息的offset+1.

15、Kafka的那些设计让它有如此高的性能?

  1. kafka是分布式的消息队列
  2. 对log文件进行了segment,并对segment创建了索引
  3. (对于单节点)使用了顺序读写,速度能够达到600M/s
  4. 引用了zero拷贝,在os系统就完成了读写操做

16、Kafka为什么不支持读写分离?

  1. 这其实是分布式场景下的通用问题,因为我们知道CAP理论下,我们只能保证C(一致性)和A(可用性)取其一,如果支持读写分离,那其实对于一致性的要求可能就会有一定折扣,因为通常的场景下,副本之间都是通过同步来实现副本数据一致的,那同步过程中肯定会有时间的消耗,如果支持了读写分离,就意味着可能的数据不一致,或数据滞后。
  2. Leader/Follower模型并没有规定Follower副本不可以对外提供读服务。很多框架都是允许这么做的,只是 Kafka最初为了避免不一致性的问题,而采用了让Leader统一提供服务的方式。
  3. 不过,自Kafka 2.4之后,Kafka提供了有限度的读写分离,也就是说,Follower副本能够对外提供读服务。

17、分区Leader选举策略有几种?

分区的Leader副本选举对用户是完全透明的,它是由Controller独立完成的。在不同的场景下,需要执行分区Leader选举。每一种场景对应于一种选举策略。

  1. OfflinePartition Leader选举:每当有分区上线时,就需要执行Leader选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区Leader选举场景。
  2. ReassignPartition Leader选举:当你手动运行Kafka-reassign-partitions命令,或者是调用Admin的alterPartitionReassignments方法执行分区副本重分配时,可能触发此类选举。假设原来的AR是[1,2,3],Leader是1,当执行副本重分配后,副本集合AR被设置成[4,5,6],显然,Leader必须要变更,此时会发生Reassign Partition Leader选举。
  3. PreferredReplicaPartition Leader选举:当你手动运行Kafka-preferred-replica-election命令,或自动触发了Preferred Leader选举时,该类策略被激活。所谓的Preferred Leader,指的是AR中的第一个副本。比如AR是[3,2,1],那么,Preferred Leader就是3。
  4. ControlledShutdownPartition Leader选举:当Broker正常关闭时,该Broker上的所有Leader副本都会下线,因此,需要为受影响的分区执行相应的Leader选举。

这4类选举策略的大致思想是类似的,即从AR中挑选首个在ISR中的副本,作为新Leader。

18、请简述下你在哪些场景下会选择 Kafka?

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和 Flink

19、请谈一谈 Kafka 数据一致性原理

一致性就是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。
假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

kafka 单播和多播

  1. 单播消息
    如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息

     

    ./kafka-console-consumer.sh --bootstrap-server bigdata:9092 --topic testone --from-beginning --consumer-property group.id=testgroup1
  2. 多播消息
    不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息。

     

    ./kafka-console-consumer.sh --bootstrap-server bigdata:9092 --topic testone --from-beginning --consumer-property group.id=testgroup1
    ./kafka-console-consumer.sh --bootstrap-server bigdata:9092 --topic testone --from-beginning --consumer-property group.id=testgroup2

Spark ShuffeManager 基本介绍

Shuffle 可以简单理解成数从新洗牌的过程。过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段

Shuffle 可以简单理解成数从新洗牌的过程。过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle

负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。

  1. 在Spark1.2以前,默认的shuffle计算引擎是HashShuffleManager。
  2. 在Spark1.2以后的版本中,默认的ShuffleManager变成SortShuffleManager
  3. SortShuffleManager有两种机制,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于 spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制

HashShuffeManager VS SortShuffeManager

  • HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
  • SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle-read-task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可

HashShuffleManager运行原理:

  1. 未经优化的HashShuffleManager:

    • shuffle-write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子
      (比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算
      法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。

    • 在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

    • 那么每个执行shuffle-write的task,要为下一个stage创建多少个磁盘文件呢?下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。

      比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。

    • shuffle-read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。

      由于shufflewrite的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffleread的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

  2. 优化后的HashShuffleManager:

    • spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。
    • 开启consolidate机制之后,在shuffle-write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPUcore,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
    • 当Executor的CPU-core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。
    • 因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能

      • 假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。
      • 但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:*CPUcore的数量下一个stage的task数量**。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

SortShuffleManager基本原理

  1. 普通运行机制:
    • 在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。
    • 如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。
    • 接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
    • 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
    • 写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
    • 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。
    • 此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。

比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

  1. bypass运行机制:

    bypass运行机制的触发条件如下:

    1. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
    2. 不是聚合类的shuffle算子(比如reduceByKey)。
    • 此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。
    • 当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
    • 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
    • 而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。
    • 也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。