TaskManger与Slots
Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask。为了控制一个worker能接收多少个task,worker通过task slot来进行控制(一个worker至少有一个task slot)。
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个subtask将不需要跟来自其他job的subtask竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。
通过调整task slot的数量,允许用户定义subtask之间如何互相隔离。如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的),而一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载。
默认情况下,Flink允许子任务共享slot,即使它们是不同任务的子任务(前提是它们来自同一个job)。 这样的结果是,一个slot可以保存作业的整个管道。
Task Slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。
也就是说,假设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以接收3个task,一共9个TaskSlot,如果我们设置parallelism.default=1,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。
flink 中并行任务的分配
- Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个 subtask
- 为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)
slot 主要隔离内存,cpu 是slot之间共享的。也就是说4核的机器 ,内存足够,可以把slot设置为8。最多能同时运行8个任务。建议一个核心数分配一个slot
这种图中 source、map 合成的task的并行度为6
keyby 、window、apply合成的task的并行度为6
sink的并行度为1
总共有13个task
但是不是需要13个slot才能满足这个并行度的要求
不同的算子操作复杂度不同
我们可以称像source map sink 这种 计算不复杂的算子称为非资源密集型的算子 aggregate reduce sum window 这种计算复杂的算子称为为资源密集型的算子
如果把这两种算子的优先级看作相同,平等的分配到slo中,当数据流source 来的数据速率相同时,会造成有些slot一直在跑复杂的算子,一直在运行中,当时一直跑简单算子的slot就会很空闲。
flink 这里是 非资源密集型的 算子和资源密集型的算子可以分配到同一个slot中 ,这样所有的slot之间任务就会平等,不会存在一直空闲一直高负载。
一个task的并行度是6 就会分为6个并行的task来跑,这六个task不能分配到同一个slot中必须一个slot只有一个。 也就是说 当你的集群的slot只有6 ,你不能设置算子的 并行度超过6。
flink 也能做到把非资源密集型和资源密集型的算子分到不同的slot中 这里需要设置共享组,非资源 密集型 的算子在一个共享组,资源密集 型的算子在一个共享组,这样这两种算子就不会共享的使用slot。默认情况下算有算子都属于同一个共享组,共享所有slot。
默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务但是可以分配到同一个slot上。 这样的结果是,一个 slot 可以保存多个作业的整个管道
Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力 。
下面看几个例子
并行可以分为两个方面
- 数据并行
source 并行拉数据 map 并行处理数据
- 计算并行
source 在拉新数据,map 在处理source 之前拉的数据
两个 job 的并行执行
一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。
一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度
idea里运行flink程序默认并行度是运行程序机器的核心数量。
每一个算子都可以单独设置并行。
.map((_, 1)).setParallelism(2)
也可以全局指定并行度。
val env = ExecutionEnvironment.getExecutionEnvironment.setParallelism(2)
此时不支持并行的算子 比如env.readTextFile(inputpath) 就会报错
具体情况调整source和sink的并行度
val env = ExecutionEnvironment.getExecutionEnvironment.setParallelism(2) 此时不支持并行的算子 比如env.readTextFile(inputpath) 就会报错 具体情况调整source和sink的并行度
三个位置可以配置并行度
- flink配置文件中
- 代码里
- flink任务提交时
优先级
代码>提交>配置文件
代码里设置用代码里的,代码里没设置用提交时设置的,都没设置用配置文件中的配置。
代码里算子单独设置优先级高于全局设置优先级
可以设置共享组 把 task 尽量均匀的分配到整个集群中
任务链
合理的设置并行度
- 减少本地通信的开销
- 减少序列化和反序列化
把多个算子合并为一个task,原本的算子成为里面的subtask
满足任务链需要一下条件
- 算子具有相同并行度(具有相同的分区数)
- 算子属于one-to-one
one-to-one :stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。
Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。
并行度不同的算子之前传递数据会进行重分区,Redistributing类型的算子也会进行重分区。
例子
配置文件中默认并行度设置为2 ,提交代码是并行度设置为2
socket source 并行度只能是1
flatmap fliter map 并行度都是2 且属于one-to-one 合成任务链
keyby 属于redistrubuting hash 重分区
sum print 并行度为2 属于one-to-one
执行图如下
当然还可以禁止掉合成任务链
单个算子不参与合成任务链
.flatMap(_.split(" ")).disableChaining()
从单个算子开启一个新的任务链
.startNewChain()
全局不合成任务链
env.disableOperatorChaining()
下面是一个全局不合成任务链的job执行图,只是在上一个例子的基础上添加了全局不合成任务链。
算子设置并行度
- source 文件保证数顺序需要并行度为 1
- sink 只输出到一个文件需要并行度为 1
- socketsource 并行度只能为1