Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1112,9 +1112,11 @@ public int getNumLeaseThreads() {
}

public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS
// we do not support creating the filesystem when AuthType is SAS or UserboundSASWithOAuth
return this.createRemoteFileSystemDuringInitialization
&& this.getAuthType(this.accountName) != AuthType.SAS;
&& this.getAuthType(this.accountName) != AuthType.SAS
&& this.getAuthType(this.accountName)
!= AuthType.UserboundSASWithOAuth;
}

public boolean getSkipUserGroupMetadataDuringInitialization() {
Expand Down Expand Up @@ -1279,7 +1281,7 @@ public boolean shouldTrackLatency() {

public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
if (authType == AuthType.OAuth) {
if (authType == AuthType.OAuth || authType == AuthType.UserboundSASWithOAuth) {
try {
Class<? extends AccessTokenProvider> tokenProviderClass =
getTokenProviderClass(authType,
Expand Down Expand Up @@ -1474,6 +1476,68 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
}
}

/**
* Returns the SASTokenProvider implementation to be used to generate user-bound SAS token.<br>
* Custom implementation of {@link SASTokenProvider} under th config
* "fs.azure.sas.token.provider.type" needs to be provided.<br>
* @return sasTokenProvider object based on configurations provided
* @throws AzureBlobFileSystemException
*/
public SASTokenProvider getUserBoundSASTokenProvider(AuthType authType) throws AzureBlobFileSystemException {

try {
Class<? extends SASTokenProvider> customSasTokenProviderImplementation =
getTokenProviderClass(authType, FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
null, SASTokenProvider.class);

if (customSasTokenProviderImplementation == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar check for OAuth also needed right?
For UBS, Cx must configure OAuth as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were getting OAuth token provider separately. Added a single method to get both now

throw new SASTokenProviderException(String.format(
"\"%s\" must be set for user-bound SAS auth type.",
FS_AZURE_SAS_TOKEN_PROVIDER_TYPE));
}

SASTokenProvider sasTokenProvider = ReflectionUtils.newInstance(
customSasTokenProviderImplementation, rawConfig);
if (sasTokenProvider == null) {
throw new SASTokenProviderException(String.format(
"Failed to initialize %s", customSasTokenProviderImplementation));
}
LOG.trace("Initializing {}", customSasTokenProviderImplementation.getName());
sasTokenProvider.initialize(rawConfig, accountName);
LOG.trace("{} init complete", customSasTokenProviderImplementation.getName());
return sasTokenProvider;
} catch (SASTokenProviderException e) {
throw e;
} catch (Exception e) {
throw new SASTokenProviderException(
"Unable to load user-bound SAS token provider class: " + e, e);
}
}

/**
* Returns both the AccessTokenProvider and the SASTokenProvider
* when auth type is UserboundSASWithOAuth.
*
* @return Object[] where:
* [0] = AccessTokenProvider
* [1] = SASTokenProvider
* @throws AzureBlobFileSystemException if provider initialization fails
*/
public Object[] getUserBoundSASBothTokenProviders()
throws AzureBlobFileSystemException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME,
AuthType.SharedKey);
if (authType != AuthType.UserboundSASWithOAuth) {
throw new SASTokenProviderException(String.format(
"Invalid auth type: %s is being used, expecting user-bound SAS.",
authType));
}

AccessTokenProvider tokenProvider = getTokenProvider();
SASTokenProvider sasTokenProvider = getUserBoundSASTokenProvider(authType);
return new Object[]{tokenProvider, sasTokenProvider};
}

public EncryptionContextProvider createEncryptionContextProvider() {
try {
String configKey = FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1741,11 +1741,20 @@ private void initializeClient(URI uri, String fileSystemName,
} else if (authType == AuthType.SAS) {
LOG.trace("Fetching SAS Token Provider");
sasTokenProvider = abfsConfiguration.getSASTokenProvider();
} else if (authType == AuthType.UserboundSASWithOAuth) {
LOG.trace("Fetching SAS and OAuth Token Provider for user bound SAS");
AzureADAuthenticator.init(abfsConfiguration);
Object[] providers
= abfsConfiguration.getUserBoundSASBothTokenProviders();
tokenProvider = (AccessTokenProvider) providers[0];
sasTokenProvider = (SASTokenProvider) providers[1];
ExtensionHelper.bind(tokenProvider, uri,
abfsConfiguration.getRawConfiguration());
} else {
LOG.trace("Fetching token provider");
tokenProvider = abfsConfiguration.getTokenProvider();
ExtensionHelper.bind(tokenProvider, uri,
abfsConfiguration.getRawConfiguration());
abfsConfiguration.getRawConfiguration());
}

// Encryption setup
Expand All @@ -1769,16 +1778,11 @@ private void initializeClient(URI uri, String fileSystemName,
}
}

LOG.trace("Initializing AbfsClient for {}", baseUrl);
if (tokenProvider != null) {
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
populateAbfsClientContext());
} else {
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
populateAbfsClientContext());
}
LOG.trace("Initializing AbfsClientHandler for {}", baseUrl);
this.clientHandler = new AbfsClientHandler(baseUrl, creds,
abfsConfiguration,
tokenProvider, sasTokenProvider, encryptionContextProvider,
populateAbfsClientContext());

this.setClient(getClientHandler().getClient());
LOG.trace("AbfsClient init complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public final class AbfsHttpConstants {
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
public static final String APPLICATION_XML = "application/xml";
public static final String APPLICATION_X_WWW_FORM_URLENCODED = "application/x-www-form-urlencoded";
public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1";
public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8";

Expand Down Expand Up @@ -187,7 +188,8 @@ public enum ApiVersion {
DEC_12_2019("2019-12-12"),
APR_10_2021("2021-04-10"),
AUG_03_2023("2023-08-03"),
NOV_04_2024("2024-11-04");
NOV_04_2024("2024-11-04"),
JUL_05_2025("2025-07-05");

private final String xMsApiVersion;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public AbfsBlobClient(final URL baseUrl,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, null,
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
Expand All @@ -201,7 +201,21 @@ public AbfsBlobClient(final URL baseUrl,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
super(baseUrl, sharedKeyCredentials, abfsConfiguration, null, sasTokenProvider,
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
.split(AbfsHttpConstants.COMMA)));
}

public AbfsBlobClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, sasTokenProvider,
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,22 +337,24 @@ private AbfsClient(final URL baseUrl,
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
}

public AbfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext,
final AbfsServiceType abfsServiceType)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
encryptionContextProvider, abfsClientContext, abfsServiceType);
this.tokenProvider = tokenProvider;
}

/**
* Constructs an AbfsClient instance with all authentication and configuration options.
*
* @param baseUrl The base URL for the ABFS endpoint.
* @param sharedKeyCredentials Shared key credentials for authentication.
* @param abfsConfiguration The ABFS configuration.
* @param tokenProvider The access token provider for OAuth authentication.
* @param sasTokenProvider The SAS token provider for SAS authentication.
* @param encryptionContextProvider The encryption context provider.
* @param abfsClientContext The client context
* @param abfsServiceType The ABFS service type (e.g., Blob, DFS).
* @throws IOException if initialization fails.
*/
public AbfsClient(final URL baseUrl,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doc missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we simply combine the 3 constructors to accept both AccessTokenProvider, SASTokenProvider and the caller can set what it has and null as other?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, added

final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext,
Expand All @@ -361,6 +363,7 @@ public AbfsClient(final URL baseUrl,
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
encryptionContextProvider, abfsClientContext, abfsServiceType);
this.sasTokenProvider = sasTokenProvider;
this.tokenProvider = tokenProvider;
}

@Override
Expand Down Expand Up @@ -1157,7 +1160,7 @@ protected String appendSASTokenToQuery(String path,
String cachedSasToken)
throws SASTokenProviderException {
String sasToken = null;
if (this.authType == AuthType.SAS) {
if (this.authType == AuthType.SAS || this.authType == AuthType.UserboundSASWithOAuth) {
try {
LOG.trace("Fetch SAS token for {} on {}", operation, path);
if (cachedSasToken == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,26 @@ public class AbfsClientHandler implements Closeable {
private final AbfsDfsClient dfsAbfsClient;
private final AbfsBlobClient blobAbfsClient;

public AbfsClientHandler(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, tokenProvider, null, encryptionContextProvider,
abfsClientContext);
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, tokenProvider, null, encryptionContextProvider,
abfsClientContext);
initServiceType(abfsConfiguration);
}

/**
* Constructs an AbfsClientHandler instance.
*
* Initializes the default and ingress service types from the provided configuration,
* then creates both DFS and Blob clients using the given params
*
* @param baseUrl the base URL for the file system.
* @param sharedKeyCredentials credentials for shared key authentication.
* @param abfsConfiguration the ABFS configuration.
* @param tokenProvider the access token provider, may be null.
* @param sasTokenProvider the SAS token provider, may be null.
* @param encryptionContextProvider the encryption context provider
* @param abfsClientContext the ABFS client context.
* @throws IOException if client creation or URL conversion fails.
*/
public AbfsClientHandler(final URL baseUrl,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doc missing for the constructor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. We can combine into a single constructor.
But do check if there are any caveats there or a risk of running into NPE

final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
Expand All @@ -73,10 +75,10 @@ public AbfsClientHandler(final URL baseUrl,
// only for default client.
initServiceType(abfsConfiguration);
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsConfiguration, tokenProvider, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsConfiguration, tokenProvider, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
}

Expand Down Expand Up @@ -154,7 +156,15 @@ private AbfsDfsClient createDfsClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
URL dfsUrl = changeUrlFromBlobToDfs(baseUrl);
if (tokenProvider != null) {
if (tokenProvider != null && sasTokenProvider != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from UDS, can there be a case where caller can send both as not null? Makes ure no flow leads to this and also add a test around it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are initialising both the token providers only for UBS auth type. Added a test for token provider expectations (null or non-null) according to the auth type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these conditions can again be removed by creating a single constructor for AbfsDfsClient and AbfsBlobClient that sends both access token provider and sas token provider (like we did for client handler and parent constructors)
But would it be better to have it this way to separate out the logging here?

LOG.debug(
"Creating AbfsDfsClient with both access token provider and SAS token provider using the URL: {}",
dfsUrl);
return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
tokenProvider, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
}
else if (tokenProvider != null) {
LOG.debug("Creating AbfsDfsClient with access token provider using the URL: {}", dfsUrl);
return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
Expand Down Expand Up @@ -188,12 +198,21 @@ private AbfsBlobClient createBlobClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
URL blobUrl = changeUrlFromDfsToBlob(baseUrl);
if (tokenProvider != null) {
if (tokenProvider != null && sasTokenProvider != null) {
LOG.debug(
"Creating AbfsBlobClient with both access token provider and SAS token provider using the URL: {}",
blobUrl);
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
tokenProvider, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
}
else if (tokenProvider != null) {
LOG.debug("Creating AbfsBlobClient with access token provider using the URL: {}", blobUrl);
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
abfsClientContext);
} else {
}
else {
LOG.debug("Creating AbfsBlobClient with SAS token provider using the URL: {}", blobUrl);
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
Expand Down
Loading