
I am relatively new to Python and Airflow and am having a very difficult time getting spark-submit to run in an Airflow task. My goal is to get the following DAG task to run successfully:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'matthew',
    'start_date': datetime(2019, 7, 8)
}

dag = DAG('CustomCreate_test2',
          default_args=default_args,
          schedule_interval=timedelta(days=1))

t3 = BashOperator(
    task_id='run_test',
    bash_command='spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar',
    dag=dag
)

I know the problem lies with Airflow and not with the bash command, because when I run spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar in a terminal it runs successfully.

I have been getting the following error in the Airflow logs:

...
[2019-08-28 15:55:34,750] {bash_operator.py:132} INFO - Command exited with return code 1
[2019-08-28 15:55:34,764] {taskinstance.py:1047} ERROR - Bash command failed
Traceback (most recent call last):
  File "/Users/matcordo2/.virtualenv/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/Users/matcordo2/.virtualenv/airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 136, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
...

I have also tried working with SparkSubmitOperator(...), but have had no successful runs with it; I have only ever ended up with error logs like the following:

...
[2019-08-28 15:54:49,749] {logging_mixin.py:95} INFO - [2019-08-28 15:54:49,749] {spark_submit_hook.py:427} INFO - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[2019-08-28 15:54:49,803] {taskinstance.py:1047} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--num-executors', '2', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'CustomCreate', '--class', 'CLASSPATH.CustomCreate', '--verbose', '--queue', 'root.default', '--deploy-mode', 'cluster', '~/IdeaProjects/custom-create-job/build/libs/custom-create.jar']. Error code is: 1.
...
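For reference, that attempt was along the lines of the task below (reconstructed from the logged arguments; the task_id here is just a placeholder, and as far as I can tell the --master, --deploy-mode and --queue values come from the spark_default connection rather than from the operator itself):

# Roughly the SparkSubmitOperator task behind the log above; it reuses the
# imports and the `dag` object from the DAG file shown earlier.
t2 = SparkSubmitOperator(
    task_id='spark_submit_test',
    application='~/IdeaProjects/custom-create-job/build/libs/custom-create.jar',
    java_class='CLASSPATH.CustomCreate',
    name='CustomCreate',
    num_executors=2,
    total_executor_cores=1,
    executor_cores=1,
    executor_memory='2g',
    driver_memory='1g',
    verbose=True,
    conn_id='spark_default',
    dag=dag
)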

Is there something I have to do using SparkSubmitOperator(...) before I can run the spark-submit ... command in a BashOperator(...) task?

Is there a way to run my spark-submit command directly from the SparkSubmitOperator(...) task?

Is there anything that I have to do to spark_default in the Admin->Connections page of Airflow?

Is there anything that must be set on the Admin->Users page of Airflow? Is there anything that must be set to allow Airflow to run Spark, or to run a jar file created by a specific user? If so, what/how?

  • While I can't pinpoint the problem with your setup (it sounds like an issue with environment variable / binary discoverability in shells like bash / zsh), if you need workarounds, see this. – Commented Aug 29, 2019 at 4:17

3 Answers


I found a workaround that solved this problem.

Create a new SSH connection (or edit the default) like the one below on the Airflow Admin->Connections page:
Conn ID: ssh_connection
Conn Type: SSH
Host: HOST IP ADDRESS
Username: HOST USERNAME
Password: HOST PASSWORD
Port:
Extra: {"key_file": "/PATH TO HOME DIR/airflow/.ssh/id_rsa", "allow_host_key_change": "true", "no_host_key_check": "true"}

Then make the corresponding adjustments to your Python script:

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'matthew',
    'start_date': datetime(2019, 8, 28)
}

dag = DAG('custom-create',
          default_args=default_args,
          schedule_interval=timedelta(days=1),
          params={'project_source': '~/IdeaProjects/custom-create-job',
                  'spark_submit': '/usr/local/bin/spark-submit',
                  'classpath': 'CLASSPATH.CustomCreate',
                  'jar_file': 'build/libs/custom-create.jar'}
          )

templated_bash_command = """
    echo "HOSTNAME: $HOSTNAME"  # to check that you are properly connected to the host
    cd {{ params.project_source }}
    {{ params.spark_submit }} --class {{ params.classpath }} {{ params.jar_file }}
"""

t1 = SSHOperator(
    task_id="SSH_task",
    ssh_conn_id='ssh_connection',
    command=templated_bash_command,
    dag=dag
)

I hope this solution helps other people who run into a similar problem like I did.




A similar question has already been answered - Stack Overflow link

I think the above link will help you.

In the future, if you would like to implement the same thing on AWS EMR or Azure, there is a clean way to schedule Spark jobs there - see the Airflow documentation.

An example for the above (AWS EMR):

import json

from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator

# Load the job-flow configuration from the JSON file shown below.
# cover_open is a custom helper (not part of Airflow); a plain json.load also works.
airflow_emr_task = cover_open(json.load(open(airflow_home + '/<tasks_json_containing_all_spark_configurations>')))
airflow_emr_task['Job']['Name'] = airflow_emr_task['Job']['Name'] + '<optional_postfix>'

airflow_swperformance_cpu_creator = EmrCreateJobFlowOperator(
    task_id='<task_id>',
    job_flow_overrides=airflow_emr_task['Job'],
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    retries=1,
    dag=dag
)
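
If you also want the DAG to wait for the job flow to finish before moving on, you can chain a sensor after it; a rough sketch (the task ids are placeholders, and it assumes the create task above):

from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor

# Sketch: block until the EMR job flow created above reaches a terminal state.
# '<task_id>' is the placeholder id of the create task above.
emr_job_flow_watcher = EmrJobFlowSensor(
    task_id='<task_id>_watcher',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='<task_id>', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

airflow_swperformance_cpu_creator >> emr_job_flow_watcher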

And a simple JSON file would be (the same JSON file as mentioned above):

{
    "Job": {
        "Name": "<task_name>",
        "LogUri": "<task_log_uri>",
        "ReleaseLabel": "emr-5.6.0",
        "Applications": [
            {
                "Name": "Spark"
            },
            {
                "Name": "Hive"
            }
        ],
        "Tags": [
            {
                "Key" : "<any_tag>",
                "Value" : "<any_tag>"
            },
            {
                "Key" : "<any tag>",
                "Value": "<any_tag>"
            },
            {
                "Key" : "<any_tag>",
                "Value": "<any_tag value>"
            }
        ],
        "JobFlowRole": "EMR_EC2_DefaultRole_Stable",
        "ServiceRole": "EMR_DefaultRole",
        "VisibleToAllUsers": true,
        "Configurations": [
            {
                "Classification": "spark-defaults",
                "Properties": {
                    "spark.driver.extraJavaOptions":"-XX:+UseParallelGC -XX:+UseParallelOldGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:+ExitOnOutOfMemoryError -Dlog4j.configuration=log4j-custom.properties",
                    "spark.executor.extraJavaOptions":"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:+UseParallelOldGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:+ExitOnOutOfMemoryError -Dlog4j.configuration=log4j-custom.properties",
                    "spark.scheduler.mode": "FAIR",
                    "spark.eventLog.enabled": "true",
                    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                    "spark.sql.orc.filterPushdown": "true",
                    "spark.dynamicAllocation.enabled": "false"
                },
                "Configurations": []
            },
            {
                "Classification": "spark",
                "Properties": {
                    "maximizeResourceAllocation": "true"
                },
                "Configurations": []
            },
            {
                "Classification": "hive-site",
                "Properties": {
                    "javax.jdo.option.ConnectionUserName": "<HIVE USERNAME IF ANY>",
                    "javax.jdo.option.ConnectionPassword": "<<hive_connection_password>>",
                    "javax.jdo.option.ConnectionURL": "<Hive_URL_IF_ANY"
                },
                "Configurations": []
            },
            {
                "Classification": "emrfs-site",
                "Properties": {
                    "fs.s3.serverSideEncryption.kms.keyId": "<<encryption_key>>",
                    "fs.s3.enableServerSideEncryption": "true"
                },
                "Configurations": []
            },
            {
                "Classification":"spark-env",
                "Configurations":[{
                    "Classification":"export",
                    "Configurations":[],
                    "Properties": {
                        "ANY_ENV_VARIABLE_REQUIRED_FOR_SPECIFIC_JOB",
                        "ANY_ENV_VARIABLE_REQUIRED_FOR_SPECIFIC_JOB",
                        "ANY_ENV_VARIABLE_REQUIRED_FOR_SPECIFIC_JOB",
                        "ANY_ENV_VARIABLE_REQUIRED_FOR_SPECIFIC_JOB",
                        "ANY_ENV_VARIABLE_REQUIRED_FOR_SPECIFIC_JOB"
            "S3_BUCKET_NAME":"<S3_bucekt_naem_if_Required>"
                    }
                }
                ]}
        ],
        "Instances": {
            "Ec2KeyName": "<ssh_key>",
            "KeepJobFlowAliveWhenNoSteps": false,
            "Ec2SubnetId": "<subnet>",
            "EmrManagedSlaveSecurityGroup": "<security_group>",
            "EmrManagedMasterSecurityGroup": "<security_group_parameter>",
            "AdditionalSlaveSecurityGroups": [
                "<self_explanatory>"
            ],
            "AdditionalMasterSecurityGroups": [
                "<self_explanatory>"
            ],
            "InstanceGroups": [
                {
                    "InstanceCount": 4,
                    "InstanceRole": "CORE",
                    "InstanceType": "r3.xlarge",
                    "Name": "Core instance group - 2"
                },
                {
                    "InstanceCount": 1,
                    "InstanceRole": "MASTER",
                    "InstanceType": "r3.xlarge",
                    "Name": "Master instance group - 1"
                }
            ]
        },
        "BootstrapActions": [],
        "Steps": [
            {
                "Name": "download-dependencies",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "aws",
                        "s3",
                        "cp",
                        "<appropriate_s3_location>",
                        "/home/hadoop",
                        "--recursive"
                    ],
                    "Properties": []
                },
                "ActionOnFailure": "TERMINATE_CLUSTER"
            },
            {
                "Name": "run-script",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "sudo",
                        "/bin/sh",
                        "/home/hadoop/pre-executor.sh"
                    ],
                    "Properties": []
                },
                "ActionOnFailure": "TERMINATE_CLUSTER"
            },
            {
                "Name": "spark-submit",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "/home/hadoop/analytics-job.jar",
            "--run-gold-job-only"
                    ],
                    "Properties": []
                },
                "ActionOnFailure": "TERMINATE_CLUSTER"
            }
        ]
    }
}

And that will be all.



Not sure if this is still relevant, but I solved a similar problem simply by using the absolute path to the file (the full path, without the '~'). I'm not sure why; maybe Airflow resolves the home directory differently.
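
For example, instead of passing '~/...' to the operator, expand it first (a small sketch, reusing the jar path and DAG from the question):

import os

from airflow.operators.bash_operator import BashOperator

# Sketch: expand '~' before building the command so the operator receives an
# absolute path and does not depend on how the worker resolves the home directory.
jar_path = os.path.expanduser('~/IdeaProjects/custom-create-job/build/libs/custom-create.jar')

t3 = BashOperator(
    task_id='run_test',
    bash_command='spark-submit --class CLASSPATH.CustomCreate {}'.format(jar_path),
    dag=dag  # the DAG object from the question
)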

