I am doing local development and want to run a Spark job locally on my desktop that reads files from an Azure storage account. I don't have the option of using SAS tokens or access keys for the storage account. Is there a way out?
I am using Java 11 with these dependencies:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>3.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.4.1</version>
</dependency>
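On top of those, the token provider below uses the Azure Identity SDK (com.azure.identity), which hadoop-azure does not pull in on its own, so a dependency along these lines is also needed (the version shown is only an example, pick whatever matches your Azure SDK setup):
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <!-- example version only -->
    <version>1.13.2</version>
</dependency>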
The way out is to implement a custom token provider, as below:
package com.jp;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.OffsetDateTime;
import java.util.Date;

// Must be public with a no-arg constructor: the ABFS driver instantiates it reflectively
// from the fs.azure.account.oauth.provider.type setting.
public class CustomToken implements org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee {

    private final Logger log = LoggerFactory.getLogger(getClass());
    private String accountName;
    private volatile AccessToken token;

    @Override
    public void initialize(Configuration configuration, String accountName) {
        // accountName is the full account host, e.g. <storage-account>.dfs.core.windows.net
        log.info("Custom token provider initialized. Config: " + configuration + ". AccountName: " + accountName);
        this.accountName = accountName;
    }

    @Override
    public String getAccessToken() {
        // Reuse the cached token until shortly before it expires; AAD access tokens
        // typically last about an hour, so only a small refresh buffer is needed.
        if (token != null && OffsetDateTime.now().isBefore(token.getExpiresAt().minusMinutes(5))) {
            return token.getToken();
        }
        log.info("Token has expired or has not been set yet: " + token);
        fetchAndSetToken();
        return token.getToken();
    }

    private void fetchAndSetToken() {
        // DefaultAzureCredential tries environment variables, managed identity,
        // Azure CLI login, etc. in turn - no SAS token or account key required.
        DefaultAzureCredential creds = new DefaultAzureCredentialBuilder().build();
        TokenRequestContext request = new TokenRequestContext();
        // Scope for the storage account; some credential types expect a "/.default" suffix.
        request.addScopes("https://" + accountName);
        this.token = creds.getToken(request).block();
        log.info("Token has been set. Expires at: " + token.getExpiresAt());
    }

    @Override
    public Date getExpiryTime() {
        return Date.from(token.getExpiresAt().toInstant());
    }
}
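Before wiring this into Spark, it is worth confirming that DefaultAzureCredential can actually obtain a storage token on the desktop (for example after az login). A minimal standalone check might look like the sketch below; TokenCheck is just an illustrative name, and the generic https://storage.azure.com/.default scope is an assumption that can be swapped for the per-account audience:
package com.jp;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredentialBuilder;

public class TokenCheck {
    public static void main(String[] args) {
        // Request a token for Azure Storage using whatever credential
        // DefaultAzureCredential can find locally (env vars, Azure CLI, ...).
        AccessToken token = new DefaultAzureCredentialBuilder().build()
                .getToken(new TokenRequestContext().addScopes("https://storage.azure.com/.default"))
                .block();
        System.out.println("Got token, expires at " + token.getExpiresAt());
    }
}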
Below is the Spark code (Scala):
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadFromADLS")
      .master("local[*]")
      .getOrCreate()

    // Tell the ABFS driver to use the custom token provider instead of keys/SAS
    spark.conf.set("fs.azure.account.auth.type", "Custom")
    spark.conf.set("fs.azure.account.oauth.provider.type", "com.jp.CustomToken")

    // Define folder path (ADLS Gen2 syntax)
    val folderPath = "abfss://<container>@<storage-account>.dfs.core.windows.net/path/to/folder"

    // Read files recursively
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("recursiveFileLookup", "true")
      .csv(folderPath)

    // Preview the data
    df.printSchema()
    df.show(10)
  }
}
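If these settings should apply to only one storage account (for example when other accounts still use keys), the same keys can be scoped to the account host; this is a variant of the configuration above, with <storage-account> left as a placeholder:
// Account-scoped variant of the two settings above
spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "Custom")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "com.jp.CustomToken")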
The Spark VM args below also need to be used when running locally on Java 11 (for example as VM options in the IDE run configuration):
-XX:+IgnoreUnrecognizedVMOptions
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
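If the job is launched through spark-submit instead of directly from the IDE, the same flags can be passed with --driver-java-options or via the spark.driver.extraJavaOptions configuration.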