redis cluster 基础概念随笔

这里的A和B可以存在于各种领域,但一般fail-over特指计算机领域的数据库、应用服务、硬件设备等的失效转移。
failover是redis cluster提供的容错机制,cluster最核心的功能之一。failover支持两种模式:故障failover:自动恢复集群的可用性,人为failover:支持集群的可运维操作

单机都会存在单点故障的问题,及服务部署在一台服务器上,一旦服务器宕机服务就不可用;让服务高可用,分布式服务就出现了,将同一服务部署到多台机器上,即使其中几台服务器宕机,只要有一台服务器可用服务就可用。

故障恢复 Failover

失效转移/故障恢复:通俗地说,即当A无法为客户服务时,系统能够自动地切换,使B能够及时地顶上继续为客户提供服务,且客户感觉不到这个为他提供服务的对象已经更换。

这里的A和B可以存在于各种领域,但一般fail-over特指计算机领域的数据库、应用服务、硬件设备等的失效转移。
failover是redis cluster提供的容错机制,cluster最核心的功能之一。failover支持两种模式:

  • 故障failover:自动恢复集群的可用性
  • 人为failover:支持集群的可运维操作

redis演进

  1. 为了解决单机故障引入了主从模式,但主从模式存在一个问题:master节点故障后服务,需要人为的手动将slave节点切换成为maser节点后服务才恢复
  2. 为解决这一问题又引入了哨兵模式,哨兵模式能在master节点故障后能自动将salve节点提升成master节点,不需要人工干预操作就能恢复服务可用
  3. redis cluster发布得比较晚(2015年才发布正式版 ),各大厂等不及了,陆陆续续开发了自己的redis数据分片集群模式,比如:Twemproxy、Codis等
  4. 主从模式、哨兵模式都没有达到真正的数据sharding存储,每个redis实例中存储的都是全量数据,所以redis cluster就诞生了,实现了真正的数据分片存储

redis单节点

虽然有通过RDB和AOF持久化机制能将数据持久化到硬盘上,但数据是存储在一台服务器上的,如果服务器出现硬盘故障等问题,会导致数据不可用,而且读写无法分离,读写都在同一台服务器上,请求量大时会出现I/O瓶颈。

Redis 4.0 提供了混合持久化方案,将 RDB 文件的内容和增量的 AOF 日志文件存在一起。这里的 AOF 日志不再是全量的日志,而是自 RDB 持久化开始到持久化结束这段时间发生的增量 AOF 日志,通常这部分日志很小。

RDB (Redis Database)

  • 持久化在指定的时间间隔生成数据集的时间点快照(point-in-time )
  • RDB方式是在指定的时间间隔或者执行特定命令时将当前系统中的数据保存备份,以二进制的形式写入磁盘中,默认文件名为dump.rdb。
  • RDB 的触发有三种机制,执行save命令;执行bgsave命令;在redis.config中配置自动化。
  • save命令会阻塞当前的Redis服务器;bgsave时可以通过fork一个子进程,然后通过这个子进程来处理接下来所有的保存工作
  • 需要我们在客户端中去执行save或者bgsave命令,在生产情况下我们更多地需要是自动化的触发机制,那么Redis就提供了这种机制,我们可以在redus.config中对持久化进行配置
  • “save 600 1” 指在 600 秒内,如果有一个或一个以上的修改操作,那么就自动进行一次自动化备份

AOF(Append Only-file)

  • 它的工作原理:AOF日志存储的是Redis服务器指令序列,AOF只记录对内存进行修改的指令记录.
  • 先执行指令才将日志存盘,它有三种策略
  • always:每次发生数据修改就会立即记录到磁盘文件中,这种方案的完整性好但是IO开销很大,性能较差;
  • everysec:在每一秒中进行同步,速度有所提升。但是如果在一秒内宕机的话可能失去这一秒内的数据;
  • no:默认配置,即不使用 AOF 持久化方案

主从模式

避免单点故障 和 读写不分离,Redis 提供了复制(replication)功能实现master数据库中的数据更新后,会自动将更新的数据同步到其他slave数据库上

  1. redis主从结构特点:一个master可以有多个salve节点;salve节点可以有slave节点,从节点是级联结构
  2. 主从结构具有读写分离,提高效率、数据备份,提供多个副本等优点。
  3. 最大的不足就是主从模式不具备自动容错和恢复功能,主节点故障,集群则无法进行工作,可用性比较低,从节点升主节点需要人工手动干预。
  4. 普通的主从模式,当主数据库崩溃时,需要手动切换从数据库成为主数据库:
  5. 在从数据库中使用SLAVE NO ONE命令将从数据库提升成主数据继续服务。
  6. 启动之前崩溃的主数据库,然后使用SLAVEOF命令将其设置成新的主数据库的从数据库,即可同步数据。

哨兵模式

哨兵模式是一种特殊模式,首先Redis提供了哨兵命令,哨兵是一个独立的进程,它会独立运行。其原理是哨兵通过发送命令等待Redis服务器响应,从而监控运行的多个Redis实例。哨兵模式是从Redis的2.6版本开始提供的,这个版本当时是不稳定的,直到Redis的2.8版本以后,这个哨兵模式才稳定下来。

哨兵的作用:

  1. 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  2. 当哨兵检测到master宕机,会自动将slave切换为master,然后通过发送订阅模式通知其他的从服务器,修改配置文件,让他们切换主机。
    一个哨兵进程对Redis服务器进行监控,可能会出现问题,所以我们使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多个哨兵模式。

哨兵模式核心还是主从复制,只不过在相对于主从模式在主节点宕机导致不可写的情况下,多了一个竞选机制:从所有的从节点竞选出新的主节点。竞选机制的实现,是依赖于在系统中启动一个sentinel进程。
哨兵在启动进程时,会读取配置文件的内容: sentinel monitor master-name ip port quorum

  1. master-name是主数据库的名字
  2. ip和port 是当前主数据库地址和端口号
  3. quorum表示在执行故障切换操作前,需要多少哨兵节点同意。

需要连接主节点,是因为通过主节点的info命令,获取从节点信息,从而和从节点也建立连接,同时也能通过主节点的info信息知道新增从节点的信息。一个哨兵节点可以监控多个主节点,但是并不提倡这么做,因为当哨兵节点崩溃时,同时有多个集群切换会发生故障

跟主数据库建立连接后会定时执行以下三个操作:

  1. 每隔10s向master和 slave发送info命令。作用是获取当前数据库信息,比如发现新增从节点时,会建立连接,并加入到监控列表中,当主从数据库的角色发生变化进行信息更新。
  2. 每隔2s向主数据里和从数据库的sentinel:hello频道发送自己的信息。作用是将自己的监控数据和哨兵分享。每个哨兵会订阅数据库的_sentinel:hello频道,当其他哨兵收到消息后,会判断该哨兵是不是新的哨兵,如果是则将其加入哨兵列表,并建立连接。
  3. 每隔1s向所有主从节点和所有哨兵节点发送ping命令,作用是监控节点是否存活。

哨兵节点发送ping命令时,当超过一定时间(down-after-millisecond)后,如果节点未回复,则哨兵认为主观下线。主观下线表示当前哨兵认为该节点已经下面,如果该节点为主数据库,哨兵会进一步判断是够需要对其进行故障切换,这时候就要发送命令(SENTINEL is-master-down-by-addr)询问其他哨兵节点是否认为该主节点是主观下线,当达到指定数量(quorum)时,哨兵就会认为是客观下线。

故障切换的过程

假设主服务器宕机,哨兵1先检测到这个结果,系统不会马上进行failover过程,仅仅时哨兵1直观认为主服务器不可用,这个现象叫主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵会进行一次投票,投票的结果由一个哨兵发起,进行failover操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程成为客观下线。

当主节点客观下线时就需要进行主从切换,主从切换的步骤为:

  • (1)选出领头哨兵。
  • (2)领头哨兵所有的slave选出优先级最高的从数据库。优先级可以通过slave-priority选项设置。
  • (3)如果优先级相同,则从复制的命令偏移量越大(即复制同步数据越多,数据越新),越优先。
  • (4)如果以上条件都一样,则选择run ID较小的从数据库。

选出一个从数据库后,哨兵发送slave no one命令升级为主数据库,并发送slaveof命令将其他从节点的主数据库设置为新的主数据库。

哨兵模式的不足:

  • 是一种中心化的集群实现方案:始终只有一个Redis主机来接收和处理写请求,写操作受单机瓶颈影响;
  • 集群里所有节点保存的都是全量数据,浪费内存空间,没有真正实现分布式存储。数据量过大时,主从同步严重影响master的性能;
  • Redis主机宕机后,哨兵模式正在投票选举的情况之外,因为投票选举结束之前,谁也不知道主机和从机是谁,此时Redis也会开启保护机制,禁止写操作,直到选举出了新的Redis主机

Redis Cluster模式

redis cluster主要是针对海量数据+高并发+高可用的场景,海量数据,如果你的数据量很大,那么建议就用redis cluster,数据量不是很大时,使用sentinel就够了。redis cluster的性能和高可用性均优于哨兵模式。

Redis Cluster是一种服务器Sharding技术(分片和路由都是在服务端实现),采用多主多从,每一个分区都是由一个Redis主机和多个从机组成,片区和片区之间是相互平行的。Redis Cluster集群采用了P2P的模式,完全去中心化。

主要特点

  1. 集群完全去中心化,采用多主多从;
  2. 所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽。
  3. 客户端与 Redis 节点直连,不需要中间代理层。
  4. 客户端不需要连接集群所有节点,连接集群中任何一个可用节点即可。
  5. 每一个分区都是由一个Redis主机和多个从机组成,片区和片区之间是相互平行的。
  6. 每一个master节点负责维护一部分槽,以及槽所映射的键值数据。

分片机制-虚拟槽

采用虚拟哈希槽分区而非一致性hash算法,预先分配16384(2^14)个卡槽,所有的键根据哈希函数映射到 0 ~ 16383整数槽内,每一个分区内的master节点负责维护一部分槽以及槽所映射的键值数据。

    #key到hash槽映射算法:对每个key计算CRC16值,然后对16384取模
    计算公式:slot = CRC16(key) & 16383
  • hash卡槽只会分配给每个片区的主节点上,从节点不会分配卡槽,从节点会同步master上的hash槽。
  • 每个hash卡槽可以存放多个Key,每一个数据key对应一个hash槽。
  • hash卡槽的目的是确认数据存放到哪个片区的Redis主节点上,实现Redis集群分摊Key。
  • 每个片区的Redis主节点卡槽数都对应一个范围,多个片区之间卡槽数范围是等比分配的。比如:存在3个片区对应3个Redis主机,那么3个Redis主机的卡槽总数分别是:16384/3。3个Redis主机的卡槽范围分别是:
    第一台Redis主机:0~5461
    
    第二台Redis主机:5462 ~ 10922
    
    第三台Redis主机:10923~16383
  • 写操作时,会根据Key值计算出对应的卡槽所在的位置,再将数据存入卡槽区对应的master中;读数据也是一样,通过key得到slot,再通过slot找到node获取数据(客户端读请求是打到任意节点上的,当请求的数据没有在接受请求的node上时,会出现重定向,后面有详细讲解)。
  • Redis Cluster的节点之间会共享消息,每个节点都会知道是哪个节点负责哪个范围内的数据槽。所以客服端请求任意一个节点,都能获取到slot对应的node信息。
#面试题:
#1.redis cluster为什么没有使用一致性hash算法,而是使用了哈希槽预分片?
    缓存热点问题:一致性哈希算法在节点太少时,容易因为数据分布不均匀而造成缓存热点的问题。一致性哈希算法可能集中在某个hash区间内的值特别多,会导致大量的数据涌入同一个节点,造成master的热点问题(如同一时间20W的请求都在某个hash区间内)。

#2.redis的hash槽为什么是16384(2^14)个卡槽,而不是65536(2^16)个?
(1)如果槽位为65536,发送心跳信息的消息头达8k,发送的心跳包过于庞大。
(2)redis的集群主节点数量基本不可能超过1000个。
    集群节点越多,心跳包的消息体内携带的数据越多。如果节点过1000个,也会导致网络拥堵。因此redis作者,不建议redis cluster节点数量超过1000个。 那么,对于节点数在1000以内的redis cluster集群,16384个槽位够用了。没有必要拓展到65536个。
(3)槽位越小,节点少的情况下,压缩率高。

集群功能限制

Redis 集群相对 单机,存在一些功能限制:

  1. key 批量操作 支持有限:类似 mset、mget 操作,目前只支持对具有相同 slot 值的 key 执行 批量操作。对于映射为不同 slot 值的 key 由于执行 mget、mget 等操作可能存在于多个节点上,因此不被支持。
  2. key 事务操作 支持有限:只支持 多 key 在 同一节点上 的 事务操作,当多个 key 分布在 不同 的节点上时 无法 使用事务功能。
  3. key 作为 数据分区 的最小粒度,不能将一个大的键值对象(如hash、list) 等映射到不同的节点。
  4. 不支持多数据库空间:单机 下的 Redis 可以支持 16 个数据库(db0 ~ db15),集群模式 下只能使用 一个 数据库空间,即 db0。
  5. 复制结构只支持一层:从节点只能复制主节点,不支持主从复制链
  6. Redis Cluster 非常适合构建中小规模 Redis 集群,这里的中小规模指的是,大概几个到几十个节点这样规模的 Redis 集群。但是 Redis Cluster 不太适合构建超大规模集群,主要原因是,它采用了去中心化的设计。
  7. Redis 的每个节点上,都保存了所有槽和节点的映射关系表,客户端可以访问任意一个节点,再通过重定向命令,找到数据所在的那个节点。那么,这个映射关系表是如何更新的呢?Redis Cluster 采用了一种去中心化的流言 (Gossip) 协议来传播集群配置的变化。Gossip 协议的优点是去中心化;缺点是传播速度慢,并且是集群规模越大,传播的越慢

moved和ask重定向的区别:

  • 两者都是客户端重定向
  • moved异常:槽已经确定迁移,即槽已经不在当前节点
  • ask异常:槽还在迁移中

客服端请求产生moved重定向的执行过程:

1.每个节点通过通信都会共享Redis Cluster中槽和集群中对应节点的关系。

2.客户端向Redis Cluster的任意节点发送命令,接收命令的节点会根据CRC16规则进行hash运算与16383取余,计算自己的槽和对应节点 。

3.如果保存数据的槽被分配给当前节点,则去槽中执行命令,并把命令执行结果返回给客户端。

4.如果保存数据的槽不在当前节点的管理范围内,则向客户端返回moved重定向异常 。

5.客户端接收到节点返回的结果,如果是moved异常,则从moved异常中获取目标节点的信息。

6.客户端向目标节点发送命令,获取命令执行结果。

客服端请求目标节点时,目标节点中的槽已经迁移支别的节点上了,此时目标节点会返回ask转向给客户端。

1.当客户端向集群中某个节点发送命令,节点向客户端返回moved异常,告诉客户端数据对应目标槽的节点信息。

2.客户端再向目标节点发送命令,目标节点中的槽已经迁移出别的节点上了,此时目标节点会返回ask重定向给客户端。

2.客户端向新的target节点发送Asking命令,然后再次向新节点发送请求请求命令。

3.新节点target执行命令,把命令执行结果返回给客户端。

spark exactly once 语义随笔

Spark具备很好的机制来保证exactly once的语义,具体体现在数据源的可重放性、计算过程中的容错性、以及写入存储介质时的幂等性或者事务性。exactly once指的是在处理数据的过程中,系统有很好的容错性(fault-tolerance),能够保证数据处理不重不丢,每一条数据仅被处理一次。

语言所蕴含的意义就是语义(semantic)。简单的说,符号是语言的载体。符号本身没有任何意义,只有被赋予含义的符号才能够被使用,这时候语言就转化为了信息,而语言的含义就是语义。
语义可以简单地看作是数据所对应的现实世界中的事物所代表的概念的含义,以及这些含义之间的关系,是数据在某个领域上的解释和逻辑表示。

一、三种语义

spark exactly once 语义是spark很重要的一个概念。分布式系统中都基本上会有三种关键的语义

  • at most once:至多一次,数据可能会丢,但不会重复
  • at least once:至少一次,数据肯定不会丢失,但可能重复
  • exactly once:有且只有一次,数据不丢失不重复,且只出现一次; 该语义是最理想的,但也难以实现

Spark具备很好的机制来保证exactly once的语义,具体体现在数据源的可重放性、计算过程中的容错性、以及写入存储介质时的幂等性或者事务性。exactly once指的是在处理数据的过程中,系统有很好的容错性(fault-tolerance),能够保证数据处理不重不丢,每一条数据仅被处理一次。

一个典型的 Spark Streaming 应用程序会包含三个处理阶段:接收数据、处理汇总、输出结果。每个阶段都需要做不同的处理才能实现相应的语义

二、接收数据 – 数据源的可重放性

取决于上游数据源的特性

  • HDFS 这类支持容错的文件系统中读取文件,能够直接支持 Exactly-once 语义。
  • 上游消息系统支持 ACK(如RabbitMQ),我们就可以结合 Spark 的 Write Ahead Log 特性来实现 At-least-once 语义。
  • 非可靠的数据接收器(如 socketTextStream),当 Worker 或 Driver 节点发生故障时就会产生数据丢失,提供的语义也是未知的。
  • Kafka 消息系统是基于偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 语义

SparkStreaming接入Kafka的数据有两种模式

一种为Receiver模式,一种为Direct模式。

Receiver模式

Receiver模式都不能够实现exactly once的语义,其根本原因是Kafka自己管理的offset与SparkStreaming实际处理数据的offset没有同步导致的。

Receiver模式采用Kafka的高阶consumer API,Kafka自己封装了对数据的获取逻辑,且通过Zookeeper管理offset信息,这种模式在与SparkStreaming对接时,有以下特点:

  1. Kafka中的partition数量与SparkStreaming中的并行度不是一一对应的,SparkStreaming通过创建Receiver去读取Kafka中数据,createStream()方法传入的并发参数代表的是读取Kafka中topic+partition的线程数,并不能提高SparkStreaming读取数据的并行度。
  2. Kafka自己管理offset,Receiver作为一个高层的Consumer来消费数据,其消费的偏移量(offset)由Kafka记录在Zookeeper中,一旦出现错误,那些已经标记为消费过的数据将会丢失。
  3. Receiver模式下,为了解决读取数据时的并行度问题,可以创建多个DStream,然后union起来
  4. 为了解决数据丢失的问题,可以选择开启Spark的WAL(write ahead log)机制,每次处理数据前将预写日志写入到HDFS中,如果节点出现错误,可以从WAL中恢复。这种方法其实效率低下,不仅数据冗余(Kafka中有副本机制,Spark中还要存一份),且无法保证exactly once,数据可能重复消费。

Direct模式

Spark1.3中引入了Direct模式来替代Receiver模式,它使用Kafka的Simple consumer API,由Spark应用自己管理offset信息,以达成exactly once的语义,其特点如下:

  1. Kafka中的partition与SparkStreaming中的partition一一对应,也就是SparkStreaming读取数据的并行度取决于Kafka中partition的数量。
  2. 不依赖Receiver,而是通过低阶api直接找到topic+partition的leader获取数据,并由SparkStreaming应用自己负责追踪维护消费的offset。
  3. Direct模式下,SparkStreaming应用管理offset的方法案例,其中offset依然是存放在zookeeper中,但是由应用自身来管理的,offset也可以放在Redis、MySQL、HBase中进行管理,根据具体情况进行选择。

转化或汇总 – 计算过程中的容错性

Spark RDD 本身就是一种具备容错性、不变性、以及计算确定性的数据结构。只要数据来源是可用的,且处理过程中没有副作用(Side effect),我们就能一直得到相同的计算结果;撇去Driver与Executor的高可用性不说,Spark应用内部则采用checkpoint和lineage的机制来确保容错性。

lineage

一般翻译为血统,简单来说就是RDD在转化的过程中,由于父RDD与子RDD存在依赖关系(Dependency),从而形成的lineage,也可以理解为lineage串起了RDD DAG。

RDD可以进行缓存,通过调用persist或者cache方法,将RDD持久化到内存或者磁盘中,这样缓存的RDD就可以被保留在计算节点的内存中被重用,缓存是构建Spark快速迭代的关键。

当一个RDD丢失的情况下,Spark会去寻找它的父RDD是否已经缓存,如果已经缓存,就可以通过父RDD直接算出当前的RDD,从而避免了缓存之前的RDD的计算过程,且只有丢失数据的partition需要进行重算,这样Spark就避免了RDD上的重复计算,能够极大的提升计算速度。

缓存虽然可以提升Spark快速迭代计算的速度,但是缓存是会丢失的。

checkpoint

检查点机制就是为了可以切断lineage的依赖关系,在某个重要的节点,将RDD持久化到文件系统中(一般选择HDFS),这样就算之前的缓存已经丢失了,也可以保证检查点数据不会丢失,这样在恢复的时候,会直接从检查点的数据开始进行计算,检查点机制在SparkStreaming这种流式计算中发挥的作用会更大。

可以通过以下源码为入口进一步了解Spark的缓存和检查点机制,RDD在进行计算的时候会调用其iterator方法,在该方法中会首先去读取缓存的数据,如果没有缓存的数据则会去读取checkpoint的数据

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

Spark在计算过程中采用的lineage和checkpoint机制相互结合,取长补短,再加上Spark各个组件底层本身就是具有高可用性,所以在Spark应用在转化计算的过程中,可是保证数据处理的exactly once。

结果输出 – 写入存储介质的幂等性或事务性

幂等更新

  • 多次写入会产生相同的结果数据,我们可以认为这类写入操作是幂等的;saveAsTextFile 就是一种典型的幂等写入
  • 幂等写入只适用于 Map-only 型的计算流程,即没有 Shuffle、Reduce、Repartition 等操作
  • 对 Kafka DStream 做一些额外设置:
    1. 将 enable.auto.commit 设置为 false。默认情况下,Kafka DStream 会在接收到数据后立刻更新自己的偏移量,我们需要将这个动作推迟到计算完成之后。
    2. 打开 Spark Streaming 的 Checkpoint 特性,用于存放 Kafka 偏移量。但若应用程序代码发生变化,Checkpoint 数据也将无法使用,这就需要改用在数据输出之后手动提交 Kafka 偏移量。HasOffsetRanges 类,以及 commitAsync API 可以做到这一点
      messages.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition { iter =>
          // output to database
        }
        messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }

事务更新

  • 务型写入时,我们需要生成一个唯一 ID,这个 ID 可以使用当前批次的时间、分区号、或是 Kafka 偏移量来生成
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
  • 通常会在 foreachPartition 方法中来执行数据库写入操作。对于 Map-only 流程来说是适用的
  • 这种流程下 Kafka 分区和 RDD 分区是一一对应的,我们可以用以下方式获取各分区的偏移量:
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val offsetRange = offsetRanges(TaskContext.get.partitionId)
  }
}
  • 对于包含 Shuffle 的计算流程(如上文的错误日志统计),我们需要先将处理结果拉取到 Driver 进程中,然后才能执行事务操作;如果偏移量写入失败,或者重复处理了某一部分数据(offset != $fromOffset 判断条件不通过),该事务就会回滚,从而做到 Exactly-once。
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val result = processLogs(rdd).collect() // parse log and count error
  DB.localTx { implicit session =>
    result.foreach { case (time, count) =>
      // save to error_log table
    }
    offsetRanges.foreach { offsetRange =>
      val affectedRows = sql"""
      update kafka_offset set offset = ${offsetRange.untilOffset}
      where topic = ${topic} and `partition` = ${offsetRange.partition}
      and offset = ${offsetRange.fromOffset}
      """.update.apply()

      if (affectedRows != 1) {
        throw new Exception("fail to update offset")
      }
    }
  }
}

exactly once固然是个理想的状态,但其实现成本也是非常高的,在对数据可靠性要求不是很高的场景中,at-least-once甚至丢失少量数据也是可以作为一个选项考虑的

总结

官方已经不再推荐使用Receiver模式

Receiver接收方式

  • 多个Receiver接受数据效率高,但有丢失数据的风险
  • 开启日志(WAL)可防止数据丢失,但写两遍数据效率低。
  • Zookeeper维护offset有重复消费数据可能。
  • 使用高层次的API

Direct直连方式

  • 不使用Receiver,直接到kafka分区中读取数据
  • 不使用日志(WAL)机制
  • Spark自己维护offset
  • 使用低层次的API

git rebase 变基

git-rebase-变基

rebase 变基

          A---B---C    dev
         /
    D---E---F---G    master

git rebase 变基场景描述:

  • 两个分支master、dev,其中dev分支是在master分支上的提交点E拉出的分支
  • 在两个分支合并之前,master分支有了新的提交F、G
  • 此时合并dev分支到master分支是不被允许的
  • 因为git不知道怎么处理ABC与FG的关系了,会提醒你需要先在本地rebase

变基: 简单说就是修改dev分支的基础节点由E变到G

通俗解释:

rebase,变基,可以直接理解为改变基底。dev分支是基于master分支的E拉出来的分支,dev的基底是E。而master在E之后有新的提交,就相当于此时要用master上新的提交来作为dev的新基底。实际操作为把E之后dev的提交存下来,然后删掉原来这些提交,再找到master的最新提交位置,把存下来的提交再接上去(新节点新commit id),如此dev分支的基底就相当于变成了G而不是原来的E了

具体操作

由于master是主分支,所以我们都是去改变其它分支的基,故先切到需要变基的分支

无冲突rebase

    # 没有冲突的情况下,依次执行如下步骤即可
    git checkout dev

    git rebase master -i   (-i表示交互模式)

    git push -f     (因为修改了commit记录,需要强push)

有冲突rebase

    git checkout dev

    git rebase master -i   (-i表示交互模式)
    # 有冲突时,到这一步已经开始报错,这时需要手动解决冲突才可以
    git rebase --continue
    # 如果没有冲突,则说明已经可以继续后一步了,但是如果还有冲突,则需要不断处理冲突并执行这一步,知道没有冲突报错为止
    git push -f     (因为修改了commit记录,需要强push)

git rebase 产生冲突的处理方式

  1. git rebase –skip

抛弃本地的 commit,采用远程的 commit。慎用:因为你本地的修改都会失去。

  1. git rebase –abort

效果是:终止这次 rebase 操作

  1. git rebase –continue

手动处理冲突的文件:执行git add .,再 git rebase –continue,反复操作直到解决完所有冲突,并合并到分支上。

大部分公司其实会禁用rebase,不管是拉代码还是push代码统一都使用merge,虽然会多出无意义的一条提交记录“Merge … to …”,但至少能清楚地知道主线上谁合了的代码以及他们合代码的时间先后顺序