Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 39 additions & 18 deletions Java/java-sample-oauth.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
import android.util.Base64
import java.security.MessageDigest
import java.security.SecureRandom
import java.nio.charset.StandardCharsets

SecureRandom sr = new SecureRandom();
byte[] code = new byte[32];
sr.nextBytes(code);
String verifier = android.util.Base64.encodeToString(code, android.util.Base64.URL_SAFE | android.util.Base64.NO_WRAP | android.util.Base64.NO_PADDING);
byte[] bytes = new byte[0];
try {
bytes = verifier.getBytes("US-ASCII");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
MessageDigest md = null;
try {
md = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
/**
* PKCE (RFC 7636) utility – تولید code_verifier و code_challenge به روش S256
* کاملاً امن، بدون استثنای کنترل‌نشده و مطابق با استانداردهای اندروید.
*/
object PkceUtil {

/** طول پیشنهادی ۴۳‑۱۲۸ کاراکتر پس از Base64 → ۶۴ بایت خام ≈ ۸۶ کاراکتر */
private const val VERIFIER_BYTE_LENGTH = 64

/**
* تولید code verifier با آنتروپی بالا
*/
fun generateCodeVerifier(): String {
val randomBytes = ByteArray(VERIFIER_BYTE_LENGTH)
// SecureRandom به‌صورت پیش‌فرض از /dev/urandom یا Windows‑CryptoAPI استفاده می‌کند
SecureRandom().nextBytes(randomBytes)

return Base64.encodeToString(
randomBytes,
Base64.URL_SAFE or Base64.NO_WRAP or Base64.NO_PADDING
)
}

/**
* تبدیل code verifier به code challenge (روش S256)
*/
fun generateCodeChallenge(verifier: String): String {
val sha256 = MessageDigest.getInstance("SHA-256")
val digest = sha256.digest(verifier.toByteArray(StandardCharsets.US_ASCII))

return Base64.encodeToString(
digest,
Base64.URL_SAFE or Base64.NO_WRAP or Base64.NO_PADDING
)
}
}
md.update(bytes, 0, bytes.length);
byte[] digest = md.digest();
String challenge = android.util.Base64.encodeToString(digest, android.util.Base64.URL_SAFE | android.util.Base64.NO_WRAP | android.util.Base64.NO_PADDING);
258 changes: 172 additions & 86 deletions Python/Python-Sample-OAuth.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,128 @@
from django.conf import settings
import os
from __future__ import annotations

import base64
import hashlib
import random
import string
import requests
import json
import secrets
import string
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple

### Sample for client register and oauth2
import requests
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from urllib.parse import quote, urlencode


# --------------------------------------------------------------------------- #
# PayPing OAuth2 / Client Registration Helper
# --------------------------------------------------------------------------- #
@dataclass
class PayPing:
CLIENT_ID = ""
CLIENT_SECRET = ""
PREFIX = ""
REDIRECT_URI = "%s/%s" % (settings.SITE_URL, PREFIX) # define SITE_URL in settings
TOKEN = ""
SCOPES = "openid pay:write profile" #Add your defined scopes

def code_verifier(self):
random = secrets.token_bytes(64)
code_verifier = base64.b64encode(random, b'-_').decode().replace('=', '')
return code_verifier

def _code_challenge(self, code_verifier):
m = hashlib.sha256()
m.update(code_verifier.encode())
d = m.digest()
code_challenge = base64.b64encode(d, b'-_').decode().replace('=', '')
return code_challenge

def generate_username(self, limit=20):
return "hmyn" + "".join(random.choice(string.digits) for i in range(limit))

def check_user_exists(self, email):
true_false = {"true": True, "false": False}
url = "https://oauth.payping.ir/v1/client/EmailExist?Email=%s" % (email)
headers = dict()
headers["Authorization"] = "Bearer %s" % (self.TOKEN)
response = requests.get(url=url, headers=headers)
return true_false.get(response.text, "Err")

def get_register_user_url(
"""
Secure, production-ready helper for PayPing OAuth2 (PKCE) flow
and client-registration API.
"""

# --------------------------------------------------------------------- #
# Configuration – read from Django settings (lazy)
# --------------------------------------------------------------------- #
CLIENT_ID: str = ""
CLIENT_SECRET: str = ""
TOKEN: str = "" # Service-to-service bearer token
PREFIX: str = "payping/callback"
SCOPES: str = "openid pay:write profile"

def __post_init__(self) -> None:
"""Load values from Django settings if they are not provided explicitly."""
self.CLIENT_ID = self.CLIENT_ID or getattr(settings, "PAYPING_CLIENT_ID", "")
self.CLIENT_SECRET = self.CLIENT_SECRET or getattr(settings, "PAYPING_CLIENT_SECRET", "")
self.TOKEN = self.TOKEN or getattr(settings, "PAYPING_SERVICE_TOKEN", "")
self.PREFIX = self.PREFIX or getattr(settings, "PAYPING_REDIRECT_PREFIX", "payping/callback")

if not all([self.CLIENT_ID, self.CLIENT_SECRET, self.TOKEN]):
raise ImproperlyConfigured(
"PAYPING_CLIENT_ID, PAYPING_CLIENT_SECRET and PAYPING_SERVICE_TOKEN must be defined."
)

@property
def REDIRECT_URI(self) -> str:
site_url = getattr(settings, "SITE_URL", None)
if not site_url:
raise ImproperlyConfigured("SITE_URL must be defined in Django settings.")
return f"{site_url.rstrip('/')}/{self.PREFIX.lstrip('/')}"

# --------------------------------------------------------------------- #
# PKCE helpers
# --------------------------------------------------------------------- #
@staticmethod
def generate_code_verifier() -> str:
token = secrets.token_bytes(64)
return base64.urlsafe_b64encode(token).rstrip(b"=").decode("utf-8")

@staticmethod
def generate_code_challenge(verifier: str) -> str:
digest = hashlib.sha256(verifier.encode("utf-8")).digest()
return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("utf-8")

# --------------------------------------------------------------------- #
# Secure username generation
# --------------------------------------------------------------------- #
@staticmethod
def generate_username(prefix: str = "hmyn", length: int = 20) -> str:
alphabet = string.ascii_letters + string.digits
rnd = secrets.SystemRandom()
return prefix + "".join(rnd.choice(alphabet) for _ in range(length))

# --------------------------------------------------------------------- #
# Centralised request handler
# --------------------------------------------------------------------- #
def _request(self, method: str, url: str, **kwargs) -> requests.Response:
timeout = kwargs.pop("timeout", 15)
response = requests.request(method, url, timeout=timeout, **kwargs)

try:
response.raise_for_status()
except requests.HTTPError as exc:
# Try to extract a meaningful error message from PayPing
try:
error_detail = response.json()
except Exception:
error_detail = response.text or "No details"
raise requests.HTTPError(
f"PayPing API error {response.status_code}: {error_detail}"
) from exc

return response

# --------------------------------------------------------------------- #
# Check if e-mail already exists
# --------------------------------------------------------------------- #
def check_user_exists(self, email: str) -> bool:
url = f"https://oauth.payping.ir/v1/client/EmailExist?Email={quote(email)}"
headers = {"Authorization": f"Bearer {self.TOKEN}"}
resp = self._request("GET", url, headers=headers)
return resp.text.strip().lower() == "true"

# --------------------------------------------------------------------- #
# Initiate client registration (returns URL for user)
# --------------------------------------------------------------------- #
def initiate_client_registration(
self,
return_url,
email,
sheba,
username=None,
first_name=None,
last_name=None,
phone_number=None,
national_code=None,
birth_day=None,
):

api = "https://oauth.payping.ir/v1/client/ClientRegisterInit"
url = "https://oauth.payping.ir/Client/ClientRegister?registerId={uuid}"
headers = dict()
headers["Authorization"] = "Bearer %s" % (self.TOKEN)
headers["Content-Type"] = "application/json"
if username is None:
return_url: str,
email: str,
sheba: str,
username: Optional[str] = None,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
phone_number: Optional[str] = None,
national_code: Optional[str] = None,
birth_day: Optional[str] = None, # YYYY-MM-DD
) -> str:
if not username:
username = self.generate_username()
data = {

payload = {
"UserName": username,
"Email": email,
"FirstName": first_name,
Expand All @@ -73,28 +134,40 @@ def get_register_user_url(
"ClientId": self.CLIENT_ID,
"Sheba": sheba,
}
data = json.dumps(data)
resp = requests.post(url=api, data=data, headers=headers)
uuid = resp.text
uuid = uuid.replace('"', "")
return url.format(uuid=uuid)

def get_access_url(self, verifier, unique_code):
challenge = self._code_challenge(verifier)
url = "https://oauth.payping.ir/connect/authorize?"
url += "scope=%s&" % (SCOPES)
url += "response_type=code&"
url += "client_id=%s&" % (self.CLIENT_ID)
url += "code_challenge=%s&" % (challenge)
url += "code_challenge_method=S256&"
url += "redirect_uri=%s&" % (self.REDIRECT_URI)
url += "state=%s" % (unique_code)
return url

def get_access_token(self, code_verifier, code):
# Remove None values – PayPing rejects them
payload = {k: v for k, v in payload.items() if v is not None}

resp = self._request(
"POST",
"https://oauth.payping.ir/v1/client/ClientRegisterInit",
json=payload,
headers={"Authorization": f"Bearer {self.TOKEN}"},
)
register_id = resp.text.strip().strip('"')
return f"https://oauth.payping.ir/Client/ClientRegister?registerId={register_id}"

# --------------------------------------------------------------------- #
# Build authorization URL (PKCE)
# --------------------------------------------------------------------- #
def get_authorization_url(self, code_verifier: str, state: str) -> str:
code_challenge = self.generate_code_challenge(code_verifier)

params = {
"scope": self.SCOPES,
"response_type": "code",
"client_id": self.CLIENT_ID,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"redirect_uri": self.REDIRECT_URI,
"state": state,
}
return "https://oauth.payping.ir/connect/authorize?" + urlencode(params, quote_via=quote)

# --------------------------------------------------------------------- #
# Exchange authorization code for access token
# --------------------------------------------------------------------- #
def exchange_code_for_token(self, code_verifier: str, code: str) -> Tuple[str, int]:
url = "https://oauth.payping.ir/connect/token"
headers = dict()
headers["Content-Type"] = "application/x-www-form-urlencoded"
data = {
"grant_type": "authorization_code",
"client_id": self.CLIENT_ID,
Expand All @@ -103,15 +176,28 @@ def get_access_token(self, code_verifier, code):
"code": code,
"redirect_uri": self.REDIRECT_URI,
}
response = requests.post(url=url, data=data, headers=headers)
response = json.loads(response.text)
access_token = response.get("access_token", "Err")
expires_in = response.get("expires_in", "Err")
headers = {"Content-Type": "application/x-www-form-urlencoded"}

resp = self._request("POST", url, data=data, headers=headers)

try:
token_data = resp.json()
except json.JSONDecodeError as exc:
raise ValueError("PayPing token endpoint returned invalid JSON") from exc

access_token = token_data.get("access_token")
expires_in = int(token_data.get("expires_in", 0))

if not access_token:
raise ValueError("Access token missing in PayPing response")

return access_token, expires_in

def get_username(self, access_token):
api = "https://oauth.payping.ir/connect/userinfo"
headers = {"Authorization": "Bearer %s" % (access_token)}
response = requests.get(url=api, headers=headers)
response = json.loads(response.text)
return response.get("username", "Err")
# --------------------------------------------------------------------- #
# Fetch user information
# --------------------------------------------------------------------- #
def get_userinfo(self, access_token: str) -> Dict[str, Any]:
url = "https://oauth.payping.ir/connect/userinfo"
headers = {"Authorization": f"Bearer {access_token}"}
resp = self._request("GET", url, headers=headers)
return resp.json()