docker compose 入门随笔

some 配置文件

Docker Compose是Docker编排服务的一部分,Compose可以让用户在集群中部署分布式应用。 Docker Compose是一个属于“应用层”的服务,用户可以定义哪个容器组运行哪个应用,它支持动态改变应用,并在需要时扩展。
Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务。

.dockerignore

.dockerignore 文件的作用类似于 git 工程中的 .gitignore 。不同的是 .dockerignore 应用于 docker 镜像的构建,它存在于 docker 构建上下文的根目录,用来排除不需要上传到 docker 服务端的文件或目录。

docker 在构建镜像时首先从构建上下文找有没有 .dockerignore 文件,如果有的话则在上传上下文到 docker 服务端时忽略掉 .dockerignore 里面的文件列表

  • 每行为一个条目;
  • 以 # 开头的行为注释;
  • 空行被忽略;
  • 构建上下文路径为所有文件的根路径;
规则 含义
*/temp* 匹配根路径下一级目录下所有以 temp 开头的文件或目录
*/*/temp* row 2 col 2
temp? 匹配根路径下以 temp 开头,任意一个字符结尾的文件或目录
**/*.go 匹配所有路径下以 .go 结尾的文件或目录,即递归搜索所有路径
*.md !README.md 匹配根路径下所有以 .md 结尾的文件或目录,但 README.md 除外

两个匹配语法规则有包含或者重叠关系,那么以后面的匹配规则为准

.readthedocs.yaml

.env

The .env file feature only works when you use the docker-compose up command and does not work with docker stack deploy.

  1. Compose支持在名为.env的环境文件中声明默认环境变量,该文件放置在执行docker-compose命令的文件夹(当前工作目录)中即可生效
  2. 可以将不同的.env放置在不同的目录下,在哪个目录执行docker-compose命令,哪个目录下的.env文件就会生效。
  3. 定义在.env中的环境变量主要用于Compose配置文件中的变量替换,特别是当多个Compose配置文件都用到同一个环境变量时,可以将该环境变量定义在.env中,当环境变量的值改变了也不必修改所有Compose配置文件,只需修改.env文件中的环境变量值即可,很方便维护。
  4. 也可以用于定义以下Docker Compose CLI环境变量
  5. env文件中的每一行都采用VAR=VAL格式。
  6. 以#开头的行将作为注释处理并被忽略。
  7. 忽略空白行。
  8. 引号不会被特殊处理,意味着它们是VAL的一部分。
  9. .env文件中定义的环境变量在容器内部不会自动显示

.pre-commit-config.yaml

在提交代码审查之前,Git钩子脚本可用于识别简单问题。我们在每次提交时运行我们的钩子,以自动指出代码中的问题,例如缺少分号、尾随空格和调试语句。通过在代码审查之前指出这些问题,这允许代码审查者专注于变更的体系结构,同时不会浪费时间与琐碎的样式挑剔。

.prettierignore

https://prettier.io/docs/en/index.html

代码自动格式化

.editorconfig

. EditorConfig helps maintain consistent coding styles for multiple developers working on the same project across various editors and IDEs. The EditorConfig project consists of a file format for defining coding styles and a collection of text editor plugins that enable editors to read the file format and adhere to defined styles. EditorConfig files are easily readable and they work nicely with version control systems.
. https://editorconfig.org/#file-location

  • EditorConfig 有助于为跨各种编辑器和 IDE 处理同一项目的多个开发人员保持一致的编码风格,例如下边的demo,Python 和 JavaScript 文件设置行尾和缩进样式的示例文件
  • 文件放在项目的跟目录下,很多的ide都默认支持,不默认支持的也有对应的插件
# EditorConfig is awesome: https://EditorConfig.org

# top-most EditorConfig file
root = true

# Unix-style newlines with a newline ending every file
[*]
end_of_line = lf
insert_final_newline = true

# Matches multiple files with brace expansion notation
# Set default charset
[*.{js,py}]
charset = utf-8

# 4 space indentation
[*.py]
indent_style = space
indent_size = 4

# Tab indentation (no size specified)
[Makefile]
indent_style = tab

# Indentation override for all JS under lib directory
[lib/**.js]
indent_style = space
indent_size = 2

# Matches the exact files either package.json or .travis.yml
[{package.json,.travis.yml}]
indent_style = space
indent_size = 2

配置参数说明

version: '3.7' # 指定 compose 文件的版本
services: # 定义所有的 service 信息, services 下面的第一级别的 key 既是一个 service 的名称
  aaa: #服务aaa
    build: # 与image二选一,指定包含构建上下文的路径, 或作为一个对象,该对象具有 context 和指定的 dockerfile 文件以及 args 参数值
      context: .               # context: 指定 Dockerfile 文件所在的路径
      dockerfile: Dockerfile            # dockerfile: 指定 context 指定的目录下面的 Dockerfile 的名称(默认为 Dockerfile)
      args: # args: Dockerfile 在 build 过程中需要的参数 (等同于 docker container build --build-arg 的作用)
        JAR_FILE: service.jar
      cache_from:            # v3.2中新增的参数, 指定缓存的镜像列表 (等同于 docker container build --cache_from 的作用)
      labels:                # v3.3中新增的参数, 设置镜像的元数据 (等同于 docker container build --labels 的作用)
      shm_size:              # v3.5中新增的参数, 设置容器 /dev/shm 分区的大小 (等同于 docker container build --shm-size 的作用)
    #ports:
    #  - "80:80"
    #  - "443:443"
    ports: # 建立宿主机与容器间的端口映射关系,上面是短语法写法,下面是长语法写法
      - target: 80     # 容器端口
        published: 80  # 宿主机端口
        protocol: tcp  # 协议类型
        mode: host     # host在每个节点上发布主机端口,ingress 对于集群模式端口进行负载均衡
      - target: 443
        published: 443
        protocol: tcp
        mode: host
    command:               # 覆盖容器启动后默认执行的命令, 支持 shell 格式和 [] 格式
    configs:               # 不知道怎么用
    cgroup_parent:         # 为容器指定父 cgroup 组,意味着将继承该组的资源限制。
    container_name:        # 指定容器的名称 (等同于 docker run --name 的作用)
    deploy: # v3 版本以上, 指定与部署和运行服务相关的配置, deploy 部分是 docker stack 使用的, docker stack 依赖 docker swarm
      endpoint_mode: vip      # v3.3 版本中新增的功能, 指定服务暴露的方式
      #      vip                  # Docker 为该服务分配了一个虚拟 IP(VIP), 作为客户端的访问服务的地址
      #      dnsrr               # DNS轮询, Docker 为该服务设置 DNS 条目, 使得服务名称的 DNS 查询返回一个 IP 地址列表, 客户端直接访问其中的一个地址
      labels:                # 指定服务的标签,这些标签仅在服务上设置
      mode: replicated                  # 指定 deploy 的模式
      #      global              # 每个集群节点都只有一个容器
      #      replicated           # 用户可以指定集群中容器的数量(默认)
      placement:
        constraints:
          - node.role==manager             # 不知道怎么用
      replicas: 1              # deploy 的 mode 为 replicated 时, 指定容器副本的数量
      resources: # 资源限制
        limits: # 设置容器的资源限制
          cpus: "0.5"           # 设置该容器最多只能使用 50% 的 CPU
          memory: 50M           # 设置该容器最多只能使用 50M 的内存空间
        reservations: # 设置为容器预留的系统资源(随时可用)
          cpus: "0.2"           # 为该容器保留 20% 的 CPU
          memory: 20M           # 为该容器保留 20M 的内存空间
      restart_policy: # 定义容器重启策略, 用于代替 restart 参数
        condition: on-failure             # 定义容器重启策略(接受三个参数)
        #          none:                    # 不尝试重启
        #          on-failure:              # 只有当容器内部应用程序出现问题才会重启
        #          any  :                   # 无论如何都会尝试重启(默认)
        delay: 10s                  # 尝试重启的间隔时间(默认为 0s)
        max_attempts: 6           # 尝试重启次数(默认一直尝试重启)
        window: 120s              # 检查重启是否成功之前的等待时间(即如果容器启动了, 隔多少秒之后去检测容器是否正常, 默认 0s)
      update_config: # 用于配置滚动更新配置
        parallelism: 1          # 一次性更新的容器数量
        delay: 10s                 # 更新一组容器之间的间隔时间
        order: stop-first                 # v3.4 版本中新增的参数, 回滚期间的操作顺序
        #      stop-first            #旧任务在启动新任务之前停止(默认)
        #      start-first           #首先启动新任务, 并且正在运行的任务暂时重叠
        failure_action: continue       # 定义更新失败的策略
        #      continue              # 继续更新
        #      rollback              # 回滚更新
        #      pause                 # 暂停更新(默认)
        #      monitor               # 每次更新后的持续时间以监视更新是否失败(单位: ns|us|ms|s|m|h) (默认为0)
        max_failure_ratio: 0     # 回滚期间容忍的失败率(默认值为0)
      rollback_config: # v3.7 版本中新增的参数, 用于定义在 update_config 更新失败的回滚策略
        parallelism: 1         # 一次回滚的容器数, 如果设置为0, 则所有容器同时回滚
        delay: 0           # 每个组回滚之间的时间间隔(默认为0)
        failure_action: continue # 定义回滚失败的策略
        #          continue             # 继续回滚
        #          pause             # 暂停回滚
        monitor: 10s           # 每次回滚任务后的持续时间以监视失败(单位: ns|us|ms|s|m|h) (默认为0)
        max_failure_ratio: 0     # 回滚期间容忍的失败率(默认值0)
        order: stop-first            # 回滚期间的操作顺序
    #          stop-first            # 旧任务在启动新任务之前停止(默认)
    #          start-first            # 首先启动新任务, 并且正在运行的任务暂时重叠
    devices:             # 指定设备映射列表 (等同于 docker run --device 的作用)
    depends_on: #依赖容器
      - db
      - redis
    dns:                 # 设置 DNS 地址(等同于 docker run --dns 的作用)
    dns_search:           # 设置 DNS 搜索域(等同于 docker run --dns-search 的作用)
    tmpfs:               # v2 版本以上, 挂载目录到容器中, 作为容器的临时文件系统(等同于 docker run --tmpfs 的作用, 在使用 swarm 部署时将忽略该选项)
    entrypoint:          # 覆盖容器的默认 entrypoint 指令 (等同于 docker run --entrypoint 的作用)
    env_file: # 从指定文件中读取变量设置为容器中的环境变量, 可以是单个值或者一个文件列表, 如果多个文件中的变量重名则后面的变量覆盖前面的变量, environment 的值覆盖 env_file 的值
      RACK_ENV=development
    volumes: #  定义容器和宿主机的数据卷映射关系
      - "/u01:/u01"   # 映射容器内的 /u01 到宿主机的 /u01目录
    environment: # 设置环境变量, environment 的值可以覆盖 env_file 的值 (等同于 docker run --env 的作用)
      - TZ=Asia/Shanghai
      - PORT_TO_EXPOSE=80
      - LOG_PATH=/opt/proj/logs
      - PROFILES_ACTIVE=prod

    expose:              # 暴露端口, 但是不能和宿主机建立映射关系, 类似于 Dockerfile 的 EXPOSE 指令
    external_links:      # 连接不在 docker-compose.yml 中定义的容器或者不在 compose 管理的容器(docker run 启动的容器, 在 v3 版本中使用 swarm 部署时将忽略该选项)
    extra_hosts:         # 添加 host 记录到容器中的 /etc/hosts 中 (等同于 docker run --add-host 的作用)
    healthcheck: # v2.1 以上版本, 定义容器健康状态检查, 类似于 Dockerfile 的 HEALTHCHECK 指令
      test: NONE # 检查容器检查状态的命令, 该选项必须是一个字符串或者列表, 第一项必须是 NONE, CMD 或 CMD-SHELL, 如果其是一个字符串则相当于 CMD-SHELL 加该字符串
      #        NONE                  # 禁用容器的健康状态检测
      #        CMD                   # test: ["CMD", "curl", "-f", "http://localhost"]
      #        CMD-SHELL             # test: ["CMD-SHELL", "curl -f http://localhost || exit 1"] 或者 test: curl -f https://localhost || exit 1
      interval: 1m30s       # 每次检查之间的间隔时间
      timeout: 10s          # 运行命令的超时时间
      retries: 3            # 重试次数
      start_period: 40s     # v3.4 以上新增的选项, 定义容器启动时间间隔
      disable: true         # true 或 false, 表示是否禁用健康状态检测和 test: NONE 相同
    image:              # 指定 docker 镜像, 可以是远程仓库镜像、本地镜像
    init:               # v3.7 中新增的参数, true 或 false 表示是否在容器中运行一个 init, 它接收信号并传递给进程
    isolation:            # 隔离容器技术, 在 Linux 中仅支持 default 值
    labels:             # 使用 Docker 标签将元数据添加到容器, 与 Dockerfile 中的 LABELS 类似
    links:              # 链接到其它服务中的容器, 该选项是 docker 历史遗留的选项, 目前已被用户自定义网络名称空间取代, 最终有可能被废弃 (在使用 swarm 部署时将忽略该选项)
    logging: # 设置容器日志服务
      driver:              # 指定日志记录驱动程序, 默认 json-file (等同于 docker run --log-driver 的作用)
      options: # 指定日志的相关参数 (等同于 docker run --log-opt 的作用)
        max-size:             # 设置单个日志文件的大小, 当到达这个值后会进行日志滚动操作
        max-file:            # 日志文件保留的数量
    network_mode:       # 指定网络模式 (等同于 docker run --net 的作用, 在使用 swarm 部署时将忽略该选项)
networks: # 将容器加入指定网络 (等同于 docker network connect 的作用), networks 可以位于 compose 文件顶级键和 services 键的二级键
  aliases:              # 同一网络上的容器可以使用服务名称或别名连接到其中一个服务的容器
  ipv4_address          # IP V4 格式
  ipv6_address          # IP V6 格式

实例配置1

version: '3.9'
services:
  pms-tsssd-service:
    build:
      context: .
      args:
        JAR_FILE: servsssisce.jar
      dockerfile: Dockerfile
    image: pms-td-ssssservice:1.0.0
    restart: always
    ports:
      - "9199:80"
    environment:
      - TZ=Asia/Shanghai
      - PORT_TO_EXPOSE=80
      - LOG_PATH=/opt/proj/logs
      - PROFILES_ACTIVE=test
    volumes:
     ## - /opt/pro/pms-td-service.jar:/home/admin/service.jar
      - /Users/yuan/Desktop/nc/pms-td-service.jar:/home/admin/service.jar
      - /Users/yuan/Desktop/nc/logs:/opt/proj/logs
     # - /opt/pe/data/logs:/opt/proj/logs
    deploy:
      mode: replicated
      replicas: 1
      placement:
        constraints:
          - node.role==manager
      resources:
        limits:
          cpus: '2'
          memory: 4096M
        # reservations:
        #   cpus: '2'
        #   memory: 1024M
      update_config:
        parallelism: 1
        delay: 10s
        order: stop-first
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 6
        window: 120s
# networks:
#   host:
#     external: true
#   default:
#     external:
#       name: pldds_nctransfer

实例配置2


version: '3.7'   # 版本必须3.0及以上,否则无法使用docker-stack 运行
services: # 定义 service 信息
  emcs-nginx:
    image: "***/nginx:test"   # 使用指定的 docker 镜像
    #ports:
    #  - "80:80"
    #  - "443:443"
    ports: # 建立宿主机与容器间的端口映射关系,上面是短语法写法,下面是长语法写法
      - target: 80     # 容器端口
        published: 80  # 宿主机端口
        protocol: tcp  # 协议类型
        mode: host     # host在每个节点上发布主机端口,ingress 对于集群模式端口进行负载均衡
      - target: 443
        published: 443
        protocol: tcp
        mode: host
    volumes: #  定义容器和宿主机的数据卷映射关系
      - "/u01:/u01"   # 映射容器内的 /u01 到宿主机的 /u01目录
    environment: # 设置环境变量
      TZ: Asia/Shanghai   #  设置时区为上海
    deploy: # 指定与部署和运行服务相关的配置, deploy部分是docker stack使用的, docker stack依赖docker swarm
      mode: replicated  # 指定模式:global每个集群节点都只有一个容器,
      # replicated用户可以指定集群中容器的数量(默认)
      replicas: 1  # deploy的mode为 replicated 时, 指定容器副本的数量
      update_config: #  用于配置滚动更新的配置
        parallelism: 1 # 一次性更新的容器数量
        delay: 10s     # 更新一组容器之间的间隔10s
        order: stop-first  # 指定回滚期间的操作顺序:stop-first旧任务在启动新任务之前停止(默认),
        # start-first首先启动新任务, 并且正在运行的任务暂时重叠
      restart_policy: # 定义容器重启策略, 用于代替 restart 参数
        condition: on-failure   # 定义容器重启策略(接受三个参数):
        # none不尝试重启;any无论如何都会尝试重启(默认)
        # on-failure只有当容器内部应用程序出现问题才会重启

  emcs-front: #  第二个服务
    image: "***/front"
    volumes:
      - "/u01/log:/u01/log"
    environment:
      TZ: Asia/Shanghai
    deploy:
      mode: replicated
      replicas: 1
      update_config:
        parallelism: 1
        delay: 20s
        order: stop-first
      restart_policy:
        condition: on-failure

Spark ShuffeManager 基本介绍

Shuffle 可以简单理解成数从新洗牌的过程。过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段

Shuffle 可以简单理解成数从新洗牌的过程。过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle

负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。

  1. 在Spark1.2以前,默认的shuffle计算引擎是HashShuffleManager。
  2. 在Spark1.2以后的版本中,默认的ShuffleManager变成SortShuffleManager
  3. SortShuffleManager有两种机制,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于 spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制

HashShuffeManager VS SortShuffeManager

  • HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
  • SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle-read-task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可

HashShuffleManager运行原理:

  1. 未经优化的HashShuffleManager:

    • shuffle-write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子
      (比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算
      法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。

    • 在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

    • 那么每个执行shuffle-write的task,要为下一个stage创建多少个磁盘文件呢?下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。

      比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。

    • shuffle-read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。

      由于shufflewrite的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffleread的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

  2. 优化后的HashShuffleManager:

    • spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。
    • 开启consolidate机制之后,在shuffle-write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPUcore,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
    • 当Executor的CPU-core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。
    • 因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能

      • 假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。
      • 但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:*CPUcore的数量下一个stage的task数量**。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

SortShuffleManager基本原理

  1. 普通运行机制:
    • 在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。
    • 如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。
    • 接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
    • 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
    • 写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
    • 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。
    • 此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。

比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

  1. bypass运行机制:

    bypass运行机制的触发条件如下:

    1. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
    2. 不是聚合类的shuffle算子(比如reduceByKey)。
    • 此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。
    • 当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
    • 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
    • 而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。
    • 也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

spark stages 并行度 shuffle 宽窄依赖 随笔

spark stages 一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。

stage

stage是什么?

Stage:调度阶段

一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。

  • stage是由一组并行的task组成,stage会将一批task用TaskSet来封装,提交给TaskScheduler进行分配,最后发送到Executor执行
  • stage的划分依据就是看是否产生了shuflle(即宽依赖–reduceByKey, groupByKey等算子),遇到一个shuffle操作就划分为前后两 个stage
  • spark job是根据action算子触发的,遇到action算子就会起一个job
  • 同一个Stage内的所有Transformation算子所操作的RDD都是具有相同的Partition数量的

stage划分

关键点是Spark Stage划分依据主要是基于Shuffle

Spark此时就利用了前文提到的依赖关系,调度器从DAG图末端出发,逆向遍历整个依赖关系链(就是从最后一个RDD往前推算),遇到ShuffleDependency(宽依赖关系的一种叫法)就断开,遇到NarrowDependency就将其加入到当前stage。

每个Stage里task的数量由Stage最后一个RDD中的分区数决定。如果Stage要生成Result,则该Stage里的Task都是ResultTask,否则是ShuffleMapTask。

  • ShuffleMapTask的计算结果需要shuffle到下一个Stage,其本质上相当于MapReduce中的mapper
  • ResultTask则相当于MapReduce中的reducer

ShuffleMapTask is a Task to produce a MapStatus (Task[MapStatus]).

ShuffleMapTask is one of the two types of Tasks. When executed, ShuffleMapTask writes the result of executing a serialized task code over the records (of a RDD partition) to the shuffle system and returns a MapStatus (with the BlockManager and estimated size of the result shuffle blocks).

ResultTask[T, U] is a Task that executes a partition processing function on a partition with records (of type T) to produce a result (of type U) that is sent back to the driver.

  1. job的最后一个阶段是由多个ResultTasks组成的,之前的stages由ShuffleMapTasks组成。
  2. ResultTask执行task并将task输出返回给driver Application。
  3. ShuffleMapTask执行task,并将task输出分配给多个bucket(基于task的partitioner个数)。

为什么是从后往前推导?

因为RDD之间是有血缘关系的,后面的RDD依赖前面的RDD,也就是说后面的RDD要等前面的RDD执行完才会执行。 所以从后往前遇到宽依赖就划分为两个stage,shuffle前一个,shuffle后一个。如果整个过程没有产生shuffle那就只会有一个stage。

Stage的调度是由DAG Scheduler完成的。由RDD的有向无环图DAG切分出了Stage的有向无环图DAG

从后往前遍历到最开始执行的Stage执行,如果提交的Stage仍有未完成的父Stage,则Stage需要等待其父Stage执行完才能执行。

spark的作业调度

RDD的操作分为transformation和action两类,真正的作业提交运行发生在action之后,调用action之后会将对原始输入数据的所有transformation操作封装成作业并向集群提交运行:

  1. 由DAGScheduler对RDD之间的依赖性进行分析,通过DAG来分析各个RDD之间的转换依赖关系
  2. 根据DAGScheduler分析得到的RDD依赖关系将Job划分成多个stage
  3. 每个stage会生成一个TaskSet并提交给TaskScheduler,调度权转交给TaskScheduler,由它来负责分发task到worker执行

宽窄依赖

Spark中RDD的粗粒度操作,每一次transformation都会生成一个新的RDD,这样就会建立RDD之间的前后依赖关系,在Spark中,依赖关系被定义为两种类型:宽依赖(Shuffle Dependency)与窄依赖(Narrow Dependency)

  1. 窄依赖,父RDD的分区最多只会被子RDD的一个分区使用
  2. 宽依赖,父RDD的一个分区会被子RDD的多个分区使用(宽依赖指子RDD的每个分区都要依赖于父RDD的所有分区,这是shuffle类操作)

区分宽窄依赖,我们主要从父RDD的Partition流向来看:流向单个RDD就是窄依赖,流向多个RDD就是宽依赖。

  1. 对于窄依赖,子rdd一个分区数据丢失只需要对一个父rdd进行重算,重算利用率100%。
  2. 对于宽依赖,子rdd一个分区数据丢失需要多该分区依赖的所有父rdd分区进行重算,重算利用率低。

并行度

并行度(paralleism):在分布式计算框架中,一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正实现多个任务并行执行,记住,这里是并行,而不是并发,这里我们将整个集群并行执行任务的数量,成为并行度。

spark中的并行度和分区之间是有关系的,rdd的每一个分区都是一个task,然后传送到对应的executor中进行计算。如果资源充足(executor core数=task数)并行度就等于分区数,如果(executor core数< task数)就是并发执行。

spark根据分区数来决定task的个数,而task的个数和executor所拥有的core数来决定着spark的并行度,当task数多余core数时,就会产生并发操作

改变并行度(parallelism)

  1. 设置合理的task数量,至少设置成与spark Application (executor)的总cpu core 数量相同。比如:150个分区,150个task,150个core,差不多每个task同时运行完毕。(官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 ,比如150个cpu core ,基本设置 task数量为 300 ~ 500)
  2. 重新设置RDD的分区数,常见的方法有repartitions 、 coalesce、join、以及一些会产生宽依赖的算子。
  3. 一个stage的并行度由stage的最后一个rdd的分区决定。可以通过spark.default.parallelism可以设置当前stage的并行度

spark shuffle

shuffle是spark中数据重分发的一种机制,以便于在跨分区进行数据的分组。shuffle通常会引起executor
与节点之间的数据复制,这期间会有大量的网络I/O,磁盘I/O和数据的序列化。

  1. 在shuffle内部,单个map tasks的结果被保存在内存中,直到放不下为止。然后,根据目标分区对它们进行
    排序,并将它们写入单个文件。在reduce端,tasks会读取相关的经过排序的数据块。
  2. shuffle还会在磁盘上产生大量的中间文件,这样做是为了当触发重算的时候这些中间文件不用被重新创建。
  3. 垃圾收集可能会发生在很长的一段时间之后,如果应用程序保留了对这些RDD的引用,或者垃圾收集不经常启动的话这
    意味着对于一个运行时长较长的spark作业,它可能会消耗大量的磁盘空间。这些中间文件的存储目录在配置Spark
    Context时由spark.local.dir参数明确指定。