
I am doing local development. I want to run a Spark job locally on my desktop and access files in a storage account from that job.

I don't have the option of using SAS tokens or access keys for my storage account. Is there a way out?

2 Answers

1

I am using Java 11 with these dependencies (the custom token provider below also needs the Azure Identity SDK, which hadoop-azure does not pull in):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>3.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.4.1</version>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <!-- needed for DefaultAzureCredential; any recent release should work -->
    <version>1.13.0</version>
</dependency>

You need to implement the custom token provider below:

package com.jp;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.OffsetDateTime;
import java.util.Date;

public class CustomToken implements org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private String accountName;
    private volatile AccessToken token;

    @Override
    public void initialize(Configuration configuration, String accountName) {
        // ABFS passes in the storage account host name here.
        log.info("Custom token provider initialized. Config: " + configuration + ". AccountName: " + accountName);
        this.accountName = accountName;
    }

    @Override
    public String getAccessToken() {
        // Reuse the cached token until it is within two hours of expiry.
        if (token != null && OffsetDateTime.now().isBefore(token.getExpiresAt().minusHours(2))) {
            return token.getToken();
        } else {
            log.info("Token has expired or has not been set yet. " + token);
            fetchAndSetToken();
            return token.getToken();
        }
    }

    private void fetchAndSetToken() {
        // DefaultAzureCredential tries the usual chain: environment variables,
        // managed identity, Azure CLI login, etc.
        DefaultAzureCredential creds = new DefaultAzureCredentialBuilder()
                .build();
        TokenRequestContext request = new TokenRequestContext();
        request.addScopes("https://" + accountName);
        this.token = creds.getToken(request).block();
        log.info("Token has been set. Expires at: " + token.getExpiresAt() + ". Expired: " + token.isExpired());
    }

    @Override
    public Date getExpiryTime() {
        return new Date(token.getExpiresAt().toInstant().toEpochMilli());
    }
}
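
To sanity-check the credential chain outside Spark, a small standalone run of the provider can confirm that DefaultAzureCredential is able to fetch a token on your machine (for example after az login). This is just a sketch; the object name and the account host value are illustrative placeholders:

import org.apache.hadoop.conf.Configuration
import com.jp.CustomToken

object TokenSmokeTest {
  def main(args: Array[String]): Unit = {
    val provider = new CustomToken()
    // Use the same account host string that ABFS would pass in
    provider.initialize(new Configuration(), "<storage-account>.dfs.core.windows.net")
    provider.getAccessToken() // runs DefaultAzureCredential and caches the token
    println("Token expires at: " + provider.getExpiryTime)
  }
}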

Below is the Spark code:

import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadFromADLS")
      .master("local[*]")
      .getOrCreate()

    spark.conf.set("fs.azure.account.auth.type", "Custom")
    spark.conf.set("fs.azure.account.oauth.provider.type", "com.jp.CustomToken")

    // Define folder path (ADLS Gen2 syntax)
    val folderPath = s"abfss://<container>@<storage-account>.dfs.core.windows.net/path/to/folder"

    // Read files recursively
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("recursiveFileLookup", "true")
      .csv(folderPath)

    // Preview the data
    df.printSchema()
    df.show(10)
  }

}
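
The same wiring can also be expressed with ABFS's account-qualified configuration keys set on the underlying Hadoop configuration, which limits the custom provider to a single storage account instead of applying it globally. A sketch (the object name and the <storage-account>/<container> placeholders are illustrative, as above):

import org.apache.spark.sql.SparkSession

object MainPerAccount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadFromADLS")
      .master("local[*]")
      .getOrCreate()

    // Account-qualified keys: the custom provider applies only to this account
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "Custom")
    hadoopConf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "com.jp.CustomToken")

    val df = spark.read
      .option("header", "true")
      .csv("abfss://<container>@<storage-account>.dfs.core.windows.net/path/to/folder")

    df.show(10)
  }
}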

You need to run the driver JVM with the following args (the --add-opens flags open JDK modules that Spark needs reflective access to):

-XX:+IgnoreUnrecognizedVMOptions
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED

0

Use the Azurite emulator or run it in a Docker container.

2 Comments

How does a Docker container help? Spark still won't be able to access the storage account directly (without using a key).
Azurite has default keys; you can either use it with Docker or install it locally.
