I'm trying to use the DataprocSubmitJobOperator from Airflow (https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/dataproc/index.html#airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator) to submit a Spark job to a Dataproc cluster. I've provided all of the required arguments listed in the documentation, but I'm getting the error below.
The operator's documentation does not list any required field called cluster name, yet I've tried adding a cluster name field in every place I could think of in the task and nothing works. I've found questions about similar errors, but none with this exact one.
Edit: Complete error log message
[2025-03-04, 21:37:45 UTC] {taskinstance.py:1938} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 75, in error_remapped_callable
return callable_(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1161, in __call__
return _end_unary_response_blocking(state, call, False, None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1004, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "Cluster name is required"
debug_error_string = "UNKNOWN:Error received from peer ipv4:199.36.153.8:443 {created_time:"2025-03-04T21:37:45.543592704+00:00", grpc_status:3, grpc_message:"Cluster name is required"}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 2211, in execute
job_object = self.hook.submit_job(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 790, in submit_job
return client.submit_job(
^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/cloud/dataproc_v1/services/job_controller/client.py", line 547, in submit_job
response = rpc(
^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
return wrapped_func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 366, in retry_wrapped_func
return retry_target(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 204, in retry_target
return target()
^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 77, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 Cluster name is required
Edit 2:
Dataproc submit job task:
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

dp_submit_job = DataprocSubmitJobOperator(
    task_id="dp_submit_job",
    gcp_conn_id='',  # conn id for the Airflow instance
    project_id='',  # GCP project ID
    region='us-east4',
    job={'spark_job': {'main_class': 'WordCount',
                       'jar_file_uris': ['gs://<gcs-bucket-name>/wordcount_2.12-0.1.0-SNAPSHOT.jar']}},
    # jobPlacement={'clusterName': 'cluster-name'},  # tried this; it is not a valid operator argument
    request_id='48c07706-feb6-4aaa-9df0-c95ccc1e2b99',  # to ignore duplicate submit job requests
)
The rest of the code just sets up the DAG.
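For reference, the fix that resolved the original issue (see Edit 3) was to put the cluster inside the job dict itself, under a placement key, instead of passing it as an operator argument. A minimal sketch, with placeholder connection, project, bucket, and cluster names:

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Sketch only: '<conn-id>', '<project-id>', '<gcs-bucket-name>' and 'my-cluster' are placeholders.
dp_submit_job = DataprocSubmitJobOperator(
    task_id="dp_submit_job",
    gcp_conn_id='<conn-id>',
    project_id='<project-id>',
    region='us-east4',
    job={
        # The Dataproc Job message carries the target cluster in its
        # 'placement' field (a JobPlacement), not as an operator argument.
        'placement': {'cluster_name': 'my-cluster'},
        'spark_job': {
            'main_class': 'WordCount',
            'jar_file_uris': ['gs://<gcs-bucket-name>/wordcount_2.12-0.1.0-SNAPSHOT.jar'],
        },
    },
)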
Edit 3:
The fixes suggested by https://stackoverflow.com/users/609290/dazwilkin resolved my original issue.
However, when I then tried to create a cluster to run the Spark job using the DataprocCreateClusterOperator, the operator was unable to use my specified gcp_conn_id to access the project. I verified the connection ID's validity by running other operators with the same connection, and I confirmed that the associated service account has the necessary roles and permissions. I worked around this by using an alternate create-cluster operator specific to my org, but I'm hitting the same issue with the DataprocCreateBatchOperator. The DAG code and error logs are below.
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator # type: ignore # noqa: I001
dp_create_batch = DataprocCreateBatchOperator(
task_id="dp_create_batch",
gcp_conn_id='<conn-id>',
project_id='<project-id>',
region='us-east4',
batch={
"spark_batch": {
'main_class': 'WordCount',
'jar_file_uris': ['gs://<bucket-name>/wordcount_2.12-0.1.0-SNAPSHOT.jar'],
},
},
batch_id='testingbatchoperator',
)
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 75, in error_remapped_callable
return callable_(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1161, in __call__
return _end_unary_response_blocking(state, call, False, None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1004, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.NOT_FOUND
details = "The resource 'projects/<project-name>/global/networks/default' was not found"
debug_error_string = "UNKNOWN:Error received from peer ipv4:199.36.153.8:443 {grpc_message:"The resource \'projects/<project-name>/global/networks/default\' was not found", grpc_status:5, created_time:"2025-03-10T19:18:21.022463126+00:00"}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 2522, in execute
self.operation = hook.create_batch(
^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 863, in create_batch
result = client.create_batch(
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/cloud/dataproc_v1/services/batch_controller/client.py", line 601, in create_batch
response = rpc(
^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
return wrapped_func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 77, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.NotFound: 404 The resource 'projects/<project-name>/global/networks/default' was not found
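This NOT_FOUND error looks like a networking problem rather than a credentials problem: Dataproc Serverless batches run on the project's default VPC network unless told otherwise, and this project has no network named default. One possible way to address it, sketched below with a hypothetical subnetwork name (the subnetwork must exist in us-east4 and typically needs Private Google Access enabled for Dataproc Serverless), is to point the batch at an existing subnetwork through environment_config:

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

# Sketch only: '<conn-id>', '<project-id>', '<bucket-name>' and 'my-subnet' are placeholders.
dp_create_batch = DataprocCreateBatchOperator(
    task_id="dp_create_batch",
    gcp_conn_id='<conn-id>',
    project_id='<project-id>',
    region='us-east4',
    batch={
        "spark_batch": {
            'main_class': 'WordCount',
            'jar_file_uris': ['gs://<bucket-name>/wordcount_2.12-0.1.0-SNAPSHOT.jar'],
        },
        # Point the batch at an existing subnetwork so it does not fall back
        # to the missing 'default' network.
        "environment_config": {
            "execution_config": {
                "subnetwork_uri": "projects/<project-id>/regions/us-east4/subnetworks/my-subnet",
            },
        },
    },
    batch_id='testingbatchoperator',
)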
(For reference, the root cause of the original error: the Job requires a placement field, a JobPlacement, which expects clusterName.)