Official documentation reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html
1. Tasks
Task: a task is the set of subtasks in one stage that perform the same function, comparable to a TaskSet in Spark.
SubTask: a subtask is the smallest unit of execution in Flink; the number of subtasks equals the parallelism.
2. In the Flink UI, each box in the job graph is one task
How Flink splits operators into tasks (a sketch of triggers (1) and (2) follows this list; (3) and (4) are shown in section 5):
(1) A change in parallelism creates a new task, e.g. parallelism 1 —rebalance—> parallelism 4.
(2) The keyBy operator, whose edge shows as —HASH—>.
(3) startNewChain opens a new chain.
(4) disableChaining pulls the current operator out into its own task.
The number of subtasks equals the parallelism, just as parallelism equals the number of partitions in Spark.
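A minimal sketch of triggers (1) and (2), assuming a socket source (host and port are illustrative, not from the notes):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
env.socketTextStream("localhost", 9999) // the socket source always runs with parallelism 1
  .rebalance                            // parallelism change 1 -> 4: task boundary
  .flatMap(_.split(","))
  .map((_, 1))                          // flatMap and map chain into one task with 4 subtasks
  .keyBy(_._1)                          // hash redistribution: another task boundary
  .sum(1)
  .print()                              // sum and print chain into the third task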
3. Programming model
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/programming-model.html
A Flink program is a DAG:
source -> transformation -> sink
Each operator has one or more operator subtasks.
The number of subtasks of an operator equals its parallelism.
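A sketch of the model, with parallelism set per operator (the data and parallelism values are illustrative):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromElements("a,b", "a,c")              // source: 1 subtask
  .flatMap(_.split(",")).setParallelism(2) // transformation: 2 subtasks
  .map((_, 1)).setParallelism(2)           // transformation: 2 subtasks
  .print().setParallelism(1)               // sink: 1 subtask
env.execute("programming-model-sketch")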
4. Stream patterns
Streams can transport data between two operators in a one-to-one (or forwarding) pattern, or in a redistributing pattern:
(1) one-to-one (or forwarding)
(2) redistributing (keyBy/window, similar to a shuffle in Spark)
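A short sketch of which API calls produce which pattern (the stream contents are illustrative):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val words: DataStream[String] = env.fromElements("a", "b", "c")
words.map(_.toUpperCase) // one-to-one (forwarding): subtask i feeds subtask i
words.keyBy(w => w)      // redistributing: hash-partitioned by key, like a shuffle
words.rebalance          // redistributing: round-robin across downstream subtasks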
5. Operator Chains
(1) Operator chaining (comparable to Spark's pipelining)
Flink chains operator subtasks together into tasks.
Each task is executed by one thread. Chaining operators together into tasks is a useful optimization:
it reduces the overhead of thread-to-thread handover and buffering,
and increases overall throughput while decreasing latency.
For example, flatMap -> filter -> map are chained into one task; with parallelism 4 that task has 4 subtasks.
Each chain runs in a single thread, which avoids thread-to-thread handover and buffering, increases throughput, and reduces latency.
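A sketch of the default chaining (source and operator bodies are illustrative): with a uniform parallelism of 4 and only forward edges, the operators below are chained into a single task with 4 subtasks, one thread each.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
env.socketTextStream("localhost", 9999) // separate task: the socket source has parallelism 1
  .flatMap(_.split(","))
  .filter(_.nonEmpty)
  .map(_.toUpperCase) // flatMap -> filter -> map (-> print) form one chained task
  .print()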
(2) Two operators for controlling the chain
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/
a. Start new chain:
flatMap -> filter -> map were originally one chain, i.e. one task. After starting a new chain at filter,
flatMap | filter->map becomes 2 tasks (the edge between them is no longer chained).
b. Disable chaining: when one operator is computationally very heavy, isolating it into its own task can be better.
flatMap -> filter -> map were originally one chain, i.e. one task.
flatMap | filter | map now becomes 3 tasks (see the sketch below).
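A sketch of both controls on the same pipeline, shown as two alternative branches (source and operator bodies are illustrative):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
val lines = env.socketTextStream("localhost", 9999)

// a. startNewChain: the chain is cut before filter, so the three operators
//    form two tasks: flatMap | filter->map
lines.flatMap(_.split(",")).filter(_.nonEmpty).startNewChain().map(_.toUpperCase).print()

// b. disableChaining: filter chains with nothing, giving three tasks:
//    flatMap | filter | map
lines.flatMap(_.split(",")).filter(_.nonEmpty).disableChaining().map(_.toUpperCase).print()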
6. Task Slots (one TaskManager = one JVM process)
Each worker (TaskManager) is a JVM process,
and may execute one or more subtasks in separate threads.
To control how many tasks a worker accepts, a worker has so called task slots (at least one).
A slot sharing group has a name; the default is "default".
Subtasks of different tasks may run in the same slot (forming a pipeline).
Two parallel subtasks of the same task cannot run in the same slot (they are spread across slots for distributed execution).
The slot sharing group name can be changed in code (a socket source is used here for illustration):
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 9999)
stream
  .flatMap(_.split(","))
  // change the slot sharing group name (default: "default"); if no TaskManager
  // offers a free slot for this group, the job hangs in CREATED and is killed
  // once the slot request times out (around 5 minutes)
  .slotSharingGroup("customName")
  .map((_, 1))
  .keyBy(_._1) // key by the word; in Spark this shuffle splits 2 stages, in Flink it splits tasks
  .sum(1)
  .print()
env.execute("slot-sharing-demo")
7. Getting the execution plan of the whole job
Equivalent to EXPLAIN in MySQL/Phoenix/Hive.
a. Print the execution plan as JSON:
println(env.getExecutionPlan)
b. Paste the printed JSON into the official plan visualizer to draw the DAG:
https://flink.apache.org/visualizer/
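A self-contained sketch (source, host, and port are illustrative):

import org.apache.flink.streaming.api.scala._

object ExplainDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("localhost", 9999)
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()
    // prints the job's plan as JSON; paste it into https://flink.apache.org/visualizer/
    println(env.getExecutionPlan)
  }
}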
8. Where do jars uploaded through the Flink web UI go? (java.io.tmpdir = /tmp)
(1) See the Flink class WebOptions.java and the constant
JOB_MANAGER_WEB_TMPDIR_KEY in ConfigConstants.
The Configuration tab of the Flink UI shows web.tmpdir=/tmp/flink-web-<32-character uuid>.
Run the tree command under /tmp to inspect the uploaded files.
(2) All Flink configuration options are documented at:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
Web UI
web.submit.enable: Enables uploading and starting jobs through the Flink UI (true by default). Please note that even when this is disabled, session clusters still accept jobs through REST requests (HTTP calls). This flag only guards the feature to upload jobs in the UI.
web.upload.dir: The directory where to store uploaded jobs. Only used when web.submit.enable is true.
(3) Configure in flink-conf.yaml:
web.upload.dir: /flink/app/flink-jars
web.tmpdir: /flink/app/flink-jars
Flink looks at web.upload.dir first and falls back to web.tmpdir (see the RestServerEndpointConfiguration class).