diff --git a/Java/java-sample-oauth.java b/Java/java-sample-oauth.java index 96236e2..17be788 100644 --- a/Java/java-sample-oauth.java +++ b/Java/java-sample-oauth.java @@ -1,20 +1,41 @@ +import android.util.Base64 +import java.security.MessageDigest +import java.security.SecureRandom +import java.nio.charset.StandardCharsets -SecureRandom sr = new SecureRandom(); -byte[] code = new byte[32]; -sr.nextBytes(code); -String verifier = android.util.Base64.encodeToString(code, android.util.Base64.URL_SAFE | android.util.Base64.NO_WRAP | android.util.Base64.NO_PADDING); -byte[] bytes = new byte[0]; -try { - bytes = verifier.getBytes("US-ASCII"); -} catch (UnsupportedEncodingException e) { - e.printStackTrace(); -} -MessageDigest md = null; -try { - md = MessageDigest.getInstance("SHA-256"); -} catch (NoSuchAlgorithmException e) { - e.printStackTrace(); +/** + * PKCE (RFC 7636) utility – تولید code_verifier و code_challenge به روش S256 + * کاملاً امن، بدون استثنای کنترل‌نشده و مطابق با استانداردهای اندروید. + */ +object PkceUtil { + + /** طول پیشنهادی ۴۳‑۱۲۸ کاراکتر پس از Base64 → ۶۴ بایت خام ≈ ۸۶ کاراکتر */ + private const val VERIFIER_BYTE_LENGTH = 64 + + /** + * تولید code verifier با آنتروپی بالا + */ + fun generateCodeVerifier(): String { + val randomBytes = ByteArray(VERIFIER_BYTE_LENGTH) + // SecureRandom به‌صورت پیش‌فرض از /dev/urandom یا Windows‑CryptoAPI استفاده می‌کند + SecureRandom().nextBytes(randomBytes) + + return Base64.encodeToString( + randomBytes, + Base64.URL_SAFE or Base64.NO_WRAP or Base64.NO_PADDING + ) + } + + /** + * تبدیل code verifier به code challenge (روش S256) + */ + fun generateCodeChallenge(verifier: String): String { + val sha256 = MessageDigest.getInstance("SHA-256") + val digest = sha256.digest(verifier.toByteArray(StandardCharsets.US_ASCII)) + + return Base64.encodeToString( + digest, + Base64.URL_SAFE or Base64.NO_WRAP or Base64.NO_PADDING + ) + } } -md.update(bytes, 0, bytes.length); -byte[] digest = md.digest(); -String challenge = android.util.Base64.encodeToString(digest, android.util.Base64.URL_SAFE | android.util.Base64.NO_WRAP | android.util.Base64.NO_PADDING); diff --git a/Python/Python-Sample-OAuth.py b/Python/Python-Sample-OAuth.py index 5d388b8..7513f8c 100644 --- a/Python/Python-Sample-OAuth.py +++ b/Python/Python-Sample-OAuth.py @@ -1,67 +1,128 @@ -from django.conf import settings -import os +from __future__ import annotations + import base64 import hashlib -import random -import string -import requests import json import secrets +import string +from dataclasses import dataclass +from typing import Any, Dict, Optional, Tuple -### Sample for client register and oauth2 +import requests +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured +from urllib.parse import quote, urlencode + +# --------------------------------------------------------------------------- # +# PayPing OAuth2 / Client Registration Helper +# --------------------------------------------------------------------------- # +@dataclass class PayPing: - CLIENT_ID = "" - CLIENT_SECRET = "" - PREFIX = "" - REDIRECT_URI = "%s/%s" % (settings.SITE_URL, PREFIX) # define SITE_URL in settings - TOKEN = "" - SCOPES = "openid pay:write profile" #Add your defined scopes - - def code_verifier(self): - random = secrets.token_bytes(64) - code_verifier = base64.b64encode(random, b'-_').decode().replace('=', '') - return code_verifier - - def _code_challenge(self, code_verifier): - m = hashlib.sha256() - m.update(code_verifier.encode()) - d = m.digest() - code_challenge = base64.b64encode(d, b'-_').decode().replace('=', '') - return code_challenge - - def generate_username(self, limit=20): - return "hmyn" + "".join(random.choice(string.digits) for i in range(limit)) - - def check_user_exists(self, email): - true_false = {"true": True, "false": False} - url = "https://oauth.payping.ir/v1/client/EmailExist?Email=%s" % (email) - headers = dict() - headers["Authorization"] = "Bearer %s" % (self.TOKEN) - response = requests.get(url=url, headers=headers) - return true_false.get(response.text, "Err") - - def get_register_user_url( + """ + Secure, production-ready helper for PayPing OAuth2 (PKCE) flow + and client-registration API. + """ + + # --------------------------------------------------------------------- # + # Configuration – read from Django settings (lazy) + # --------------------------------------------------------------------- # + CLIENT_ID: str = "" + CLIENT_SECRET: str = "" + TOKEN: str = "" # Service-to-service bearer token + PREFIX: str = "payping/callback" + SCOPES: str = "openid pay:write profile" + + def __post_init__(self) -> None: + """Load values from Django settings if they are not provided explicitly.""" + self.CLIENT_ID = self.CLIENT_ID or getattr(settings, "PAYPING_CLIENT_ID", "") + self.CLIENT_SECRET = self.CLIENT_SECRET or getattr(settings, "PAYPING_CLIENT_SECRET", "") + self.TOKEN = self.TOKEN or getattr(settings, "PAYPING_SERVICE_TOKEN", "") + self.PREFIX = self.PREFIX or getattr(settings, "PAYPING_REDIRECT_PREFIX", "payping/callback") + + if not all([self.CLIENT_ID, self.CLIENT_SECRET, self.TOKEN]): + raise ImproperlyConfigured( + "PAYPING_CLIENT_ID, PAYPING_CLIENT_SECRET and PAYPING_SERVICE_TOKEN must be defined." + ) + + @property + def REDIRECT_URI(self) -> str: + site_url = getattr(settings, "SITE_URL", None) + if not site_url: + raise ImproperlyConfigured("SITE_URL must be defined in Django settings.") + return f"{site_url.rstrip('/')}/{self.PREFIX.lstrip('/')}" + + # --------------------------------------------------------------------- # + # PKCE helpers + # --------------------------------------------------------------------- # + @staticmethod + def generate_code_verifier() -> str: + token = secrets.token_bytes(64) + return base64.urlsafe_b64encode(token).rstrip(b"=").decode("utf-8") + + @staticmethod + def generate_code_challenge(verifier: str) -> str: + digest = hashlib.sha256(verifier.encode("utf-8")).digest() + return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("utf-8") + + # --------------------------------------------------------------------- # + # Secure username generation + # --------------------------------------------------------------------- # + @staticmethod + def generate_username(prefix: str = "hmyn", length: int = 20) -> str: + alphabet = string.ascii_letters + string.digits + rnd = secrets.SystemRandom() + return prefix + "".join(rnd.choice(alphabet) for _ in range(length)) + + # --------------------------------------------------------------------- # + # Centralised request handler + # --------------------------------------------------------------------- # + def _request(self, method: str, url: str, **kwargs) -> requests.Response: + timeout = kwargs.pop("timeout", 15) + response = requests.request(method, url, timeout=timeout, **kwargs) + + try: + response.raise_for_status() + except requests.HTTPError as exc: + # Try to extract a meaningful error message from PayPing + try: + error_detail = response.json() + except Exception: + error_detail = response.text or "No details" + raise requests.HTTPError( + f"PayPing API error {response.status_code}: {error_detail}" + ) from exc + + return response + + # --------------------------------------------------------------------- # + # Check if e-mail already exists + # --------------------------------------------------------------------- # + def check_user_exists(self, email: str) -> bool: + url = f"https://oauth.payping.ir/v1/client/EmailExist?Email={quote(email)}" + headers = {"Authorization": f"Bearer {self.TOKEN}"} + resp = self._request("GET", url, headers=headers) + return resp.text.strip().lower() == "true" + + # --------------------------------------------------------------------- # + # Initiate client registration (returns URL for user) + # --------------------------------------------------------------------- # + def initiate_client_registration( self, - return_url, - email, - sheba, - username=None, - first_name=None, - last_name=None, - phone_number=None, - national_code=None, - birth_day=None, - ): - - api = "https://oauth.payping.ir/v1/client/ClientRegisterInit" - url = "https://oauth.payping.ir/Client/ClientRegister?registerId={uuid}" - headers = dict() - headers["Authorization"] = "Bearer %s" % (self.TOKEN) - headers["Content-Type"] = "application/json" - if username is None: + return_url: str, + email: str, + sheba: str, + username: Optional[str] = None, + first_name: Optional[str] = None, + last_name: Optional[str] = None, + phone_number: Optional[str] = None, + national_code: Optional[str] = None, + birth_day: Optional[str] = None, # YYYY-MM-DD + ) -> str: + if not username: username = self.generate_username() - data = { + + payload = { "UserName": username, "Email": email, "FirstName": first_name, @@ -73,28 +134,40 @@ def get_register_user_url( "ClientId": self.CLIENT_ID, "Sheba": sheba, } - data = json.dumps(data) - resp = requests.post(url=api, data=data, headers=headers) - uuid = resp.text - uuid = uuid.replace('"', "") - return url.format(uuid=uuid) - - def get_access_url(self, verifier, unique_code): - challenge = self._code_challenge(verifier) - url = "https://oauth.payping.ir/connect/authorize?" - url += "scope=%s&" % (SCOPES) - url += "response_type=code&" - url += "client_id=%s&" % (self.CLIENT_ID) - url += "code_challenge=%s&" % (challenge) - url += "code_challenge_method=S256&" - url += "redirect_uri=%s&" % (self.REDIRECT_URI) - url += "state=%s" % (unique_code) - return url - - def get_access_token(self, code_verifier, code): + # Remove None values – PayPing rejects them + payload = {k: v for k, v in payload.items() if v is not None} + + resp = self._request( + "POST", + "https://oauth.payping.ir/v1/client/ClientRegisterInit", + json=payload, + headers={"Authorization": f"Bearer {self.TOKEN}"}, + ) + register_id = resp.text.strip().strip('"') + return f"https://oauth.payping.ir/Client/ClientRegister?registerId={register_id}" + + # --------------------------------------------------------------------- # + # Build authorization URL (PKCE) + # --------------------------------------------------------------------- # + def get_authorization_url(self, code_verifier: str, state: str) -> str: + code_challenge = self.generate_code_challenge(code_verifier) + + params = { + "scope": self.SCOPES, + "response_type": "code", + "client_id": self.CLIENT_ID, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "redirect_uri": self.REDIRECT_URI, + "state": state, + } + return "https://oauth.payping.ir/connect/authorize?" + urlencode(params, quote_via=quote) + + # --------------------------------------------------------------------- # + # Exchange authorization code for access token + # --------------------------------------------------------------------- # + def exchange_code_for_token(self, code_verifier: str, code: str) -> Tuple[str, int]: url = "https://oauth.payping.ir/connect/token" - headers = dict() - headers["Content-Type"] = "application/x-www-form-urlencoded" data = { "grant_type": "authorization_code", "client_id": self.CLIENT_ID, @@ -103,15 +176,28 @@ def get_access_token(self, code_verifier, code): "code": code, "redirect_uri": self.REDIRECT_URI, } - response = requests.post(url=url, data=data, headers=headers) - response = json.loads(response.text) - access_token = response.get("access_token", "Err") - expires_in = response.get("expires_in", "Err") + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + resp = self._request("POST", url, data=data, headers=headers) + + try: + token_data = resp.json() + except json.JSONDecodeError as exc: + raise ValueError("PayPing token endpoint returned invalid JSON") from exc + + access_token = token_data.get("access_token") + expires_in = int(token_data.get("expires_in", 0)) + + if not access_token: + raise ValueError("Access token missing in PayPing response") + return access_token, expires_in - def get_username(self, access_token): - api = "https://oauth.payping.ir/connect/userinfo" - headers = {"Authorization": "Bearer %s" % (access_token)} - response = requests.get(url=api, headers=headers) - response = json.loads(response.text) - return response.get("username", "Err") + # --------------------------------------------------------------------- # + # Fetch user information + # --------------------------------------------------------------------- # + def get_userinfo(self, access_token: str) -> Dict[str, Any]: + url = "https://oauth.payping.ir/connect/userinfo" + headers = {"Authorization": f"Bearer {access_token}"} + resp = self._request("GET", url, headers=headers) + return resp.json()