Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/pulp-glue/+api_spec_quirks.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The `api_quirk` decorator has been deprecated in favor of the `api_spec_quirk` decorator.
Quirks are now supposed to work on the raw spec before it is parsed by the `openapi` layer.
1 change: 1 addition & 0 deletions CHANGES/pulp-glue/+auth_recontract.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Breaking change: Reworked the contract around the `AuthProvider` to allow authentication to be coded independently of the underlying library.
211 changes: 127 additions & 84 deletions pulp-glue/pulp_glue/common/authentication.py
Original file line number Diff line number Diff line change
@@ -1,97 +1,140 @@
import typing as t
from datetime import datetime, timedelta

import requests


class OAuth2ClientCredentialsAuth(requests.auth.AuthBase):
"""
This implements the OAuth2 ClientCredentials Grant authentication flow.
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4
class AuthProviderBase:
"""
Base class for auth providers.

def __init__(
self,
client_id: str,
client_secret: str,
token_url: str,
scopes: list[str] | None = None,
verify_ssl: str | bool | None = None,
):
self._token_server_auth = requests.auth.HTTPBasicAuth(client_id, client_secret)
self._token_url = token_url
self._scopes = scopes
self._verify_ssl = verify_ssl
This abstract base class will analyze the authentication proposals of the openapi specs.
Different authentication schemes can be implemented in subclasses.
"""

self._access_token: str | None = None
self._expire_at: datetime | None = None
def can_complete_http_basic(self) -> bool:
return False

def can_complete_mutualTLS(self) -> bool:
return False

def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> bool:
return False

def can_complete_scheme(self, scheme: dict[str, t.Any], scopes: list[str]) -> bool:
if scheme["type"] == "http":
if scheme["scheme"] == "basic":
return self.can_complete_http_basic()
elif scheme["type"] == "mutualTLS":
return self.can_complete_mutualTLS()
elif scheme["type"] == "oauth2":
for flow_name, flow in scheme["flows"].items():
if flow_name == "clientCredentials" and self.can_complete_oauth2_client_credentials(
flow["scopes"]
):
return True
return False

def can_complete(
self, proposal: dict[str, list[str]], security_schemes: dict[str, dict[str, t.Any]]
) -> bool:
for name, scopes in proposal.items():
scheme = security_schemes.get(name)
if scheme is None or not self.can_complete_scheme(scheme, scopes):
return False
# This covers the case where `[]` allows for no auth at all.
return True

async def auth_success_hook(
self, proposal: dict[str, list[str]], security_schemes: dict[str, dict[str, t.Any]]
) -> None:
pass

async def auth_failure_hook(
self, proposal: dict[str, list[str]], security_schemes: dict[str, dict[str, t.Any]]
) -> None:
pass

async def http_basic_credentials(self) -> tuple[bytes, bytes]:
raise NotImplementedError()

async def oauth2_client_credentials(self) -> tuple[bytes, bytes]:
raise NotImplementedError()

def tls_credentials(self) -> tuple[str, str | None]:
raise NotImplementedError()


class BasicAuthProvider(AuthProviderBase):
"""
AuthProvider providing basic auth with fixed `username`, `password`.
"""

def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
if self._expire_at is None or self._expire_at < datetime.now():
self._retrieve_token()
def __init__(self, username: t.AnyStr, password: t.AnyStr):
super().__init__()
self.username: bytes = username.encode("latin1") if isinstance(username, str) else username
self.password: bytes = password.encode("latin1") if isinstance(password, str) else password

assert self._access_token is not None
def can_complete_http_basic(self) -> bool:
return True

request.headers["Authorization"] = f"Bearer {self._access_token}"
async def http_basic_credentials(self) -> tuple[bytes, bytes]:
return self.username, self.password

# Call to untyped function "register_hook" in typed context
request.register_hook("response", self._handle401) # type: ignore[no-untyped-call]

return request
class GlueAuthProvider(AuthProviderBase):
"""
AuthProvider allowing to be used with prepared credentials.
"""

def _handle401(
def __init__(
self,
response: requests.Response,
**kwargs: t.Any,
) -> requests.Response:
if response.status_code != 401:
return response

# If we get this far, probably the token is not valid anymore.

# Try to reach for a new token once.
self._retrieve_token()

assert self._access_token is not None

# Consume content and release the original connection
# to allow our new request to reuse the same one.
response.content
response.close()
prepared_new_request = response.request.copy()

prepared_new_request.headers["Authorization"] = f"Bearer {self._access_token}"

# Avoid to enter into an infinity loop.
# Call to untyped function "deregister_hook" in typed context
prepared_new_request.deregister_hook( # type: ignore[no-untyped-call]
"response", self._handle401
)

# "Response" has no attribute "connection"
new_response: requests.Response = response.connection.send(prepared_new_request, **kwargs)
new_response.history.append(response)
new_response.request = prepared_new_request

return new_response

def _retrieve_token(self) -> None:
data = {
"grant_type": "client_credentials",
}

if self._scopes:
data["scope"] = " ".join(self._scopes)

response: requests.Response = requests.post(
self._token_url,
data=data,
auth=self._token_server_auth,
verify=self._verify_ssl,
)

response.raise_for_status()

token = response.json()
self._expire_at = datetime.now() + timedelta(seconds=token["expires_in"])
self._access_token = token["access_token"]
*,
username: t.AnyStr | None = None,
password: t.AnyStr | None = None,
client_id: t.AnyStr | None = None,
client_secret: t.AnyStr | None = None,
cert: str | None = None,
key: str | None = None,
):
super().__init__()
self.username: bytes | None = None
self.password: bytes | None = None
self.client_id: bytes | None = None
self.client_secret: bytes | None = None
self.cert: str | None = cert
self.key: str | None = key

if username is not None:
assert password is not None
self.username = username.encode("latin1") if isinstance(username, str) else username
self.password = password.encode("latin1") if isinstance(password, str) else password
if client_id is not None:
assert client_secret is not None
self.client_id = client_id.encode("latin1") if isinstance(client_id, str) else client_id
self.client_secret = (
client_secret.encode("latin1") if isinstance(client_secret, str) else client_secret
)

if cert is None and key is not None:
raise RuntimeError("Key can only be used together with a cert.")

def can_complete_http_basic(self) -> bool:
return self.username is not None

def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> bool:
return self.client_id is not None

def can_complete_mutualTLS(self) -> bool:
return self.cert is not None

async def http_basic_credentials(self) -> tuple[bytes, bytes]:
assert self.username is not None
assert self.password is not None
return self.username, self.password

async def oauth2_client_credentials(self) -> tuple[bytes, bytes]:
assert self.client_id is not None
assert self.client_secret is not None
return self.client_id, self.client_secret

def tls_credentials(self) -> tuple[str, str | None]:
assert self.cert is not None
return (self.cert, self.key)
Loading
Loading