
I am trying to suppress Spark's logging by specifying my own log4j.properties file.

gcloud dataproc jobs submit spark \
--cluster test-dataproc-cluster \
--region europe-north1 \
--files gs://test-spark-logging-bucket/log4j.properties \
--properties spark.sql.legacy.allowUntypedScalaUDF=true,'spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties,spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties' \
--class com.pythian.edp.pm.spark.job.TestSparkJob \
--jars gs://18621ad39476e29c-test-static/spark-jobs/sparkBigQueryConnector/spark-bigquery-assembly-0.11.1-beta-SNAPSHOT.jar,gs://18621ad39476e29c-test-static/spark-jobs/digiSparkPmCmProcessing/digiSparkPmCmProcessing-assembly-0.1.0-SNAPSHOT.jar \
-- --configurationURI gs://18621ad39476e29c-test-static/spark-job-configs/f9fabc4f-162e-40f3-af69-237e4c464c9e-PM-LTE-CELLCQI/ing-15min-cell-1719203688539.yml \
--jobType "ingestion" --dataType "PM" \
--sendPubSubNotification --pubSubProjectId bmas-eu-digi-pipe-uat --pubSubTopicName data-notifications \
--traceDatasets > test_log_ingestion_job_log 2>&1
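
For reference, a log4j 1.x file that quiets Spark down typically looks like the following. This is only an illustrative example; the actual contents of the file in the bucket may differ:

# Route everything at WARN and above to the console.
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Explicitly silence Spark's own INFO chatter.
log4j.logger.org.apache.spark=WARN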

The above command works fine, but I am struggling to do the same thing from Python code. Below is the snippet of the code I am working with.

    job_properties.update(
        {"spark.dynamicAllocation.enabled": "true",
         "spark.dynamicAllocation.minExecutors": "0",
         "spark.dynamicAllocation.maxExecutors": "5",
         "spark.executor.instances": "0",
         "spark.sql.legacy.allowUntypedScalaUDF": "true",
         "spark.executor.extraJavaOptions": "-Dlog4j.configuration=file:log4j.properties",
         "spark.sql.legacy.allowUntypedScalaUDF": "-Dlog4j.configuration=file:log4j.properties"
         })
    # log_file_location='gs://test-spark-logging-bucket/log4j.properties'
    job_details = {
        'placement': {
            'cluster_name': cluster_name,
        },
        'reference': {
            'job_id': job_id,
        },
        'scheduling': {
            'max_failures_per_hour': 1,
        },
        'labels': labels,
        'spark_job': {
            'args': spark_job_arguments,
            'main_class': SPARK_JOB_CLASSNAME,
            'jar_file_uris': [
                os.path.join(self.dataproc_job_jar_file_prefix, file) for file in jar_files
            ],
            'properties': job_properties,
        }
    }
  • Are you using the Dataproc_v1 library to submit the job? Commented Jul 22, 2024 at 11:33

1 Answer

  1. I assume you meant to set spark.driver.extraJavaOptions here (the executor variant is already set on the previous line) instead of this, which repeats a key and silently overwrites the "true" you assigned to spark.sql.legacy.allowUntypedScalaUDF:

"spark.sql.legacy.allowUntypedScalaUDF": "-Dlog4j.configuration=file:log4j.properties"

  2. The log4j.properties file needs to be physically present on the driver/executor nodes of the cluster. In the CLI invocation (gcloud dataproc jobs submit spark) you achieve this with --files gs://test-spark-logging-bucket/log4j.properties. You need to do something similar when submitting from Python; the exact mechanism depends on which API you're using (Airflow/google.cloud/...), so post the rest of your Python code. For the google.cloud.dataproc_v1 case, see the sketch after the code block below.

  3. Have you tried providing logging_config as part of the job details? I'm not sure whether it applies only to the driver (as the name suggests) or to both driver and executors.

job_details = {
    'placement': {
        'cluster_name': cluster_name,
    },
    'reference': {
        'job_id': job_id,
    },
    'scheduling': {
        'max_failures_per_hour': 1,
    },
    'labels': labels,
    'spark_job': {
        'args': spark_job_arguments,
        'main_class': SPARK_JOB_CLASSNAME,
        'jar_file_uris': [
            os.path.join(self.dataproc_job_jar_file_prefix, file) for file in jar_files
        ],
        'properties': job_properties,
        'logging_config': {
            'driver_log_levels': {
                'org.apache': 'OFF'
            }
        }
    }
}
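
If you're submitting through the google.cloud.dataproc_v1 client (as the comment under the question asks), a minimal sketch for point 2 could look like the following. The region, cluster name, bucket, and main class are taken from the question; the project ID and jar URI are placeholders, and the key piece is file_uris, which is the API counterpart of the CLI's --files flag:

from google.cloud import dataproc_v1

project_id = "your-project-id"            # placeholder
region = "europe-north1"                  # from the question
cluster_name = "test-dataproc-cluster"    # from the question

# Job submission requires the regional Dataproc endpoint.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": cluster_name},
    "spark_job": {
        "main_class": "com.pythian.edp.pm.spark.job.TestSparkJob",
        "jar_file_uris": ["gs://your-bucket/your-assembly.jar"],  # placeholder
        # Counterpart of the CLI's --files flag: stages log4j.properties
        # into the working directory of the driver and every executor.
        "file_uris": ["gs://test-spark-logging-bucket/log4j.properties"],
        "properties": {
            "spark.driver.extraJavaOptions": "-Dlog4j.configuration=file:log4j.properties",
            "spark.executor.extraJavaOptions": "-Dlog4j.configuration=file:log4j.properties",
        },
    },
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
print(operation.result())

Note that once file_uris stages the file, the bare file name in -Dlog4j.configuration=file:log4j.properties resolves against the container's working directory, which is exactly where Dataproc places it.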