I'm trying to use the Dataproc submit job operator from Airflow (https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/dataproc/index.html#airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator) to submit a Spark job to a Dataproc cluster. I've given all of the necessary arguments specified in the documentation and I'm getting the error below (complete log in the first edit).

The operator's documentation doesn't list any required field called cluster name, but I've tried adding a cluster name field in every place I could think of in the task and nothing works. I've found similar errors, but none match this exact one.

Edit: Complete error log message

[2025-03-04, 21:37:45 UTC] {taskinstance.py:1938} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 75, in error_remapped_callable
    return callable_(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1161, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1004, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Cluster name is required"
    debug_error_string = "UNKNOWN:Error received from peer ipv4:199.36.153.8:443 {created_time:"2025-03-04T21:37:45.543592704+00:00", grpc_status:3, grpc_message:"Cluster name is required"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 2211, in execute
    job_object = self.hook.submit_job(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 790, in submit_job
    return client.submit_job(
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/cloud/dataproc_v1/services/job_controller/client.py", line 547, in submit_job
    response = rpc(
               ^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 366, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 204, in retry_target
    return target()
           ^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 77, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 Cluster name is required

Edit 2:

Dataproc submit job task:

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

    dp_submit_job = DataprocSubmitJobOperator(
        task_id="dp_submit_job",
        gcp_conn_id='',  # conn id for the Airflow instance
        project_id='',  # GCP project ID
        region='us-east4',
        job={'spark_job': {'main_class': 'WordCount',
                           'jar_file_uris': 'gs://<gcs-bucket-name>/wordcount_2.12-0.1.0-SNAPSHOT.jar'}},
        # jobPlacement={'clusterName': 'cluster-name'},
        request_id='48c07706-feb6-4aaa-9df0-c95ccc1e2b99',  # to ignore duplicate submit-job requests
    )

Rest of the code is just setting up the DAG.


Edit 3:

Fixes suggested by https://stackoverflow.com/users/609290/dazwilkin worked for my original issue.
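
For reference, here's roughly what the working task looks like after that fix: the cluster name goes inside the Job message's placement field (not as a top-level operator argument), and jar_file_uris is a repeated field, so it takes a list. The connection, project, bucket, and cluster names below are placeholders:

    dp_submit_job = DataprocSubmitJobOperator(
        task_id="dp_submit_job",
        gcp_conn_id='<conn-id>',
        project_id='<project-id>',
        region='us-east4',
        job={
            # Job.placement.cluster_name is what the API was asking for
            'placement': {'cluster_name': '<cluster-name>'},
            'spark_job': {
                'main_class': 'WordCount',
                'jar_file_uris': ['gs://<gcs-bucket-name>/wordcount_2.12-0.1.0-SNAPSHOT.jar'],
            },
        },
        request_id='48c07706-feb6-4aaa-9df0-c95ccc1e2b99',
    )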

Though when I was trying to create a cluster to submit and run the Spark job using the DataprocCreateClusterOperator, the operator was unable to use my specified gcp_conn_id to access the project. I verified the connection ID's validity by running other operators with the same ID, and also verified that the associated service account has the necessary roles and permissions. I worked around this by using an alternate create-cluster operator specific to my org, but I'm facing the same issue with the DataprocCreateBatchOperator. Adding the DAG code and the error logs below.

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator  # type: ignore # noqa: I001

    dp_create_batch = DataprocCreateBatchOperator(
        task_id="dp_create_batch",
        gcp_conn_id='<conn-id>',
        project_id='<project-id>',
        region='us-east4',
        batch={
            'spark_batch': {
                'main_class': 'WordCount',
                'jar_file_uris': ['gs://<bucket-name>/wordcount_2.12-0.1.0-SNAPSHOT.jar'],
            },
        },
        batch_id='testingbatchoperator',
    )

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 75, in error_remapped_callable
    return callable_(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1161, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1004, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.NOT_FOUND
    details = "The resource 'projects/<project-name>/global/networks/default' was not found"
    debug_error_string = "UNKNOWN:Error received from peer ipv4:199.36.153.8:443 {grpc_message:"The resource \'projects/<project-name>/global/networks/default\' was not found", grpc_status:5, created_time:"2025-03-10T19:18:21.022463126+00:00"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 2522, in execute
    self.operation = hook.create_batch(
                     ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 863, in create_batch
    result = client.create_batch(
             ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/cloud/dataproc_v1/services/batch_controller/client.py", line 601, in create_batch
    response = rpc(
               ^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 77, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.NotFound: 404 The resource 'projects/<project-name>/global/networks/default' was not found
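
From the error, it looks like the batch falls back to the project's default VPC network when no network is specified, and that network doesn't exist in my project. My guess is that pointing the batch at an existing subnet via the batch's environment_config would avoid this; a sketch of what I plan to try (the subnet URI is a placeholder):

    dp_create_batch = DataprocCreateBatchOperator(
        task_id="dp_create_batch",
        gcp_conn_id='<conn-id>',
        project_id='<project-id>',
        region='us-east4',
        batch={
            'spark_batch': {
                'main_class': 'WordCount',
                'jar_file_uris': ['gs://<bucket-name>/wordcount_2.12-0.1.0-SNAPSHOT.jar'],
            },
            # Run on an existing subnet instead of the missing 'default' network;
            # the subnet URI below is a placeholder.
            'environment_config': {
                'execution_config': {
                    'subnetwork_uri': 'projects/<project-id>/regions/us-east4/subnetworks/<subnet-name>',
                },
            },
        },
        batch_id='testingbatchoperator',
    )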

Comments:
  • See Job and placement (JobPlacement) which expects clusterName Commented Mar 4 at 22:20
  • Please don't use images of text in questions when it's trivial to copy-paste the text directly. Text can be useful when responding to questions. Images may not outlast questions. Commented Mar 4 at 22:21
  • Tried that. The operator will not accept anything other than Job. I've tried adding clusterName in every way, including inside the Job dict. It doesn't work, and the error logs aren't specific about where it expects the cluster name. I'll add the entire error message now Commented Mar 4 at 22:28
  • Thanks for replacing the image. Please include your code that includes the value of job Commented Mar 5 at 0:22
  • The docs weren't too specific about exactly how the job argument should look, so I've given it as a dict inside a dict from what I understood in cloud.google.com/dataproc/docs/reference/rest/v1/… and cloud.google.com/dataproc/docs/reference/rest/v1/SparkJob Commented Mar 5 at 0:42
