程序问答   发布时间:2022-06-02  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了气流工人:他们怎么知道该怎么做? + 问题大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决气流工人:他们怎么知道该怎么做? + 问题?

开发过程中遇到气流工人:他们怎么知道该怎么做? + 问题的问题如何解决?下面主要结合日常开发的经验,给出你关于气流工人:他们怎么知道该怎么做? + 问题的解决方法建议,希望对你解决气流工人:他们怎么知道该怎么做? + 问题有所启发或帮助;

我使用 Airflow(在 Cloud Composer 上)已经一年了,我很难弄清楚(Celery)工作人员在收到要执行的任务时如何知道要执行的操作。

据我所知:

  • 我们将一些 DAG 放在 /dags 文件夹中。
  • 调度程序通过循环过程解析 DAG 并将结果保存在元数据数据库中,它还根据依赖关系确定 DAG 中的任务是否必须运行。
  • 如果某些任务必须运行,Executor 会将任务发送到由 Celery 工作人员侦听的队列。
  • 其中一名 Celery 工人获得执行任务并完成工作。

但是 Celery 工人如何知道要执行什么? 我可以看到有一个日志说:

[2021-06-30 12:58:59,814] {standard_task_runner.py:77} INFO - Running: ['airflow','run','dag_to_exec','task_to_exec','2021-06-30T12:57:09+00:00','--@R_601_6186@','2822201','--pool','default_pool','--raw','-sd','DAGS_FolDER/dag_to_exec.py','--cfg_path','/tmp/tmpank91zop']

如果我错了,请纠正我,但是 '-sd','DAGS_FolDER/dag_to_exec.py' 部分是在这里对这个 Airflow 工作人员说“从保存在那里的这个 dag 执行这个任务”吗?所以 Airflow 工作人员也需要解析 DAG 才能理解它,对吗?我说“也”是因为调度程序确实过早地解析了它。

如果您有要共享的链接或要查看的部分源代码以了解这一点,请提前致谢!

解决方法

是的。你有正确的理解。

DAG 由工作人员和调度程序两者解析。调度程序永远不会执行 execute() 定义对象的 BaseOperator 方法。它将解析 DAG 文件,将 DAG 和运算符构建为 Python 对象,并在它们之间建立关系,以便能够知道应该安排什么。

这个解析/创建步骤由每个工作人员在执行任务之前重新执行,以便能够构建“BaseOperator”派生对象(包括依赖项,但对工作人员来说并不重要),选择正确的“任务” (即由 task_id 标识的 BaseOperator 派生对象并运行它的 execute() 方法(也有一些细微差别,例如 pre-executepost-execute 方法也在执行)。

这在从 https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html#workloads 最容易到达的几个地方进行了高级描述

如果你觉得这样的“类/解析”关系的描述不容易找到/理解,我诚挚地邀请你帮助社区并从新人的角度添加这样的描述。如果第一次尝试理解它的人为其他人提供更多背景信息,这总是最好的(作为长期的 Airflow 提交者,我们脑子里有很多假设)。

这实际上非常简单 https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html#workloads - 在底部包含“建议更改”链接,您可以使用它为文档制作 PR(确保您先 fork 气流 repo)。

大佬总结

以上是大佬教程为你收集整理的气流工人:他们怎么知道该怎么做? + 问题全部内容,希望文章能够帮你解决气流工人:他们怎么知道该怎么做? + 问题所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。