基于 kubernetes 的工作流任务调度工具 argo

知乎 · · 3845 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前言

argo 是一个基于 kubernetes CRD 实现的一个 Workflow(工作流) 工具,基于 kubernetes 的调度能力实现了工作流的控制和任务的运行,
同时提供了一个 UI 来方便我们查看任务的进程和详情等等;因为基于 kubernetes CRD 来进行 Workflow 的配置,
所以我们使用标准的 kubernetes 来进行相关的配置,并可以直接 kubectl 来进行管理;在 argo 配置 Workflow 的实现中,
template 是一个重要的概念,他是我们配置 Workflow 的支撑;

template

template 在 argo 中代表可运行的节点,一共有 6 种分类,分别是 Container, Script, Resource, Suspend, Steps, DAG;

实际运行类 Type

这类型代表了我们运行任务的基本单元

Container Type

该类型定义了我们一个容器运行的基础,会对应于我们在 kubernetes 中产生一个世纪的 pod,
该实体的类型对应于标准 kubernetes 的 Container resource,kubernetes Container 具有的参数我们都可以在其中使用;

简单实例:

name: sleep-n-sec
container:
    image: alpine:latest
    command: [sh, -c]
    args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]

Script Type

该类型支持我们直接在 template 中定义并允许一段脚本,该 template 类型也是基于 Container 的,
不过在 Container 的类型上面添加了一个 Source 字段来表示我们的脚本,我们在 Container 的运行配置中应该注释初始化我们脚本运行的环境;

简单实例:

name: gen-number-list
script:
    image: python:alpine3.6
    command: [python]
    source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

Resource Type

该类型支持我们在 tempalte 中对 kubernetes 中资源操作的能力,我们可以 create, apply, delete, replace
(对应该模板的action字段) k8s 中的资源;并且支持设定相关的成功与失败条件用于判断该 tempalte 的成功与失败;

简单实例:

name: pi-tmpl
resource:                   #indicates that this is a resource template
    action: create            #can be any kubectl action (e.g. create, delete, apply, patch)
    # The successCondition and failureCondition are optional expressions.
    # If failureCondition is true, the step is considered failed.
    # If successCondition is true, the step is considered successful.
    # They use kubernetes label selection syntax and can be applied against any field
    # of the resource (not just labels). Multiple AND conditions can be represented by comma
    # delimited expressions.
    # For more details: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
    successCondition: status.succeeded > 0
    failureCondition: status.failed > 3
    manifest: |               #put your kubernetes spec here
        apiVersion: batch/v1
        kind: Job
        metadata:
            generateName: pi-job-
        spec:
            template:
            metadata:
                name: pi
            spec:
                containers:
                - name: pi
                image: perl
                command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
                restartPolicy: Never
            backoffLimit: 4

Suspend Type

该类型可以对我们的 Workflow 进行挂起操作,当我们运行到该类型的 template 时,我们的 Workflow 将会挂起,
等待我们运行 argo resume {name} Workflow 才会继续运行下去;同时对于挂起操作,
我们也可以直接运行 argo suspend {name} 来挂起某个 Workflow;

简单实例:

name: suspend-test
suspend: {}

控制流类 Type

这类型可以指定我们的控制流,一般在其中会引用 tempalte 来进行基础单元进行工作

Steps Type

该类型可以让我们把 template 组合成一条 pipeline 来进行运行,并且在流程中还支持 tempalte 间参数的传递,条件的判断,递归调用...; 它使用一个二维数组来进行定义,在第一级数组串行运行,在第二级数组并行运行;

简单实例:

name: coinflip
steps:
# flip a coin
- - name: flip-coin
    template: flip-coin
# evaluate the result in parallel
- - name: heads
    template: heads                 # 条件判断, 目前只支持 `==` 与 `!=`
    when: "{{steps.flip-coin.outputs.result}} == heads"
  - name: tails                   
    template: coinflip              # 递归调用
    when: "{{steps.flip-coin.outputs.result}} == tails"  # 条件判断
  - name: heads1
    template: heads

DAG type

该类型可以定义我们的 DAG 类型 workflow, 在其中我们根据 依赖 来进行 DAG 的定制,没有 依赖Task 将会作为 DAG 的 root 首先运行; 但是目前 DAG 中 Task 还不支持相关条件判断表达式来进行条件判断;

简单实例:

name: multiroot
dag:
    tasks:
    - name: A
      template: echo
      arguments:
        parameters: [{name: message, value: A}]
    - name: B
      dependencies:
      template: echo
      arguments:
        parameters: [{name: message, value: B}]
    - name: C
      dependencies: [A]
      template: echo
      arguments:
        parameters: [{name: message, value: C}]
    - name: D
      dependencies: [A, B]
      template: echo
      arguments:
        parameters: [{name: message, value: D}]

inputs

在配置我们的 template 时,我们都可以配置我们的 inputs 参数,他代表了我们该模板的输入参数,在其他模板引用该模板是可以达到参数传递的效果;同时我们可以在我们的 tempalte 配置中引用这些变量;

简单实例:

templates:
  - name: test
    steps:
    - - name: generate-parameter
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "test"
  - name: print-message
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

outputs

outputs 可以让我们获取我们容器运行完毕后的一些相关的信息,他有一个默认的 result 字段,代表容器运行过后程勋运行时输出到控制台的信息;
然后我们也可以配置参数来把运行容器中的某个文件读取出来作为参数;

简单实例:

name: whalesay
container:
  image: docker/whalesay:latest
  command: [sh, -c]
  args: ["echo -n {\"list\": [1,2,3,4,5,6,7,8,9]} > /tmp/hello_world.txt"]  #generate the content of hello_world.txt
outputs:
  parameters:
  - name: hello-param       #name of output parameter
    valueFrom:
      path: /tmp/hello_world.txt    #set the value of hello-param to the contents of this hello-world.txt

然后我们在 steps 或者 DAG 的 tasks 中可以通过 {{steps.{step_name}.outputs.parameters.hello-param}} 或者 {{tasks.{task_name}.outputs.parameters.hello-param}} 来获取任务的输出参数;

argo ui

目前 argo ui 只提供了查看相关状态的功能,同时没有鉴权相关的功能,可以让我们比较实时的看到我们 workflow 的变化;

参考

本文来自:知乎

感谢作者:知乎

查看原文:基于 kubernetes 的工作流任务调度工具 argo

3845 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传