Skip to content

Commit ce016ad

Browse files
committed
refactor(api): extracted common code in utils.py
1 parent c2160ee commit ce016ad

File tree

4 files changed

+84
-96
lines changed

4 files changed

+84
-96
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "cardo-python-utils"
7-
version = "0.5.dev10"
7+
version = "0.5.dev11"
88
description = "Python library enhanced with a wide range of functions for different scenarios."
99
readme = "README.rst"
1010
requires-python = ">=3.8"

python_utils/django/keycloak/api/drf.py

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,19 @@
1-
import jwt
2-
31
from django.conf import settings
4-
from django.contrib.auth import get_user_model
5-
from jwt import PyJWKClient
62
from jwt.exceptions import InvalidTokenError
73

84
from rest_framework import authentication
95
from rest_framework.exceptions import AuthenticationFailed
106
from rest_framework.permissions import BasePermission
117

12-
13-
jwks_client = PyJWKClient(getattr(settings, "JWKS_URL", ""))
8+
from .utils import create_or_update_user, decode_jwt
149

1510

1611
class AuthenticationBackend(authentication.TokenAuthentication):
1712
keyword = "Bearer"
1813

1914
def authenticate_credentials(self, token: str):
20-
signing_key = jwks_client.get_signing_key_from_jwt(token)
21-
2215
try:
23-
payload = jwt.decode(
24-
token,
25-
signing_key.key,
26-
algorithms=["RS256"],
27-
audience=getattr(settings, "JWT_AUDIENCE", None),
28-
)
16+
payload = decode_jwt(token)
2917
except InvalidTokenError as e:
3018
raise AuthenticationFailed(f"Invalid token: {str(e)}") from e
3119

@@ -36,43 +24,9 @@ def authenticate_credentials(self, token: str):
3624
"Invalid token: preferred_username not present."
3725
) from e
3826

39-
user = self._get_user(username, payload)
27+
user = create_or_update_user(username, payload)
4028
return user, payload
4129

42-
def _get_user(self, username: str, payload: dict):
43-
"""
44-
Get or create a user based on the JWT payload.
45-
If the user exists, update their details.
46-
"""
47-
user_model = get_user_model()
48-
user_data = {
49-
"first_name": payload.get("given_name") or "",
50-
"last_name": payload.get("family_name") or "",
51-
"email": payload.get("email") or "",
52-
"is_staff": payload.get("is_staff", False),
53-
}
54-
if hasattr(user_model, "is_demo"):
55-
user_data["is_demo"] = payload.get("is_demo", False)
56-
57-
user = user_model.objects.filter(username=username).first()
58-
if user:
59-
update_needed = False
60-
61-
for field, value in user_data.items():
62-
if getattr(user, field) != value:
63-
setattr(user, field, value)
64-
update_needed = True
65-
66-
if update_needed:
67-
user.save(update_fields=list(user_data.keys()))
68-
69-
return user
70-
else:
71-
return user_model.objects.create(
72-
username=username,
73-
**user_data,
74-
)
75-
7630

7731
class HasScope(BasePermission):
7832
"""

python_utils/django/keycloak/api/ninja.py

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,18 @@
11
from typing import Literal
22

3-
from jwt import PyJWKClient, decode as jwt_decode
43
from jwt.exceptions import InvalidTokenError
54

65
from django.conf import settings
7-
from django.contrib.auth import get_user_model
86
from ninja.security import HttpBearer
97
from ninja.errors import AuthenticationError, HttpError
108

11-
jwks_client = PyJWKClient(getattr(settings, "JWKS_URL", ""))
9+
from .utils import create_or_update_user, decode_jwt
1210

1311

1412
class AuthBearer(HttpBearer):
1513
def authenticate(self, request, token):
16-
signing_key = jwks_client.get_signing_key_from_jwt(token)
17-
1814
try:
19-
payload = jwt_decode(
20-
token,
21-
signing_key.key,
22-
algorithms=["RS256"],
23-
audience=getattr(settings, "JWT_AUDIENCE", None),
24-
)
15+
payload = decode_jwt(token)
2516
except InvalidTokenError as e:
2617
raise AuthenticationError(f"Invalid token: {str(e)}") from e
2718

@@ -32,46 +23,12 @@ def authenticate(self, request, token):
3223
"Invalid token: preferred_username not present."
3324
) from e
3425

35-
user = self._get_user(username, payload)
26+
user = create_or_update_user(username, payload)
3627

3728
self._verify_scopes(request, payload)
3829

3930
return user
4031

41-
def _get_user(self, username: str, payload: dict):
42-
"""
43-
Get or create a user based on the JWT payload.
44-
If the user exists, update their details.
45-
"""
46-
user_model = get_user_model()
47-
user_data = {
48-
"first_name": payload.get("given_name") or "",
49-
"last_name": payload.get("family_name") or "",
50-
"email": payload.get("email") or "",
51-
"is_staff": payload.get("is_staff", False),
52-
}
53-
if hasattr(user_model, "is_demo"):
54-
user_data["is_demo"] = payload.get("is_demo", False)
55-
56-
user = user_model.objects.filter(username=username).first()
57-
if user:
58-
update_needed = False
59-
60-
for field, value in user_data.items():
61-
if getattr(user, field) != value:
62-
setattr(user, field, value)
63-
update_needed = True
64-
65-
if update_needed:
66-
user.save(update_fields=list(user_data.keys()))
67-
68-
return user
69-
else:
70-
return user_model.objects.create(
71-
username=username,
72-
**user_data,
73-
)
74-
7532
def _verify_scopes(self, request, token_payload):
7633
allowed_scopes = self._get_view_allowed_scopes(request)
7734

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
from typing import TypedDict
2+
3+
from django.conf import settings
4+
from django.contrib.auth import get_user_model
5+
from jwt import decode, PyJWKClient
6+
7+
jwks_client = PyJWKClient(getattr(settings, "JWKS_URL", ""))
8+
9+
10+
class TokenPayload(TypedDict, total=False):
11+
exp: int
12+
iat: int
13+
jti: str
14+
iss: str
15+
aud: str | list[str]
16+
typ: str
17+
azp: str
18+
sid: str
19+
scope: str
20+
preferred_username: str
21+
given_name: str
22+
family_name: str
23+
email: str
24+
is_staff: bool
25+
is_demo: bool
26+
groups: list[str] # Full path of the user group, e.g. "/group1/subgroup1"
27+
28+
29+
def decode_jwt(token: str) -> TokenPayload:
30+
"""
31+
Decode a JWT token using the public certificate of the Auth Server.
32+
33+
Raises:
34+
jwt.exceptions.InvalidTokenError: If the token is invalid or cannot be decoded.
35+
"""
36+
signing_key = jwks_client.get_signing_key_from_jwt(token)
37+
38+
return decode(
39+
token,
40+
signing_key.key,
41+
algorithms=["RS256"],
42+
audience=getattr(settings, "JWT_AUDIENCE", None),
43+
)
44+
45+
46+
def create_or_update_user(username: str, payload: TokenPayload):
47+
"""
48+
Create or update a user based on the JWT payload.
49+
"""
50+
user_model = get_user_model()
51+
user_data = {
52+
"first_name": payload.get("given_name") or "",
53+
"last_name": payload.get("family_name") or "",
54+
"email": payload.get("email") or "",
55+
"is_staff": payload.get("is_staff", False),
56+
}
57+
if hasattr(user_model, "is_demo"):
58+
user_data["is_demo"] = payload.get("is_demo", False)
59+
60+
user = user_model.objects.filter(username=username).first()
61+
if user:
62+
update_needed = False
63+
64+
for field, value in user_data.items():
65+
if getattr(user, field) != value:
66+
setattr(user, field, value)
67+
update_needed = True
68+
69+
if update_needed:
70+
user.save(update_fields=list(user_data.keys()))
71+
72+
return user
73+
else:
74+
return user_model.objects.create(
75+
username=username,
76+
**user_data,
77+
)

0 commit comments

Comments
 (0)