
Google Cloud Composer BigQueryOperator - Get Jobs API HTTPError 404

Answer

Because your BigQuery dataset is located in asia-southeast1, BigQuery created the job in that same location (asia-southeast1) by default. However, the Airflow in your Composer environment tried to get the job's status without specifying the location field.

https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
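
For reference, jobs.get accepts a location parameter, and polling a job created outside the US/EU multi-regions without it is exactly what produces the 404. A minimal sketch of the location-aware call using google-api-python-client (the project ID, job ID, and location below are placeholders, not values from the question):

    # Minimal sketch: poll a BigQuery job with an explicit location.
    # Project/job/location values are placeholders.
    from googleapiclient.discovery import build

    service = build('bigquery', 'v2')
    job = service.jobs().get(
        projectId='my-project',       # placeholder project
        jobId='job_abc123',           # placeholder job ID
        location='asia-southeast1',   # omitting this for a non-US/EU job yields 404
    ).execute()
    print(job['status']['state'])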

This issue has been fixed by my PR, which is merged to master and will be released in v2.0.0. However, the latest Airflow version available on Composer is v1.10.2, so you will need a workaround to make it work.

To work around it, you can extend BigQueryCursor and override the run_with_configuration() function with location support. See https://github.com/apache/airflow/pull/4695/files#diff-ee06f8fcbc476ea65446a30160c2a2b2R1213 to check how to patch it.
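
For a concrete idea of what that patch looks like, here is a hedged sketch (not the exact PR code) of subclassing the 1.10.2 BigQueryBaseCursor so that the job-status poll in run_with_configuration() forwards the dataset location. The class name LocationAwareCursor and the polling details are illustrative:

    # Illustrative sketch of the workaround, assuming Airflow 1.10.2:
    # re-implement run_with_configuration() so jobs.get passes the location.
    import time
    from airflow.contrib.hooks.bigquery_hook import BigQueryBaseCursor

    class LocationAwareCursor(BigQueryBaseCursor):
        def __init__(self, *args, location=None, **kwargs):
            super().__init__(*args, **kwargs)
            self.location = location  # e.g. 'asia-southeast1'

        def run_with_configuration(self, configuration):
            jobs = self.service.jobs()
            # Insert the job; BigQuery creates it in the dataset's location.
            job = jobs.insert(
                projectId=self.project_id,
                body={'configuration': configuration},
            ).execute()
            job_id = job['jobReference']['jobId']
            # Poll for completion, passing location so jobs.get can find the job.
            while True:
                job = jobs.get(
                    projectId=self.project_id,
                    jobId=job_id,
                    location=self.location,
                ).execute()
                if job['status']['state'] == 'DONE':
                    if 'errorResult' in job['status']:
                        raise Exception(job['status']['errorResult'])
                    return job_id
                time.sleep(5)

With something like this in place (wired in via a patched operator or a monkey-patched hook), queries against asia-southeast1 datasets can run until the Composer image moves to a release containing the upstream fix.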

The question

I am trying to run a BigQueryOperator on GCC (Cloud Composer). I have successfully run BigQueryCreateEmptyTableOperator and BigQueryTableDeleteOperator.

Here is the code for my DAG:

import datetime
import os
import logging


from airflow import configuration
from airflow import models
from airflow import DAG
from airflow.operators import email_operator
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_check_operator
from airflow.utils import trigger_rule
from contextlib import suppress
import json
from airflow.operators import python_operator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    'email_on_failure': True,
    'email_on_retry': True,
    'project_id': 'censored',
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

bq_dataset_name = 'test_tf_blocket'
bq_githib_table_id = bq_dataset_name + '.trialtable'

# [START composer_quickstart_schedule]
with models.DAG(
        dag_id='composer_nicholas',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_quickstart_schedule]

    def greeting():
        logging.info('Hello World!')

    hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)

    bq_union_query = bigquery_operator.BigQueryOperator(
        task_id='bq_union_query',
        bql="""
        SELECT * FROM test_tf_blocket.nicholas_union_query;
        """,
        query_params={})

    email_start = email_operator.EmailOperator(
        task_id='email_it',
        to='nicholas@censored.my',
        subject='Sample email',
        html_content="""
        Done.
        """)

    hello_python >> bq_union_query >> email_start

The DAG fails when it hits the BigQueryOperator and returns this error (log):

*** Reading remote log from gs://asia-south1-staging-b017f2bf-bucket/logs/composer_nicholas/bq_union_query/2019-03-21T14:56:45.453098+00:00/30.log.
[2019-03-22 13:12:54,129] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [queued]>
[2019-03-22 13:12:54,167] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [queued]>
[2019-03-22 13:12:54,168] {models.py:1573} INFO -
-------------------------------------------------------------------------------
Starting attempt 30 of 3
-------------------------------------------------------------------------------

[2019-03-22 13:12:54,199] {models.py:1595} INFO - Executing <Task(BigQueryOperator): bq_union_query> on 2019-03-21T14:56:45.453098+00:00
[2019-03-22 13:12:54,200] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run composer_nicholas bq_union_query 2019-03-21T14:56:45.453098+00:00 --job_id 571 --raw -sd DAGS_FOLDER/nicholas_union_query.py --cfg_path /tmp/tmpn1ic1w_6']
[2019-03-22 13:13:06,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:06,400] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2019-03-22 13:13:08,433] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:08,431] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2019-03-22 13:13:08,435] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:08,435] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-03-22 13:13:09,182] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,181] {app.py:51} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2019-03-22 13:13:09,198] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,198] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-03-22 13:13:09,210] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,210] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-03-22 13:13:09,873] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,873] {models.py:271} INFO - Filling up the DagBag from /home/airflow/gcs/dags/nicholas_union_query.py
[2019-03-22 13:13:12,207] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query /usr/local/lib/airflow/airflow/models.py:2412: PendingDeprecationWarning: Invalid arguments were passed to BigQueryOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query *args: ()
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query **kwargs: {'api_resource_config': {'useQueryCache': True, 'jobType': 'QUERY'}}
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   category=PendingDeprecationWarning
[2019-03-22 13:13:12,209] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query /usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py:151: DeprecationWarning: Deprecated parameter `bql` used in Task id: bq_union_query. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
[2019-03-22 13:13:12,210] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   category=DeprecationWarning)
[2019-03-22 13:13:16,838] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:16,838] {cli.py:484} INFO - Running <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [running]> on host airflow-worker-7c9b9c7f86-xwhg5
[2019-03-22 13:13:17,455] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,453] {bigquery_operator.py:159} INFO - Executing:
[2019-03-22 13:13:17,457] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query         SELECT * FROM test_tf_blocket.nicholas_union_query;
[2019-03-22 13:13:17,457] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:17,632] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,632] {gcp_api_base_hook.py:92} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2019-03-22 13:13:17,657] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,656] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2019-03-22 13:13:18,336] {logging_mixin.py:95} WARNING - /usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py:559: DeprecationWarning: Deprecated parameter `bql` used in `BigQueryBaseCursor.run_query` Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow
  category=DeprecationWarning)
[2019-03-22 13:13:18,338] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,336] {logging_mixin.py:95} WARNING - /usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py:559: DeprecationWarning: Deprecated parameter `bql` used in `BigQueryBaseCursor.run_query` Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
[2019-03-22 13:13:18,338] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query   category=DeprecationWarning)
[2019-03-22 13:13:18,360] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,359] {discovery.py:873} INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs?alt=json
[2019-03-22 13:13:18,885] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,884] {discovery.py:873} INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json
[2019-03-22 13:13:20,341] {models.py:1760} ERROR - ('BigQuery job status check failed. Final error was: %s', 404)
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuration
    jobId=self.running_job_id).execute()
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json returned "Not found: Job censored-analytics-censored:job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS">

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execute
    time_partitioning=self.time_partitioning)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_query
    return self.run_with_configuration(configuration)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuration
    err.resp.status)
Exception: ('BigQuery job status check failed. Final error was: %s', 404)
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
(the subtask log repeats the same HttpError 404 traceback shown above, line by line)
[2019-03-22 13:13:20,352] {models.py:1783} INFO - Marking task as UP_FOR_RETRY
(the raw subtask then logs the traceback a second time through the `airflow run` CLI frames: bin/airflow, bin/cli.py run and _run, models.py _run_raw_task, bigquery_operator.py execute, bigquery_hook.py run_query and run_with_configuration, ending with:)
[2019-03-22 13:13:20,427] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Exception: ('BigQuery job status check failed. Final error was: %s', 404)

If I key in another SQL query, for example a delete table, it works. I am doing a select query here for simplicity's sake. The point is, the SQL query itself works, but the DAG fails. It seems the DAG cannot retrieve the query history / job history from BQ. I checked whether the json file exists, and yes it does. Screenshot:

[screenshot: BQ SS]

Initially I thought this was a permissions issue, but I checked, and the service account generated by Cloud Composer has project owner and BQ admin permissions. I have tried searching but cannot seem to find an answer.

Any help is appreciated.
