From e3baac3fca790decfb0c291ea02960f1fee1f91f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Jun 2025 13:17:38 +0000 Subject: [PATCH 1/3] Initial plan for issue From 10c57beaad066f769bd1c231c1c7fee583497272 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Jun 2025 13:33:31 +0000 Subject: [PATCH 2/3] Add comprehensive testing suite for all components Co-authored-by: batonac <4996285+batonac@users.noreply.github.com> --- .../test_jwt_auth_settings.py | 114 +++- tests/README.md | 247 ++++++++ tests/__init__.py | 1 + tests/test_auth.py | 532 ++++++++++++++++++ tests/test_cloudflare_simulation.py | 422 ++++++++++++++ tests/test_hooks.py | 358 ++++++++++++ tests/test_providers.py | 321 +++++++++++ tests/test_utils.py | 356 ++++++++++++ 8 files changed, 2346 insertions(+), 5 deletions(-) create mode 100644 tests/README.md create mode 100644 tests/__init__.py create mode 100644 tests/test_auth.py create mode 100644 tests/test_cloudflare_simulation.py create mode 100644 tests/test_hooks.py create mode 100644 tests/test_providers.py create mode 100644 tests/test_utils.py diff --git a/jwt_auth/jwt_auth/doctype/jwt_auth_settings/test_jwt_auth_settings.py b/jwt_auth/jwt_auth/doctype/jwt_auth_settings/test_jwt_auth_settings.py index 2feb39d..87b341a 100644 --- a/jwt_auth/jwt_auth/doctype/jwt_auth_settings/test_jwt_auth_settings.py +++ b/jwt_auth/jwt_auth/doctype/jwt_auth_settings/test_jwt_auth_settings.py @@ -1,8 +1,9 @@ # Copyright (c) 2024, Avunu LLC and Contributors # See license.txt -# import frappe +import frappe from frappe.tests import IntegrationTestCase, UnitTestCase +from jwt_auth.jwt_auth.doctype.jwt_auth_settings.jwt_auth_settings import JWTAuthSettings # On IntegrationTestCase, the doctype test records and all @@ -12,19 +13,122 @@ IGNORE_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"] -class TestJWTAuthSettings(UnitTestCase): +class TestJWTAuthSettingsUnit(UnitTestCase): """ Unit tests for JWTAuthSettings. Use this class for testing individual functions and methods. """ - pass + def test_doctype_creation(self): + """Test that JWT Auth Settings doctype can be created.""" + settings = frappe.get_doc("JWT Auth Settings") + self.assertIsInstance(settings, JWTAuthSettings) + def test_default_values(self): + """Test default values are set correctly.""" + settings = frappe.get_doc("JWT Auth Settings") + # Test default values match the DocType definition + self.assertEqual(settings.enabled, 1) + self.assertEqual(settings.enable_user_reg, 1) + self.assertEqual(settings.enable_login, 1) -class TestJWTAuthSettings(IntegrationTestCase): + def test_mandatory_fields_validation(self): + """Test that mandatory fields are validated when enabled.""" + settings = frappe.get_doc("JWT Auth Settings") + settings.enabled = 1 + + # These fields should be mandatory when enabled=1 + mandatory_fields = ['jwt_header', 'jwks_url', 'jwt_private_secret'] + + for field in mandatory_fields: + with self.subTest(field=field): + # Clear the field and expect validation error + setattr(settings, field, None) + with self.assertRaises(frappe.ValidationError): + settings.validate() + + def test_enable_login_dependencies(self): + """Test that login URL and redirect param are mandatory when enable_login is set.""" + settings = frappe.get_doc("JWT Auth Settings") + settings.enable_login = 1 + + # These fields should be mandatory when enable_login=1 + mandatory_fields = ['login_url', 'redirect_param'] + + for field in mandatory_fields: + with self.subTest(field=field): + setattr(settings, field, None) + with self.assertRaises(frappe.ValidationError): + settings.validate() + + +class TestJWTAuthSettingsIntegration(IntegrationTestCase): """ Integration tests for JWTAuthSettings. Use this class for testing interactions between multiple components. """ - pass + def setUp(self): + """Set up test environment.""" + # Create test settings + self.test_settings = { + 'enabled': 1, + 'enable_user_reg': 1, + 'enable_login': 1, + 'jwt_header': 'Cf-Access-Token', + 'jwks_url': 'https://test.cloudflareaccess.com/cdn-cgi/access/certs', + 'jwt_private_secret': 'test-secret', + 'login_url': 'https://test.cloudflareaccess.com/cdn-cgi/access/login/test', + 'logout_url': 'https://test.cloudflareaccess.com/cdn-cgi/access/logout', + 'redirect_param': 'redirect_url' + } + + def test_settings_persistence(self): + """Test that settings can be saved and retrieved.""" + # Get or create the single doctype + try: + settings = frappe.get_doc("JWT Auth Settings") + except frappe.DoesNotExistError: + settings = frappe.new_doc("JWT Auth Settings") + + # Update with test values + settings.update(self.test_settings) + settings.save() + + # Retrieve and verify + saved_settings = frappe.get_doc("JWT Auth Settings") + for key, value in self.test_settings.items(): + if key != 'jwt_private_secret': # Password field handled differently + self.assertEqual(getattr(saved_settings, key), value) + + def test_password_field_handling(self): + """Test that password fields are handled correctly.""" + try: + settings = frappe.get_doc("JWT Auth Settings") + except frappe.DoesNotExistError: + settings = frappe.new_doc("JWT Auth Settings") + + settings.update(self.test_settings) + settings.save() + + # Test password retrieval + password = settings.get_password('jwt_private_secret') + self.assertEqual(password, 'test-secret') + + def tearDown(self): + """Clean up after tests.""" + # Reset settings to defaults + try: + settings = frappe.get_doc("JWT Auth Settings") + settings.enabled = 0 + settings.enable_user_reg = 0 + settings.enable_login = 0 + settings.jwt_header = None + settings.jwks_url = None + settings.jwt_private_secret = None + settings.login_url = None + settings.logout_url = None + settings.redirect_param = None + settings.save() + except frappe.DoesNotExistError: + pass diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..6271284 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,247 @@ +# JWT Auth Testing Suite + +This directory contains comprehensive tests for the JWT Auth Frappe app, covering all doctypes, functions, and API interactions. + +## Test Structure + +### 1. Doctype Tests +- `jwt_auth/jwt_auth/doctype/jwt_auth_settings/test_jwt_auth_settings.py` + - Unit tests for JWT Auth Settings doctype + - Integration tests for settings persistence and validation + - Tests for mandatory field validation and default values + +### 2. Core Functionality Tests +- `tests/test_auth.py` + - Unit tests for `JWTAuth` class methods + - Unit tests for `SessionJWTAuth` class + - Unit tests for utility functions (jwt_logout, on_logout, web_logout, etc.) + - Integration tests for complete authentication flow + +### 3. Provider Tests +- `tests/test_providers.py` + - Unit tests for `BaseProvider` class + - Unit tests for `CloudflareAccessProvider` class + - Integration tests for provider functionality with real settings + - Tests for URL generation and public key retrieval + +### 4. Hooks Tests +- `tests/test_hooks.py` + - Unit tests for contact hooks (`on_update` function) + - Unit tests for error page hooks (`JWTAuthErrorPage` class) + - Integration tests for hooks with real Frappe documents + +### 5. Cloudflare Access Simulation Tests +- `tests/test_cloudflare_simulation.py` + - Comprehensive simulation of Cloudflare Access API interactions + - Mock JWT token validation with various scenarios + - Complete authentication flow testing without external calls + - Error handling and edge case testing + +### 6. Test Utilities +- `tests/test_utils.py` + - Mock classes for common objects (MockCloudflareAccess, MockJWTAuthSettings, etc.) + - Helper functions for creating test data + - Common assertions for Cloudflare URL validation + - Test data constants + +## Test Coverage + +### Functions Tested +- **auth.py**: All functions and class methods + - `JWTAuth.__init__()`, `auth()`, `validate_auth()`, `can_auth()`, `update()` + - `get_login_url()`, `get_logout_url()`, `get_public_keys()`, `get_token()`, `is_valid_token()`, `register_user()` + - `SessionJWTAuth.__init__()`, `__getattr__()` + - `handle_redirects()`, `jwt_logout()`, `on_logout()`, `web_logout()`, `validate_auth()` + +- **providers.py**: All provider classes and methods + - `BaseProvider.__init__()`, properties, `get_public_keys()` + - `CloudflareAccessProvider.__init__()`, properties, `get_jwks_url()`, `get_login_url()`, `get_logout_url()` + +- **hooks/contact.py**: Contact update hooks + - `on_update()` function with all scenarios + +- **hooks/error_page.py**: Error page handling + - `JWTAuthErrorPage.__init__()`, `can_render()`, `init_context()` + +### Doctypes Tested +- **JWT Auth Settings**: Creation, validation, persistence, password fields + +### API Testing +- **Cloudflare Access Integration**: Complete simulation without external calls + - JWKS endpoint responses + - JWT token validation (valid, expired, invalid audience) + - Login/logout URL generation + - User registration flow + - Error handling + +## Running Tests + +### Using Frappe Test Runner +```bash +# Run all tests +bench --site [site-name] run-tests --app jwt_auth + +# Run specific test file +bench --site [site-name] run-tests jwt_auth.jwt_auth.doctype.jwt_auth_settings.test_jwt_auth_settings + +# Run specific test class +bench --site [site-name] run-tests jwt_auth.tests.test_auth.TestJWTAuthUnit + +# Run with verbose output +bench --site [site-name] run-tests --app jwt_auth --verbose +``` + +### Using pytest (if available) +```bash +# Run all tests +pytest jwt_auth/tests/ + +# Run specific test file +pytest jwt_auth/tests/test_auth.py + +# Run with coverage +pytest jwt_auth/tests/ --cov=jwt_auth +``` + +## Test Design Principles + +### 1. Frappe Conventions +- Uses `frappe.tests.UnitTestCase` for unit tests +- Uses `frappe.tests.IntegrationTestCase` for integration tests +- Follows Frappe naming conventions and patterns +- Properly handles test record dependencies + +### 2. No External Dependencies +- All external API calls are mocked +- Cloudflare Access interactions are simulated +- No actual network requests are made during testing +- JWT token validation uses mock keys and payloads + +### 3. Comprehensive Coverage +- Tests all public methods and functions +- Tests both success and failure scenarios +- Tests edge cases and error conditions +- Tests integration between components + +### 4. Isolation and Cleanup +- Each test is isolated and doesn't affect others +- Proper setup and teardown methods +- Test data is cleaned up after each test +- Mock objects are reset between tests + +## Mock Strategy + +### External APIs +- **Cloudflare JWKS endpoint**: Mocked with realistic response structure +- **JWT token validation**: Mocked with various token scenarios +- **Network requests**: All `requests.get()` calls are mocked + +### Frappe Framework +- **frappe.local**: Mocked for request/session simulation +- **frappe.db**: Mocked for database operations +- **frappe.cache**: Mocked for caching operations +- **Document operations**: Mocked for create/read/update/delete + +### JWT Library +- **jwt.decode()**: Mocked to return various claim scenarios +- **jwt.algorithms.RSAAlgorithm**: Mocked for key handling +- **JWT exceptions**: Simulated for error testing + +## Test Data + +### Valid JWT Claims +```python +{ + "aud": ["test-secret"], + "email": "test@example.com", + "exp": , + "iat": , + "iss": "https://test-team.cloudflareaccess.com", + "sub": "1234567890", + "common_name": "test@example.com" +} +``` + +### Cloudflare JWKS Response +```python +{ + "keys": [ + { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "kid": "cloudflare-key-1", + "n": "example_n_parameter", + "e": "AQAB" + } + ] +} +``` + +## Test Scenarios Covered + +### Authentication Flow +1. Valid JWT token authentication +2. Expired JWT token handling +3. Invalid audience token handling +4. Missing token scenarios +5. User registration with/without existing contacts +6. Login/logout redirects + +### Provider Integration +1. Cloudflare URL generation +2. JWKS key retrieval +3. Multiple key handling +4. Network error handling +5. URL encoding edge cases + +### Settings Validation +1. Mandatory field validation +2. Conditional field requirements +3. Password field handling +4. Default value verification + +### Hooks Functionality +1. Contact field synchronization +2. Contact renaming logic +3. User profile updates +4. Error page rendering + +## Contributing + +When adding new functionality to JWT Auth: + +1. **Add corresponding tests** for new functions/methods +2. **Update existing tests** if changing behavior +3. **Add mock scenarios** for new external dependencies +4. **Test both success and failure paths** +5. **Follow existing naming conventions** +6. **Add docstrings** explaining test purpose +7. **Update this README** if adding new test categories + +## Common Test Patterns + +### Mocking External Calls +```python +@patch('jwt_auth.auth.requests.get') +def test_external_api(self, mock_requests): + mock_requests.return_value.json.return_value = {...} + # Test implementation +``` + +### Testing JWT Validation +```python +@patch('jwt_auth.auth.jwt.decode') +def test_jwt_validation(self, mock_jwt_decode): + mock_jwt_decode.return_value = {'email': 'test@example.com'} + # Test implementation +``` + +### Testing Frappe Documents +```python +def test_document_operation(self): + doc = frappe.get_doc('DocType', 'name') + # Modify doc + doc.save() + # Assert changes +``` \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..f3bbedc --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# Test package for jwt_auth \ No newline at end of file diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..0c1266c --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,532 @@ +# Copyright (c) 2024, Avunu LLC and Contributors +# See license.txt + +import json +import jwt +from unittest.mock import Mock, patch, MagicMock +from urllib.parse import quote + +import frappe +from frappe.tests import IntegrationTestCase, UnitTestCase + +from jwt_auth.auth import JWTAuth, SessionJWTAuth, handle_redirects, jwt_logout, on_logout, web_logout, validate_auth + + +class TestJWTAuthUnit(UnitTestCase): + """Unit tests for JWTAuth class methods.""" + + def setUp(self): + """Set up test environment.""" + # Mock settings + self.mock_settings = Mock() + self.mock_settings.enabled = True + self.mock_settings.enable_user_reg = True + self.mock_settings.enable_login = True + self.mock_settings.jwt_header = 'Cf-Access-Token' + self.mock_settings.jwks_url = 'https://test.cloudflareaccess.com/cdn-cgi/access/certs' + self.mock_settings.login_url = 'https://test.cloudflareaccess.com/cdn-cgi/access/login/test' + self.mock_settings.logout_url = 'https://test.cloudflareaccess.com/cdn-cgi/access/logout' + self.mock_settings.redirect_param = 'redirect_url' + self.mock_settings.get_password.return_value = 'test-secret' + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_jwt_auth_initialization(self, mock_get_cached_doc): + """Test JWTAuth initialization.""" + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth('/test/path', 200) + + self.assertEqual(auth.path, '/test/path') + self.assertEqual(auth.http_status_code, 200) + self.assertEqual(auth.settings, self.mock_settings) + self.assertIsNone(auth.claims) + self.assertIsNone(auth.user_email) + self.assertIsNone(auth.token) + self.assertIsNone(auth.redirect_to) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_update_method(self, mock_get_cached_doc): + """Test the update method.""" + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth() + auth.update('/new/path', 404) + + self.assertEqual(auth.path, '/new/path') + self.assertEqual(auth.http_status_code, 404) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_get_login_url_without_redirect(self, mock_get_cached_doc): + """Test get_login_url without redirect parameter.""" + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth() + login_url = auth.get_login_url() + + expected_url = 'https://test.cloudflareaccess.com/cdn-cgi/access/login/test' + self.assertEqual(login_url, expected_url) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_get_login_url_with_redirect(self, mock_get_cached_doc): + """Test get_login_url with redirect parameter.""" + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth('/test/path') + login_url = auth.get_login_url('/dashboard') + + expected_path = '%2F' + quote('/dashboard', safe='') + expected_url = f'https://test.cloudflareaccess.com/cdn-cgi/access/login/test?redirect_url={expected_path}' + self.assertEqual(login_url, expected_url) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + @patch('jwt_auth.auth.frappe.local') + def test_get_logout_url(self, mock_local, mock_get_cached_doc): + """Test get_logout_url method.""" + mock_get_cached_doc.return_value = self.mock_settings + mock_local.request.url = 'https://example.com/dashboard' + + auth = JWTAuth() + logout_url = auth.get_logout_url() + + expected_url = 'https://test.cloudflareaccess.com/cdn-cgi/access/logout?redirect_url=https://example.com/dashboard' + self.assertEqual(logout_url, expected_url) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + @patch('jwt_auth.auth.requests.get') + def test_get_public_keys(self, mock_requests_get, mock_get_cached_doc): + """Test get_public_keys method.""" + mock_get_cached_doc.return_value = self.mock_settings + + # Mock JWKS response + mock_jwks_response = { + "keys": [ + { + "kty": "RSA", + "kid": "test-key-id", + "use": "sig", + "alg": "RS256", + "n": "test-n-value", + "e": "AQAB" + } + ] + } + mock_requests_get.return_value.json.return_value = mock_jwks_response + + with patch('jwt_auth.auth.jwt.algorithms.RSAAlgorithm.from_jwk') as mock_from_jwk: + mock_public_key = Mock() + mock_from_jwk.return_value = mock_public_key + + auth = JWTAuth() + public_keys = auth.get_public_keys() + + self.assertEqual(len(public_keys), 1) + self.assertEqual(public_keys[0], mock_public_key) + mock_requests_get.assert_called_once_with(self.mock_settings.jwks_url) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_get_token_from_cookie(self, mock_get_cached_doc): + """Test get_token method with token in cookie.""" + mock_get_cached_doc.return_value = self.mock_settings + + mock_request = Mock() + mock_request.cookies.get.return_value = 'test-jwt-token' + mock_request.headers.get.return_value = None + + auth = JWTAuth() + token = auth.get_token(mock_request) + + self.assertEqual(token, 'test-jwt-token') + mock_request.cookies.get.assert_called_once_with('Cf-Access-Token') + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_get_token_from_header(self, mock_get_cached_doc): + """Test get_token method with token in header.""" + mock_get_cached_doc.return_value = self.mock_settings + + mock_request = Mock() + mock_request.cookies.get.return_value = None + mock_request.headers.get.return_value = 'test-jwt-token' + + auth = JWTAuth() + token = auth.get_token(mock_request) + + self.assertEqual(token, 'test-jwt-token') + mock_request.headers.get.assert_called_once_with('Cf-Access-Token') + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_get_token_not_found(self, mock_get_cached_doc): + """Test get_token method when token is not found.""" + mock_get_cached_doc.return_value = self.mock_settings + + mock_request = Mock() + mock_request.cookies.get.return_value = None + mock_request.headers.get.return_value = None + + auth = JWTAuth() + token = auth.get_token(mock_request) + + self.assertIsNone(token) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_is_valid_token_success(self, mock_get_cached_doc): + """Test is_valid_token method with valid token.""" + mock_get_cached_doc.return_value = self.mock_settings + + # Mock JWT decode success + mock_claims = {'email': 'test@example.com', 'sub': '12345'} + mock_public_key = Mock() + + with patch.object(JWTAuth, 'get_public_keys', return_value=[mock_public_key]): + with patch('jwt_auth.auth.jwt.decode', return_value=mock_claims): + auth = JWTAuth() + result = auth.is_valid_token('test-token') + + self.assertTrue(result) + self.assertEqual(auth.claims, mock_claims) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_is_valid_token_failure(self, mock_get_cached_doc): + """Test is_valid_token method with invalid token.""" + mock_get_cached_doc.return_value = self.mock_settings + + mock_public_key = Mock() + + with patch.object(JWTAuth, 'get_public_keys', return_value=[mock_public_key]): + with patch('jwt_auth.auth.jwt.decode', side_effect=jwt.InvalidTokenError): + auth = JWTAuth() + result = auth.is_valid_token('invalid-token') + + self.assertFalse(result) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + def test_can_auth_conditions(self, mock_get_cached_doc): + """Test can_auth method various conditions.""" + mock_get_cached_doc.return_value = self.mock_settings + + with patch('jwt_auth.auth.frappe.local') as mock_local: + mock_local.session.user = 'Guest' + mock_local.request = Mock() + + auth = JWTAuth() + + # Test redirect_to condition + auth.redirect_to = '/somewhere' + self.assertFalse(auth.can_auth()) + + # Test authenticated user condition + auth.redirect_to = None + mock_local.session.user = 'test@example.com' + self.assertFalse(auth.can_auth()) + + # Test disabled settings condition + mock_local.session.user = 'Guest' + self.mock_settings.enabled = False + self.assertFalse(auth.can_auth()) + + # Test jwt_logout_redirect flag + self.mock_settings.enabled = True + with patch('jwt_auth.auth.frappe.flags.get', return_value=True): + self.assertFalse(auth.can_auth()) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + @patch('jwt_auth.auth.frappe.db') + @patch('jwt_auth.auth.frappe.get_doc') + def test_register_user_with_contact(self, mock_get_doc, mock_db, mock_get_cached_doc): + """Test register_user method when contact exists.""" + mock_get_cached_doc.return_value = self.mock_settings + + # Mock contact exists + mock_db.get_value.return_value = 'Contact-001' + + # Mock contact document + mock_contact = Mock() + mock_contact.first_name = 'John' + mock_contact.last_name = 'Doe' + mock_contact.full_name = 'John Doe' + mock_contact.phone = '123-456-7890' + mock_contact.mobile_no = '987-654-3210' + mock_contact.gender = 'Male' + mock_contact.company_name = 'Test Company' + mock_contact.middle_name = None + + # Mock user document + mock_user = Mock() + + mock_get_doc.side_effect = [mock_contact, mock_user] + + with patch('jwt_auth.auth.frappe.db.commit'): + auth = JWTAuth() + auth.register_user('test@example.com') + + # Verify user document creation + user_doc_args = mock_get_doc.call_args_list[1][0][0] + self.assertEqual(user_doc_args['doctype'], 'User') + self.assertEqual(user_doc_args['email'], 'test@example.com') + self.assertEqual(user_doc_args['first_name'], 'John') + self.assertEqual(user_doc_args['last_name'], 'Doe') + + # Verify contact is linked to user + mock_contact.save.assert_called_once_with(ignore_permissions=True) + mock_user.insert.assert_called_once_with(ignore_permissions=True) + + @patch('jwt_auth.auth.frappe.get_cached_doc') + @patch('jwt_auth.auth.frappe.db') + @patch('jwt_auth.auth.frappe.get_doc') + def test_register_user_without_contact(self, mock_get_doc, mock_db, mock_get_cached_doc): + """Test register_user method when contact doesn't exist.""" + mock_get_cached_doc.return_value = self.mock_settings + + # Mock no contact exists + mock_db.get_value.return_value = None + + # Mock user document + mock_user = Mock() + mock_get_doc.return_value = mock_user + + with patch('jwt_auth.auth.frappe.db.commit'): + auth = JWTAuth() + auth.register_user('test@example.com') + + # Verify user document creation + user_doc_args = mock_get_doc.call_args[0][0] + self.assertEqual(user_doc_args['doctype'], 'User') + self.assertEqual(user_doc_args['email'], 'test@example.com') + self.assertEqual(user_doc_args['first_name'], '[Change Me]') + + # Verify redirect is set for profile update + self.assertEqual(auth.redirect_to, '/update-profile/test@example.com/edit') + + mock_user.insert.assert_called_once_with(ignore_permissions=True) + + +class TestSessionJWTAuthUnit(UnitTestCase): + """Unit tests for SessionJWTAuth class.""" + + @patch('jwt_auth.auth.frappe.local') + def test_session_jwt_auth_initialization(self, mock_local): + """Test SessionJWTAuth initialization.""" + mock_local.jwt_auth = None + + with patch('jwt_auth.auth.JWTAuth') as mock_jwt_auth: + mock_jwt_auth_instance = Mock() + mock_jwt_auth.return_value = mock_jwt_auth_instance + + session_auth = SessionJWTAuth('/test', 200) + + mock_jwt_auth.assert_called_once_with('/test', 200) + self.assertEqual(mock_local.jwt_auth, mock_jwt_auth_instance) + + @patch('jwt_auth.auth.frappe.local') + def test_session_jwt_auth_reuse_existing(self, mock_local): + """Test SessionJWTAuth reuses existing instance.""" + mock_existing_auth = Mock() + mock_local.jwt_auth = mock_existing_auth + + session_auth = SessionJWTAuth('/new', 404) + + mock_existing_auth.update.assert_called_once_with('/new', 404) + + @patch('jwt_auth.auth.frappe.local') + def test_session_jwt_auth_getattr(self, mock_local): + """Test SessionJWTAuth __getattr__ delegation.""" + mock_jwt_auth = Mock() + mock_jwt_auth.test_method.return_value = 'test_result' + mock_local.jwt_auth = mock_jwt_auth + + session_auth = SessionJWTAuth() + result = session_auth.test_method() + + self.assertEqual(result, 'test_result') + mock_jwt_auth.test_method.assert_called_once() + + +class TestUtilityFunctionsUnit(UnitTestCase): + """Unit tests for utility functions.""" + + @patch('jwt_auth.auth.SessionJWTAuth') + @patch('jwt_auth.auth.frappe.local') + def test_jwt_logout(self, mock_local, mock_session_jwt_auth): + """Test jwt_logout function.""" + mock_auth = Mock() + mock_auth.settings.enabled = True + mock_auth.get_logout_url.return_value = 'https://logout.url' + mock_session_jwt_auth.return_value = mock_auth + + mock_login_manager = Mock() + mock_local.login_manager = mock_login_manager + + result = jwt_logout() + + mock_login_manager.logout.assert_called_once() + self.assertEqual(result, {'redirect_url': 'https://logout.url'}) + + @patch('jwt_auth.auth.SessionJWTAuth') + @patch('jwt_auth.auth.frappe.local') + def test_jwt_logout_disabled(self, mock_local, mock_session_jwt_auth): + """Test jwt_logout function when JWT auth is disabled.""" + mock_auth = Mock() + mock_auth.settings.enabled = False + mock_session_jwt_auth.return_value = mock_auth + + mock_login_manager = Mock() + mock_local.login_manager = mock_login_manager + + result = jwt_logout() + + mock_login_manager.logout.assert_called_once() + self.assertEqual(result, {'redirect_url': '/login'}) + + @patch('jwt_auth.auth.SessionJWTAuth') + @patch('jwt_auth.auth.frappe.flags') + def test_on_logout(self, mock_flags, mock_session_jwt_auth): + """Test on_logout function.""" + mock_auth = Mock() + mock_auth.get_logout_url.return_value = 'https://logout.url' + mock_session_jwt_auth.return_value = mock_auth + + on_logout() + + mock_flags.__setitem__.assert_called_once_with('jwt_logout_redirect', 'https://logout.url') + + @patch('jwt_auth.auth.SessionJWTAuth') + @patch('jwt_auth.auth.frappe.local') + def test_web_logout(self, mock_local, mock_session_jwt_auth): + """Test web_logout function.""" + mock_auth = Mock() + mock_auth.settings.enabled = True + mock_auth.get_logout_url.return_value = 'https://logout.url' + mock_session_jwt_auth.return_value = mock_auth + + mock_login_manager = Mock() + mock_local.login_manager = mock_login_manager + mock_local.response = {} + + web_logout() + + mock_login_manager.logout.assert_called_once() + self.assertEqual(mock_local.response['type'], 'redirect') + self.assertEqual(mock_local.response['location'], 'https://logout.url') + + @patch('jwt_auth.auth.SessionJWTAuth') + def test_validate_auth(self, mock_session_jwt_auth): + """Test validate_auth function.""" + mock_auth = Mock() + mock_session_jwt_auth.return_value = mock_auth + + validate_auth() + + mock_auth.validate_auth.assert_called_once() + + @patch('jwt_auth.auth.frappe.session') + @patch('jwt_auth.auth.frappe.flags') + def test_handle_redirects_logout_redirect(self, mock_flags, mock_session): + """Test handle_redirects with logout redirect.""" + mock_response = Mock() + mock_request = Mock() + + mock_session.get.return_value = 'Guest' + mock_flags.get.return_value = 'https://logout.url' + mock_flags.pop.return_value = 'https://logout.url' + + handle_redirects(mock_response, mock_request) + + self.assertEqual(mock_response.status_code, 302) + self.assertEqual(mock_response.headers['Location'], 'https://logout.url') + + @patch('jwt_auth.auth.frappe.session') + @patch('jwt_auth.auth.frappe.cache') + def test_handle_redirects_auth_redirect(self, mock_cache, mock_session): + """Test handle_redirects with auth redirect.""" + mock_response = Mock() + mock_request = Mock() + mock_request.path = '/dashboard' + + mock_session.get.return_value = 'test@example.com' + mock_session.data = {'jwt_auth_redirect': '/dashboard'} + + with patch('jwt_auth.auth.frappe.flags.get', return_value=False): + handle_redirects(mock_response, mock_request) + + self.assertEqual(mock_response.status_code, 302) + self.assertEqual(mock_response.headers['Location'], '/dashboard') + + @patch('jwt_auth.auth.frappe.session') + @patch('jwt_auth.auth.frappe.cache') + def test_handle_redirects_cache_redirect(self, mock_cache, mock_session): + """Test handle_redirects with cache-based redirect.""" + mock_response = Mock() + mock_request = Mock() + mock_request.path = '/me' + + mock_session.get.return_value = 'test@example.com' + mock_session.user = 'test@example.com' + mock_session.data = {} + + mock_cache_instance = Mock() + mock_cache_instance.get_value.return_value = '/original/location' + mock_cache.return_value = mock_cache_instance + + with patch('jwt_auth.auth.frappe.flags.get', return_value=False): + handle_redirects(mock_response, mock_request) + + self.assertEqual(mock_response.status_code, 302) + self.assertEqual(mock_response.headers['Location'], '/original/location') + mock_cache_instance.delete_value.assert_called_once() + + +class TestJWTAuthIntegration(IntegrationTestCase): + """Integration tests for JWT authentication flow.""" + + def setUp(self): + """Set up test environment.""" + # Create test JWT Auth Settings + try: + self.settings = frappe.get_doc("JWT Auth Settings") + except frappe.DoesNotExistError: + self.settings = frappe.new_doc("JWT Auth Settings") + + self.settings.update({ + 'enabled': 1, + 'enable_user_reg': 1, + 'enable_login': 1, + 'jwt_header': 'Cf-Access-Token', + 'jwks_url': 'https://test.cloudflareaccess.com/cdn-cgi/access/certs', + 'jwt_private_secret': 'test-secret', + 'login_url': 'https://test.cloudflareaccess.com/cdn-cgi/access/login/test', + 'logout_url': 'https://test.cloudflareaccess.com/cdn-cgi/access/logout', + 'redirect_param': 'redirect_url' + }) + self.settings.save() + + def test_full_authentication_flow(self): + """Test complete authentication flow.""" + # Mock external dependencies + with patch('jwt_auth.auth.requests.get') as mock_requests: + with patch('jwt_auth.auth.jwt.decode') as mock_jwt_decode: + with patch('jwt_auth.auth.frappe.local') as mock_local: + # Setup mocks + mock_requests.return_value.json.return_value = { + "keys": [{"kty": "RSA", "kid": "test", "use": "sig", "alg": "RS256", "n": "test", "e": "AQAB"}] + } + mock_jwt_decode.return_value = {'email': 'test@example.com'} + + mock_local.session.user = 'Guest' + mock_local.request = Mock() + mock_local.request.cookies.get.return_value = 'test-jwt-token' + mock_local.request.headers.get.return_value = None + + # Test authentication + auth = JWTAuth() + + with patch.object(auth, 'get_public_keys', return_value=[Mock()]): + can_auth = auth.can_auth() + self.assertTrue(can_auth) + self.assertEqual(auth.token, 'test-jwt-token') + + def tearDown(self): + """Clean up after tests.""" + # Reset settings + self.settings.enabled = 0 + self.settings.enable_user_reg = 0 + self.settings.enable_login = 0 + self.settings.save() \ No newline at end of file diff --git a/tests/test_cloudflare_simulation.py b/tests/test_cloudflare_simulation.py new file mode 100644 index 0000000..c94bfbb --- /dev/null +++ b/tests/test_cloudflare_simulation.py @@ -0,0 +1,422 @@ +# Copyright (c) 2024, Avunu LLC and Contributors +# See license.txt + +import json +import jwt +import time +from datetime import datetime, timedelta +from unittest.mock import Mock, patch, MagicMock + +import frappe +from frappe.tests import IntegrationTestCase, UnitTestCase + +from jwt_auth.auth import JWTAuth, SessionJWTAuth +from jwt_auth.providers import CloudflareAccessProvider + + +class TestCloudflareAccessSimulation(UnitTestCase): + """Unit tests simulating Cloudflare Access API interactions without external calls.""" + + def setUp(self): + """Set up test environment with mock Cloudflare responses.""" + self.team_name = 'test-team' + self.aud_tag = 'test-aud-tag' + self.secret = 'test-audience-secret' + + # Mock JWT Auth Settings + self.mock_settings = Mock() + self.mock_settings.enabled = True + self.mock_settings.enable_user_reg = True + self.mock_settings.enable_login = True + self.mock_settings.jwt_header = 'Cf-Access-Token' + self.mock_settings.team_name = self.team_name + self.mock_settings.aud_tag = self.aud_tag + self.mock_settings.get_password.return_value = self.secret + + # Mock Cloudflare JWKS response + self.mock_jwks_response = { + "keys": [ + { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "x5c": [ + "MIIC+DCCAeCgAwIBAgIJAKZ7...example_certificate_data" + ], + "n": "example_n_parameter_base64url_encoded", + "e": "AQAB", + "kid": "cloudflare-access-key-1", + "x5t": "example_x5t_thumbprint" + } + ] + } + + # Mock valid JWT token payload from Cloudflare Access + self.mock_valid_jwt_payload = { + "aud": [self.secret], + "email": "user@example.com", + "exp": int(time.time()) + 3600, # Expires in 1 hour + "iat": int(time.time()), + "iss": f"https://{self.team_name}.cloudflareaccess.com", + "sub": "1234567890abcdef", + "custom": { + "name": "John Doe", + "groups": ["Developers", "Users"] + }, + "identity_nonce": "example_nonce", + "common_name": "user@example.com", + "country": "US" + } + + # Mock expired JWT token payload + self.mock_expired_jwt_payload = { + **self.mock_valid_jwt_payload, + "exp": int(time.time()) - 3600, # Expired 1 hour ago + "iat": int(time.time()) - 7200 # Issued 2 hours ago + } + + # Mock invalid audience JWT token payload + self.mock_invalid_aud_jwt_payload = { + **self.mock_valid_jwt_payload, + "aud": ["wrong-audience"] + } + + def test_cloudflare_jwks_endpoint_simulation(self): + """Test simulation of Cloudflare JWKS endpoint call.""" + provider = CloudflareAccessProvider(self.mock_settings) + + with patch('jwt_auth.providers.requests.get') as mock_requests: + mock_response = Mock() + mock_response.json.return_value = self.mock_jwks_response + mock_requests.return_value = mock_response + + with patch('jwt_auth.providers.jwt.algorithms.RSAAlgorithm.from_jwk') as mock_from_jwk: + mock_public_key = Mock() + mock_from_jwk.return_value = mock_public_key + + public_keys = provider.get_public_keys() + + # Verify correct Cloudflare JWKS URL was called + expected_url = f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/certs' + mock_requests.assert_called_once_with(expected_url) + + # Verify public key was extracted + self.assertEqual(len(public_keys), 1) + self.assertEqual(public_keys[0], mock_public_key) + + def test_cloudflare_login_redirect_simulation(self): + """Test simulation of Cloudflare Access login redirect.""" + provider = CloudflareAccessProvider(self.mock_settings) + + # Test login URL without redirect + login_url = provider.get_login_url() + expected_base_url = f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/login/{self.aud_tag}' + self.assertEqual(login_url, expected_base_url) + + # Test login URL with redirect + redirect_to = '/app/dashboard' + login_url_with_redirect = provider.get_login_url(redirect_to) + self.assertIn('redirect_url=', login_url_with_redirect) + self.assertIn('%2Fapp%2Fdashboard', login_url_with_redirect) + + def test_cloudflare_logout_redirect_simulation(self): + """Test simulation of Cloudflare Access logout redirect.""" + provider = CloudflareAccessProvider(self.mock_settings) + + with patch('jwt_auth.providers.frappe.utils.get_url') as mock_get_url: + mock_get_url.return_value = 'https://myapp.example.com' + + logout_url = provider.get_logout_url() + + # Verify logout URL structure + expected_base = f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/logout' + self.assertIn(expected_base, logout_url) + self.assertIn('redirect_url=', logout_url) + self.assertIn('https%3A//myapp.example.com', logout_url) + + def test_valid_cloudflare_jwt_token_simulation(self): + """Test simulation of valid Cloudflare Access JWT token validation.""" + with patch('jwt_auth.auth.frappe.get_cached_doc') as mock_get_cached_doc: + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth() + + with patch.object(auth, 'get_public_keys') as mock_get_keys: + mock_public_key = Mock() + mock_get_keys.return_value = [mock_public_key] + + with patch('jwt_auth.auth.jwt.decode') as mock_jwt_decode: + mock_jwt_decode.return_value = self.mock_valid_jwt_payload + + # Test token validation + result = auth.is_valid_token('mock.jwt.token') + + self.assertTrue(result) + self.assertEqual(auth.claims, self.mock_valid_jwt_payload) + + # Verify JWT decode was called with correct parameters + mock_jwt_decode.assert_called_once_with( + 'mock.jwt.token', + key=mock_public_key, + audience=self.secret, + algorithms=['RS256'] + ) + + def test_expired_cloudflare_jwt_token_simulation(self): + """Test simulation of expired Cloudflare Access JWT token.""" + with patch('jwt_auth.auth.frappe.get_cached_doc') as mock_get_cached_doc: + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth() + + with patch.object(auth, 'get_public_keys') as mock_get_keys: + mock_public_key = Mock() + mock_get_keys.return_value = [mock_public_key] + + with patch('jwt_auth.auth.jwt.decode') as mock_jwt_decode: + # Simulate expired token error + mock_jwt_decode.side_effect = jwt.ExpiredSignatureError('Token expired') + + result = auth.is_valid_token('expired.jwt.token') + + self.assertFalse(result) + + def test_invalid_audience_cloudflare_jwt_token_simulation(self): + """Test simulation of Cloudflare Access JWT token with invalid audience.""" + with patch('jwt_auth.auth.frappe.get_cached_doc') as mock_get_cached_doc: + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth() + + with patch.object(auth, 'get_public_keys') as mock_get_keys: + mock_public_key = Mock() + mock_get_keys.return_value = [mock_public_key] + + with patch('jwt_auth.auth.jwt.decode') as mock_jwt_decode: + # Simulate invalid audience error + mock_jwt_decode.side_effect = jwt.InvalidAudienceError('Invalid audience') + + result = auth.is_valid_token('invalid.aud.token') + + self.assertFalse(result) + + def test_cloudflare_token_extraction_simulation(self): + """Test simulation of extracting Cloudflare Access token from request.""" + with patch('jwt_auth.auth.frappe.get_cached_doc') as mock_get_cached_doc: + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth() + + # Test token in Cf-Access-Token cookie (Cloudflare's preferred method) + mock_request_cookie = Mock() + mock_request_cookie.cookies.get.return_value = 'cookie.jwt.token' + mock_request_cookie.headers.get.return_value = None + + token = auth.get_token(mock_request_cookie) + self.assertEqual(token, 'cookie.jwt.token') + mock_request_cookie.cookies.get.assert_called_once_with('Cf-Access-Token') + + # Test token in Cf-Access-Token header (fallback method) + mock_request_header = Mock() + mock_request_header.cookies.get.return_value = None + mock_request_header.headers.get.return_value = 'header.jwt.token' + + token = auth.get_token(mock_request_header) + self.assertEqual(token, 'header.jwt.token') + mock_request_header.headers.get.assert_called_once_with('Cf-Access-Token') + + def test_cloudflare_user_registration_simulation(self): + """Test simulation of user registration with Cloudflare Access data.""" + with patch('jwt_auth.auth.frappe.get_cached_doc') as mock_get_cached_doc: + mock_get_cached_doc.return_value = self.mock_settings + + auth = JWTAuth() + auth.claims = self.mock_valid_jwt_payload + + # Mock database and document operations + with patch('jwt_auth.auth.frappe.db') as mock_db: + with patch('jwt_auth.auth.frappe.get_doc') as mock_get_doc: + with patch('jwt_auth.auth.frappe.db.commit'): + # Simulate no existing contact + mock_db.get_value.return_value = None + + # Mock user document creation + mock_user = Mock() + mock_get_doc.return_value = mock_user + + auth.register_user('user@example.com') + + # Verify user document was created with correct data + user_doc_args = mock_get_doc.call_args[0][0] + self.assertEqual(user_doc_args['doctype'], 'User') + self.assertEqual(user_doc_args['email'], 'user@example.com') + self.assertEqual(user_doc_args['first_name'], '[Change Me]') + + # Verify redirect is set for profile completion + self.assertEqual(auth.redirect_to, '/update-profile/user@example.com/edit') + + mock_user.insert.assert_called_once_with(ignore_permissions=True) + + def test_full_cloudflare_authentication_flow_simulation(self): + """Test simulation of complete Cloudflare Access authentication flow.""" + with patch('jwt_auth.auth.frappe.get_cached_doc') as mock_get_cached_doc: + mock_get_cached_doc.return_value = self.mock_settings + + with patch('jwt_auth.auth.frappe.local') as mock_local: + with patch('jwt_auth.auth.frappe.qb') as mock_qb: + # Setup mock request with Cloudflare token + mock_local.session.user = 'Guest' + mock_local.request = Mock() + mock_local.request.cookies.get.return_value = 'cloudflare.jwt.token' + mock_local.request.headers.get.return_value = None + + # Setup mock login manager + mock_login_manager = Mock() + mock_local.login_manager = mock_login_manager + + # Mock existing user lookup + mock_contact = Mock() + mock_contact_email = Mock() + mock_qb.DocType.side_effect = [mock_contact, mock_contact_email] + mock_qb.from_.return_value.select.return_value.join.return_value.on.return_value.where.return_value.run.return_value = [ + {'user': 'user@example.com'} + ] + + auth = JWTAuth() + + # Mock token validation + with patch.object(auth, 'get_public_keys') as mock_get_keys: + with patch('jwt_auth.auth.jwt.decode') as mock_jwt_decode: + mock_get_keys.return_value = [Mock()] + mock_jwt_decode.return_value = self.mock_valid_jwt_payload + + # Test can_auth + can_authenticate = auth.can_auth() + self.assertTrue(can_authenticate) + self.assertEqual(auth.token, 'cloudflare.jwt.token') + self.assertEqual(auth.claims, self.mock_valid_jwt_payload) + + # Test auth + auth.auth() + + # Verify user was logged in + mock_login_manager.login_as.assert_called_once_with('user@example.com') + + +class TestCloudflareAccessIntegration(IntegrationTestCase): + """Integration tests for Cloudflare Access simulation with real Frappe environment.""" + + def setUp(self): + """Set up test environment with Cloudflare settings.""" + # Create JWT Auth Settings with Cloudflare configuration + try: + self.settings = frappe.get_doc("JWT Auth Settings") + except frappe.DoesNotExistError: + self.settings = frappe.new_doc("JWT Auth Settings") + + # Configure for Cloudflare Access + self.team_name = 'test-integration-team' + self.aud_tag = 'test-integration-aud' + + self.settings.update({ + 'enabled': 1, + 'enable_user_reg': 1, + 'enable_login': 1, + 'jwt_header': 'Cf-Access-Token', + 'jwks_url': f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/certs', + 'jwt_private_secret': 'integration-test-secret', + 'login_url': f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/login/{self.aud_tag}', + 'logout_url': f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/logout', + 'redirect_param': 'redirect_url' + }) + + # Add Cloudflare-specific fields + if not hasattr(self.settings, 'team_name'): + setattr(self.settings, 'team_name', self.team_name) + if not hasattr(self.settings, 'aud_tag'): + setattr(self.settings, 'aud_tag', self.aud_tag) + + self.settings.save() + + def test_cloudflare_provider_integration_with_settings(self): + """Test CloudflareAccessProvider integration with real JWT Auth Settings.""" + provider = CloudflareAccessProvider(self.settings) + + # Test that provider uses settings correctly + self.assertEqual(provider.team_name, self.team_name) + self.assertEqual(provider.aud_tag, self.aud_tag) + self.assertEqual(provider.jwt_header, 'Cf-Access-Token') + + # Test URL generation + login_url = provider.get_login_url('/dashboard') + expected_login = f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/login/{self.aud_tag}?redirect_url=%2F%2Fdashboard' + self.assertEqual(login_url, expected_login) + + jwks_url = provider.get_jwks_url() + expected_jwks = f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/certs' + self.assertEqual(jwks_url, expected_jwks) + + def test_cloudflare_error_handling_simulation(self): + """Test error handling in Cloudflare Access simulation.""" + provider = CloudflareAccessProvider(self.settings) + + # Test JWKS endpoint error + with patch('jwt_auth.providers.requests.get') as mock_requests: + mock_requests.side_effect = Exception('Network error') + + with self.assertRaises(Exception): + provider.get_public_keys() + + def test_cloudflare_jwt_signature_verification_simulation(self): + """Test JWT signature verification simulation with multiple keys.""" + provider = CloudflareAccessProvider(self.settings) + + # Mock JWKS response with multiple keys (common in production) + mock_jwks_response = { + "keys": [ + { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "kid": "old-cloudflare-key", + "n": "old_key_n_value", + "e": "AQAB" + }, + { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "kid": "current-cloudflare-key", + "n": "current_key_n_value", + "e": "AQAB" + } + ] + } + + with patch('jwt_auth.providers.requests.get') as mock_requests: + mock_response = Mock() + mock_response.json.return_value = mock_jwks_response + mock_requests.return_value = mock_response + + with patch('jwt_auth.providers.jwt.algorithms.RSAAlgorithm.from_jwk') as mock_from_jwk: + mock_key_1 = Mock() + mock_key_2 = Mock() + mock_from_jwk.side_effect = [mock_key_1, mock_key_2] + + public_keys = provider.get_public_keys() + + # Verify both keys were processed + self.assertEqual(len(public_keys), 2) + self.assertEqual(public_keys[0], mock_key_1) + self.assertEqual(public_keys[1], mock_key_2) + + # Verify correct number of from_jwk calls + self.assertEqual(mock_from_jwk.call_count, 2) + + def tearDown(self): + """Clean up after tests.""" + # Reset settings + self.settings.enabled = 0 + self.settings.enable_user_reg = 0 + self.settings.enable_login = 0 + self.settings.save() \ No newline at end of file diff --git a/tests/test_hooks.py b/tests/test_hooks.py new file mode 100644 index 0000000..9c1180f --- /dev/null +++ b/tests/test_hooks.py @@ -0,0 +1,358 @@ +# Copyright (c) 2024, Avunu LLC and Contributors +# See license.txt + +from unittest.mock import Mock, patch, MagicMock + +import frappe +from frappe.tests import IntegrationTestCase, UnitTestCase + +from jwt_auth.jwt_auth.hooks.contact import on_update +from jwt_auth.jwt_auth.hooks.error_page import JWTAuthErrorPage + + +class TestContactHooksUnit(UnitTestCase): + """Unit tests for contact hooks.""" + + def test_on_update_rename_contact_with_change_me(self): + """Test contact rename when name contains '[Change Me]'.""" + mock_doc = Mock() + mock_doc.name = 'Contact [Change Me]' + mock_doc.get.return_value = 'John Doe' + mock_doc.full_name = 'John Doe' + + with patch('jwt_auth.jwt_auth.hooks.contact.frappe.enqueue') as mock_enqueue: + on_update(mock_doc, 'on_update') + + mock_enqueue.assert_called_once_with( + "frappe.model.rename_doc.rename_doc", + doctype=mock_doc.get("doctype"), + old=mock_doc.get("name"), + new=mock_doc.get("full_name"), + force=False, + show_alert=True, + ) + + def test_on_update_no_rename_without_change_me(self): + """Test that contact is not renamed when name doesn't contain '[Change Me]'.""" + mock_doc = Mock() + mock_doc.name = 'Normal Contact Name' + mock_doc.get.return_value = 'John Doe' + + with patch('jwt_auth.jwt_auth.hooks.contact.frappe.enqueue') as mock_enqueue: + on_update(mock_doc, 'on_update') + + mock_enqueue.assert_not_called() + + def test_on_update_no_rename_without_full_name(self): + """Test that contact is not renamed when full_name is not provided.""" + mock_doc = Mock() + mock_doc.name = 'Contact [Change Me]' + mock_doc.get.return_value = None + + with patch('jwt_auth.jwt_auth.hooks.contact.frappe.enqueue') as mock_enqueue: + on_update(mock_doc, 'on_update') + + mock_enqueue.assert_not_called() + + def test_on_update_user_fields_update(self): + """Test user fields update when contact has a linked user.""" + mock_doc = Mock() + mock_doc.name = 'Test Contact' + mock_doc.get.side_effect = lambda field, default=None: { + 'user': 'test@example.com', + 'first_name': 'John', + 'middle_name': 'Michael', + 'last_name': 'Doe', + 'full_name': 'John Michael Doe', + 'phone': '123-456-7890', + 'mobile_no': '987-654-3210', + 'gender': 'Male', + 'full_name': None # For the first call in rename logic + }.get(field, default) + + mock_doc.has_value_changed.side_effect = lambda field: field in ['first_name', 'phone'] + + mock_user = Mock() + + with patch('jwt_auth.jwt_auth.hooks.contact.User') as mock_user_class: + mock_user_class.return_value = mock_user + + on_update(mock_doc, 'on_update') + + # Verify User was instantiated correctly + mock_user_class.assert_called_once_with("User", "test@example.com") + + # Verify only changed fields were updated + expected_update_fields = { + 'first_name': 'John', + 'phone': '123-456-7890' + } + mock_user.update.assert_called_once_with(expected_update_fields) + mock_user.save.assert_called_once_with(ignore_permissions=True) + + def test_on_update_no_user_fields_update_when_no_user(self): + """Test that user fields are not updated when contact has no linked user.""" + mock_doc = Mock() + mock_doc.name = 'Test Contact' + mock_doc.get.side_effect = lambda field, default=None: { + 'user': False, + 'full_name': None # For the rename logic + }.get(field, default) + + with patch('jwt_auth.jwt_auth.hooks.contact.User') as mock_user_class: + on_update(mock_doc, 'on_update') + + mock_user_class.assert_not_called() + + def test_on_update_no_user_fields_update_when_no_changes(self): + """Test that user fields are not updated when no fields have changed.""" + mock_doc = Mock() + mock_doc.name = 'Test Contact' + mock_doc.get.side_effect = lambda field, default=None: { + 'user': 'test@example.com', + 'first_name': 'John', + 'last_name': 'Doe', + 'full_name': None # For the rename logic + }.get(field, default) + + # No fields have changed + mock_doc.has_value_changed.return_value = False + + with patch('jwt_auth.jwt_auth.hooks.contact.User') as mock_user_class: + on_update(mock_doc, 'on_update') + + mock_user_class.assert_not_called() + + def test_on_update_user_fields_filter_none_values(self): + """Test that None values are filtered out from user updates.""" + mock_doc = Mock() + mock_doc.name = 'Test Contact' + mock_doc.get.side_effect = lambda field, default=None: { + 'user': 'test@example.com', + 'first_name': 'John', + 'middle_name': None, # This should be filtered out + 'last_name': 'Doe', + 'phone': None, # This should be filtered out + 'full_name': None # For the rename logic + }.get(field, default) + + # Mock that first_name and last_name changed, but middle_name and phone are None + mock_doc.has_value_changed.side_effect = lambda field: field in ['first_name', 'middle_name', 'last_name', 'phone'] + + mock_user = Mock() + + with patch('jwt_auth.jwt_auth.hooks.contact.User') as mock_user_class: + mock_user_class.return_value = mock_user + + on_update(mock_doc, 'on_update') + + # Verify only non-None changed fields were updated + expected_update_fields = { + 'first_name': 'John', + 'last_name': 'Doe' + } + mock_user.update.assert_called_once_with(expected_update_fields) + + +class TestErrorPageHooksUnit(UnitTestCase): + """Unit tests for error page hooks.""" + + def test_jwt_auth_error_page_initialization(self): + """Test JWTAuthErrorPage initialization.""" + error_page = JWTAuthErrorPage( + path='/test/path', + http_status_code=500, + exception=Exception('Test error'), + title='Test Error', + message='Test error message' + ) + + # Path should be overridden to jwt_auth_error + self.assertEqual(error_page.path, 'jwt_auth_error') + self.assertEqual(error_page.http_status_code, 500) + self.assertEqual(error_page.title, 'Test Error') + self.assertEqual(error_page.message, 'Test error message') + + def test_jwt_auth_error_page_initialization_minimal(self): + """Test JWTAuthErrorPage initialization with minimal parameters.""" + error_page = JWTAuthErrorPage() + + self.assertEqual(error_page.path, 'jwt_auth_error') + self.assertIsNone(error_page.http_status_code) + self.assertIsNone(error_page.exception) + self.assertIsNone(error_page.title) + self.assertIsNone(error_page.message) + + def test_can_render(self): + """Test can_render method always returns True.""" + error_page = JWTAuthErrorPage() + self.assertTrue(error_page.can_render()) + + def test_init_context_with_explicit_values(self): + """Test init_context with explicitly provided values.""" + error_page = JWTAuthErrorPage( + http_status_code=404, + title='Page Not Found', + message='The requested page could not be found' + ) + + # Mock parent init_context + with patch('jwt_auth.jwt_auth.hooks.error_page.ErrorPage.init_context') as mock_parent_init: + error_page.context = Mock() + error_page.init_context() + + mock_parent_init.assert_called_once() + self.assertEqual(error_page.context.http_status_code, 404) + self.assertEqual(error_page.context.title, 'Page Not Found') + self.assertEqual(error_page.context.message, 'The requested page could not be found') + + def test_init_context_with_exception_values(self): + """Test init_context with values from exception.""" + mock_exception = Mock() + mock_exception.http_status_code = 403 + mock_exception.title = 'Access Denied' + mock_exception.message = 'You do not have permission to access this resource' + + error_page = JWTAuthErrorPage(exception=mock_exception) + + with patch('jwt_auth.jwt_auth.hooks.error_page.ErrorPage.init_context') as mock_parent_init: + error_page.context = Mock() + error_page.init_context() + + mock_parent_init.assert_called_once() + self.assertEqual(error_page.context.http_status_code, 403) + self.assertEqual(error_page.context.title, 'Access Denied') + self.assertEqual(error_page.context.message, 'You do not have permission to access this resource') + + def test_init_context_fallback_to_defaults(self): + """Test init_context fallback to default values.""" + error_page = JWTAuthErrorPage() + + with patch('jwt_auth.jwt_auth.hooks.error_page.ErrorPage.init_context') as mock_parent_init: + error_page.context = Mock() + error_page.init_context() + + mock_parent_init.assert_called_once() + self.assertEqual(error_page.context.http_status_code, 500) # Default fallback + self.assertIsNone(error_page.context.title) + self.assertIsNone(error_page.context.message) + + def test_init_context_priority_explicit_over_exception(self): + """Test that explicit values take priority over exception values.""" + mock_exception = Mock() + mock_exception.http_status_code = 403 + mock_exception.title = 'Exception Title' + mock_exception.message = 'Exception Message' + + error_page = JWTAuthErrorPage( + exception=mock_exception, + http_status_code=500, # This should override exception value + title='Explicit Title', # This should override exception value + message='Explicit Message' # This should override exception value + ) + + with patch('jwt_auth.jwt_auth.hooks.error_page.ErrorPage.init_context') as mock_parent_init: + error_page.context = Mock() + error_page.init_context() + + mock_parent_init.assert_called_once() + self.assertEqual(error_page.context.http_status_code, 500) + self.assertEqual(error_page.context.title, 'Explicit Title') + self.assertEqual(error_page.context.message, 'Explicit Message') + + +class TestHooksIntegration(IntegrationTestCase): + """Integration tests for hooks functionality.""" + + def setUp(self): + """Set up test environment.""" + # Create test user and contact + self.test_email = 'test-contact-hook@example.com' + + # Clean up any existing test data + self.cleanup_test_data() + + # Create test user + self.test_user = frappe.get_doc({ + 'doctype': 'User', + 'email': self.test_email, + 'first_name': 'Test', + 'last_name': 'User', + 'send_welcome_email': 0 + }) + self.test_user.insert(ignore_permissions=True) + + # Create test contact + self.test_contact = frappe.get_doc({ + 'doctype': 'Contact', + 'first_name': 'Test', + 'last_name': 'Contact', + 'full_name': 'Test Contact', + 'user': self.test_email + }) + self.test_contact.insert(ignore_permissions=True) + + def test_contact_hook_integration(self): + """Test contact hook integration with real documents.""" + # Update contact fields + self.test_contact.first_name = 'Updated' + self.test_contact.phone = '555-1234' + + # Mock the has_value_changed method to simulate changes + original_has_value_changed = self.test_contact.has_value_changed + self.test_contact.has_value_changed = lambda field: field in ['first_name', 'phone'] + + try: + # Trigger the hook + on_update(self.test_contact, 'on_update') + + # Reload user to check if changes were applied + updated_user = frappe.get_doc('User', self.test_email) + self.assertEqual(updated_user.first_name, 'Updated') + self.assertEqual(updated_user.phone, '555-1234') + + finally: + # Restore original method + self.test_contact.has_value_changed = original_has_value_changed + + def test_contact_rename_integration(self): + """Test contact rename integration.""" + # Create a contact with '[Change Me]' in the name + change_me_contact = frappe.get_doc({ + 'doctype': 'Contact', + 'first_name': '[Change Me]', + 'last_name': 'User', + 'full_name': 'John Doe' + }) + change_me_contact.insert(ignore_permissions=True) + + try: + # Mock frappe.enqueue to capture the rename call + with patch('jwt_auth.jwt_auth.hooks.contact.frappe.enqueue') as mock_enqueue: + on_update(change_me_contact, 'on_update') + + # Verify enqueue was called with correct parameters + mock_enqueue.assert_called_once() + call_args = mock_enqueue.call_args + self.assertEqual(call_args[0][0], "frappe.model.rename_doc.rename_doc") + self.assertEqual(call_args[1]['old'], change_me_contact.name) + self.assertEqual(call_args[1]['new'], 'John Doe') + + finally: + # Clean up + change_me_contact.delete(ignore_permissions=True) + + def cleanup_test_data(self): + """Clean up test data.""" + # Delete test contact if exists + contacts = frappe.get_all('Contact', filters={'user': self.test_email}) + for contact in contacts: + frappe.delete_doc('Contact', contact.name, ignore_permissions=True) + + # Delete test user if exists + if frappe.db.exists('User', self.test_email): + frappe.delete_doc('User', self.test_email, ignore_permissions=True) + + def tearDown(self): + """Clean up after tests.""" + self.cleanup_test_data() \ No newline at end of file diff --git a/tests/test_providers.py b/tests/test_providers.py new file mode 100644 index 0000000..17e5c70 --- /dev/null +++ b/tests/test_providers.py @@ -0,0 +1,321 @@ +# Copyright (c) 2024, Avunu LLC and Contributors +# See license.txt + +import json +import jwt +from unittest.mock import Mock, patch, MagicMock +from urllib.parse import quote + +import frappe +from frappe.tests import IntegrationTestCase, UnitTestCase + +from jwt_auth.providers import BaseProvider, CloudflareAccessProvider + + +class TestBaseProviderUnit(UnitTestCase): + """Unit tests for BaseProvider class.""" + + def setUp(self): + """Set up test environment.""" + self.mock_settings = Mock() + self.mock_settings.enabled = True + self.mock_settings.enable_login = True + self.mock_settings.enable_user_reg = True + self.mock_settings.get_password.return_value = 'test-secret' + + def test_base_provider_initialization(self): + """Test BaseProvider initialization.""" + provider = BaseProvider(self.mock_settings) + + self.assertEqual(provider.settings, self.mock_settings) + + def test_base_provider_properties(self): + """Test BaseProvider properties.""" + provider = BaseProvider(self.mock_settings) + + self.assertTrue(provider.enabled) + self.assertTrue(provider.enable_login) + self.assertTrue(provider.enable_user_reg) + self.assertEqual(provider.jwt_private_secret, 'test-secret') + self.assertEqual(provider.jwt_header, 'Cf-Access-Token') + + def test_base_provider_abstract_methods(self): + """Test that abstract methods raise NotImplementedError.""" + provider = BaseProvider(self.mock_settings) + + with self.assertRaises(NotImplementedError): + provider.get_login_url() + + with self.assertRaises(NotImplementedError): + provider.get_logout_url() + + with self.assertRaises(NotImplementedError): + provider.get_jwks_url() + + @patch('jwt_auth.providers.requests.get') + def test_get_public_keys(self, mock_requests_get): + """Test get_public_keys method.""" + provider = BaseProvider(self.mock_settings) + + # Mock JWKS response + mock_jwks_response = { + "keys": [ + { + "kty": "RSA", + "kid": "test-key-1", + "use": "sig", + "alg": "RS256", + "n": "test-n-value-1", + "e": "AQAB" + }, + { + "kty": "RSA", + "kid": "test-key-2", + "use": "sig", + "alg": "RS256", + "n": "test-n-value-2", + "e": "AQAB" + } + ] + } + mock_requests_get.return_value.json.return_value = mock_jwks_response + + # Mock RSA algorithm + mock_public_key_1 = Mock() + mock_public_key_2 = Mock() + + with patch('jwt_auth.providers.jwt.algorithms.RSAAlgorithm.from_jwk') as mock_from_jwk: + mock_from_jwk.side_effect = [mock_public_key_1, mock_public_key_2] + + with patch.object(provider, 'get_jwks_url', return_value='https://test.jwks.url'): + public_keys = provider.get_public_keys() + + self.assertEqual(len(public_keys), 2) + self.assertEqual(public_keys[0], mock_public_key_1) + self.assertEqual(public_keys[1], mock_public_key_2) + + # Verify calls + mock_requests_get.assert_called_once_with('https://test.jwks.url') + self.assertEqual(mock_from_jwk.call_count, 2) + + +class TestCloudflareAccessProviderUnit(UnitTestCase): + """Unit tests for CloudflareAccessProvider class.""" + + def setUp(self): + """Set up test environment.""" + self.mock_settings = Mock() + self.mock_settings.enabled = True + self.mock_settings.enable_login = True + self.mock_settings.enable_user_reg = True + self.mock_settings.team_name = 'test-team' + self.mock_settings.aud_tag = 'test-aud-tag' + self.mock_settings.get_password.return_value = 'test-secret' + + def test_cloudflare_provider_initialization(self): + """Test CloudflareAccessProvider initialization.""" + provider = CloudflareAccessProvider(self.mock_settings) + + self.assertEqual(provider.settings, self.mock_settings) + self.assertTrue(isinstance(provider, BaseProvider)) + + def test_cloudflare_provider_properties(self): + """Test CloudflareAccessProvider properties.""" + provider = CloudflareAccessProvider(self.mock_settings) + + self.assertEqual(provider.team_name, 'test-team') + self.assertEqual(provider.aud_tag, 'test-aud-tag') + self.assertEqual(provider.jwt_header, 'Cf-Access-Token') + + def test_get_jwks_url(self): + """Test get_jwks_url method.""" + provider = CloudflareAccessProvider(self.mock_settings) + + jwks_url = provider.get_jwks_url() + expected_url = 'https://test-team.cloudflareaccess.com/cdn-cgi/access/certs' + + self.assertEqual(jwks_url, expected_url) + + def test_get_login_url_without_redirect(self): + """Test get_login_url without redirect parameter.""" + provider = CloudflareAccessProvider(self.mock_settings) + + login_url = provider.get_login_url() + expected_url = 'https://test-team.cloudflareaccess.com/cdn-cgi/access/login/test-aud-tag' + + self.assertEqual(login_url, expected_url) + + def test_get_login_url_with_redirect(self): + """Test get_login_url with redirect parameter.""" + provider = CloudflareAccessProvider(self.mock_settings) + + redirect_to = '/dashboard/reports' + login_url = provider.get_login_url(redirect_to) + + expected_path = '%2F' + quote(redirect_to, safe='') + expected_url = f'https://test-team.cloudflareaccess.com/cdn-cgi/access/login/test-aud-tag?redirect_url={expected_path}' + + self.assertEqual(login_url, expected_url) + + @patch('jwt_auth.providers.frappe.utils.get_url') + def test_get_logout_url(self, mock_get_url): + """Test get_logout_url method.""" + mock_get_url.return_value = 'https://mysite.example.com' + + provider = CloudflareAccessProvider(self.mock_settings) + logout_url = provider.get_logout_url() + + expected_site_url = quote('https://mysite.example.com', safe='') + expected_url = f'https://test-team.cloudflareaccess.com/cdn-cgi/access/logout?redirect_url={expected_site_url}' + + self.assertEqual(logout_url, expected_url) + mock_get_url.assert_called_once() + + @patch('jwt_auth.providers.requests.get') + def test_inherited_get_public_keys(self, mock_requests_get): + """Test that CloudflareAccessProvider inherits get_public_keys correctly.""" + provider = CloudflareAccessProvider(self.mock_settings) + + # Mock JWKS response that would come from Cloudflare + mock_jwks_response = { + "keys": [ + { + "kty": "RSA", + "kid": "cloudflare-key-1", + "use": "sig", + "alg": "RS256", + "n": "cloudflare-n-value", + "e": "AQAB" + } + ] + } + mock_requests_get.return_value.json.return_value = mock_jwks_response + + mock_public_key = Mock() + + with patch('jwt_auth.providers.jwt.algorithms.RSAAlgorithm.from_jwk', return_value=mock_public_key): + public_keys = provider.get_public_keys() + + self.assertEqual(len(public_keys), 1) + self.assertEqual(public_keys[0], mock_public_key) + + # Verify it called the correct Cloudflare URL + expected_jwks_url = 'https://test-team.cloudflareaccess.com/cdn-cgi/access/certs' + mock_requests_get.assert_called_once_with(expected_jwks_url) + + +class TestProvidersIntegration(IntegrationTestCase): + """Integration tests for provider classes.""" + + def setUp(self): + """Set up test environment.""" + # Create test settings + try: + self.settings = frappe.get_doc("JWT Auth Settings") + except frappe.DoesNotExistError: + self.settings = frappe.new_doc("JWT Auth Settings") + + # Add Cloudflare-specific fields if they don't exist + if not hasattr(self.settings, 'team_name'): + setattr(self.settings, 'team_name', 'test-team') + if not hasattr(self.settings, 'aud_tag'): + setattr(self.settings, 'aud_tag', 'test-aud-tag') + + self.settings.update({ + 'enabled': 1, + 'enable_user_reg': 1, + 'enable_login': 1, + 'jwt_header': 'Cf-Access-Token', + 'jwks_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/certs', + 'jwt_private_secret': 'test-secret', + 'login_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/login/test-aud-tag', + 'logout_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/logout', + 'redirect_param': 'redirect_url', + 'team_name': 'test-team', + 'aud_tag': 'test-aud-tag' + }) + + def test_cloudflare_provider_with_real_settings(self): + """Test CloudflareAccessProvider with real settings document.""" + provider = CloudflareAccessProvider(self.settings) + + # Test URL generation + login_url = provider.get_login_url('/dashboard') + self.assertIn('test-team.cloudflareaccess.com', login_url) + self.assertIn('test-aud-tag', login_url) + self.assertIn('redirect_url=', login_url) + + # Test JWKS URL + jwks_url = provider.get_jwks_url() + expected_jwks = 'https://test-team.cloudflareaccess.com/cdn-cgi/access/certs' + self.assertEqual(jwks_url, expected_jwks) + + def test_provider_integration_with_mocked_cloudflare_api(self): + """Test provider integration with mocked Cloudflare API responses.""" + provider = CloudflareAccessProvider(self.settings) + + # Mock Cloudflare JWKS endpoint response + mock_cloudflare_jwks = { + "keys": [ + { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "x5c": ["MIIC..."], + "n": "example-n-value", + "e": "AQAB", + "kid": "cloudflare-access-kid", + "x5t": "example-thumbprint" + } + ] + } + + with patch('jwt_auth.providers.requests.get') as mock_requests: + mock_response = Mock() + mock_response.json.return_value = mock_cloudflare_jwks + mock_requests.return_value = mock_response + + with patch('jwt_auth.providers.jwt.algorithms.RSAAlgorithm.from_jwk') as mock_from_jwk: + mock_public_key = Mock() + mock_from_jwk.return_value = mock_public_key + + public_keys = provider.get_public_keys() + + # Verify the integration worked + self.assertEqual(len(public_keys), 1) + self.assertEqual(public_keys[0], mock_public_key) + + # Verify correct Cloudflare URL was called + mock_requests.assert_called_once_with( + 'https://test-team.cloudflareaccess.com/cdn-cgi/access/certs' + ) + + def test_provider_url_encoding(self): + """Test that provider handles URL encoding correctly.""" + provider = CloudflareAccessProvider(self.settings) + + # Test complex redirect path with special characters + complex_path = '/app/form/User/test@example.com?tab=details§ion=profile' + login_url = provider.get_login_url(complex_path) + + # Should contain properly encoded redirect URL + self.assertIn('redirect_url=', login_url) + self.assertIn('%2F', login_url) # Forward slash should be encoded + + # Test logout URL encoding + with patch('jwt_auth.providers.frappe.utils.get_url') as mock_get_url: + mock_get_url.return_value = 'https://my-site.example.com:8000/app' + + logout_url = provider.get_logout_url() + + # Should contain properly encoded site URL + self.assertIn('redirect_url=', logout_url) + self.assertIn('https%3A//my-site.example.com', logout_url) + + def tearDown(self): + """Clean up after tests.""" + # Reset settings + if hasattr(self, 'settings'): + self.settings.enabled = 0 + self.settings.enable_user_reg = 0 + self.settings.enable_login = 0 \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..dab21fb --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,356 @@ +# Copyright (c) 2024, Avunu LLC and Contributors +# See license.txt + +""" +Test utilities for JWT Auth testing suite. +Provides common mock objects and helper functions for testing. +""" + +import json +import jwt +import time +from unittest.mock import Mock, MagicMock +from datetime import datetime, timedelta + + +class MockCloudflareAccess: + """Mock Cloudflare Access API responses and behaviors.""" + + def __init__(self, team_name='test-team', aud_tag='test-aud', secret='test-secret'): + self.team_name = team_name + self.aud_tag = aud_tag + self.secret = secret + + def get_jwks_response(self, num_keys=1): + """Generate mock JWKS response.""" + keys = [] + for i in range(num_keys): + key = { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "x5c": [f"MIIC+DCCAeCgAwIBAgIJAKZ7...example_cert_data_{i}"], + "n": f"example_n_parameter_base64url_encoded_{i}", + "e": "AQAB", + "kid": f"cloudflare-access-key-{i}", + "x5t": f"example_x5t_thumbprint_{i}" + } + keys.append(key) + + return {"keys": keys} + + def get_valid_jwt_payload(self, email='user@example.com', extra_claims=None): + """Generate valid JWT payload.""" + payload = { + "aud": [self.secret], + "email": email, + "exp": int(time.time()) + 3600, # Expires in 1 hour + "iat": int(time.time()), + "iss": f"https://{self.team_name}.cloudflareaccess.com", + "sub": "1234567890abcdef", + "custom": { + "name": "John Doe", + "groups": ["Developers", "Users"] + }, + "identity_nonce": "example_nonce", + "common_name": email, + "country": "US" + } + + if extra_claims: + payload.update(extra_claims) + + return payload + + def get_expired_jwt_payload(self, email='user@example.com'): + """Generate expired JWT payload.""" + payload = self.get_valid_jwt_payload(email) + payload.update({ + "exp": int(time.time()) - 3600, # Expired 1 hour ago + "iat": int(time.time()) - 7200 # Issued 2 hours ago + }) + return payload + + def get_invalid_audience_jwt_payload(self, email='user@example.com'): + """Generate JWT payload with invalid audience.""" + payload = self.get_valid_jwt_payload(email) + payload["aud"] = ["wrong-audience"] + return payload + + def get_login_url(self, redirect_to=None): + """Generate expected Cloudflare login URL.""" + base_url = f"https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/login/{self.aud_tag}" + if redirect_to: + from urllib.parse import quote + path = '%2F' + quote(redirect_to, safe='') + base_url += f"?redirect_url={path}" + return base_url + + def get_logout_url(self, site_url='https://example.com'): + """Generate expected Cloudflare logout URL.""" + from urllib.parse import quote + encoded_site = quote(site_url, safe='') + return f"https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/logout?redirect_url={encoded_site}" + + def get_jwks_url(self): + """Generate expected Cloudflare JWKS URL.""" + return f"https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/certs" + + +class MockJWTAuthSettings: + """Mock JWT Auth Settings with reasonable defaults.""" + + def __init__(self, **kwargs): + # Default settings + defaults = { + 'enabled': True, + 'enable_user_reg': True, + 'enable_login': True, + 'jwt_header': 'Cf-Access-Token', + 'jwks_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/certs', + 'jwt_private_secret': 'test-secret', + 'login_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/login/test-aud', + 'logout_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/logout', + 'redirect_param': 'redirect_url', + 'team_name': 'test-team', + 'aud_tag': 'test-aud' + } + + # Override with provided kwargs + defaults.update(kwargs) + + # Set attributes + for key, value in defaults.items(): + setattr(self, key, value) + + # Mock the get_password method + self.get_password = Mock(return_value=self.jwt_private_secret) + + +class MockFrappeLocal: + """Mock frappe.local object.""" + + def __init__(self): + self.session = Mock() + self.session.user = 'Guest' + self.session.data = {} + + self.request = Mock() + self.request.cookies = Mock() + self.request.headers = Mock() + self.request.path = '/' + self.request.url = 'https://example.com/' + + self.login_manager = Mock() + self.response = {} + + +class MockFrappeDoc: + """Mock Frappe document.""" + + def __init__(self, doctype, name=None, **kwargs): + self.doctype = doctype + self.name = name or f"{doctype}-001" + + # Set provided fields + for key, value in kwargs.items(): + setattr(self, key, value) + + # Mock methods + self.get = Mock(side_effect=lambda field, default=None: getattr(self, field, default)) + self.has_value_changed = Mock(return_value=False) + self.save = Mock() + self.insert = Mock() + self.delete = Mock() + self.update = Mock() + + +class MockContact(MockFrappeDoc): + """Mock Contact document with JWT Auth specific behavior.""" + + def __init__(self, **kwargs): + defaults = { + 'first_name': 'John', + 'last_name': 'Doe', + 'full_name': 'John Doe', + 'email_id': 'john.doe@example.com', + 'phone': '123-456-7890', + 'mobile_no': '987-654-3210', + 'gender': 'Male', + 'company_name': 'Test Company', + 'user': None + } + defaults.update(kwargs) + super().__init__('Contact', **defaults) + + +class MockUser(MockFrappeDoc): + """Mock User document.""" + + def __init__(self, **kwargs): + defaults = { + 'email': 'user@example.com', + 'username': 'user@example.com', + 'first_name': 'Test', + 'last_name': 'User', + 'full_name': 'Test User', + 'enabled': 1, + 'send_welcome_email': 0 + } + defaults.update(kwargs) + super().__init__('User', **defaults) + + +def create_mock_request(token=None, token_in='cookie', path='/', headers=None): + """Create a mock request object with JWT token.""" + mock_request = Mock() + mock_request.path = path + mock_request.url = f'https://example.com{path}' + + # Setup cookies + mock_request.cookies = Mock() + if token_in == 'cookie' and token: + mock_request.cookies.get = Mock(side_effect=lambda key: token if key == 'Cf-Access-Token' else None) + else: + mock_request.cookies.get = Mock(return_value=None) + + # Setup headers + mock_request.headers = Mock() + if token_in == 'header' and token: + mock_request.headers.get = Mock(side_effect=lambda key: token if key == 'Cf-Access-Token' else None) + else: + mock_request.headers.get = Mock(return_value=None) + + # Add any extra headers + if headers: + for key, value in headers.items(): + mock_request.headers.get = Mock(side_effect=lambda k: headers.get(k)) + + return mock_request + + +def create_mock_response(): + """Create a mock HTTP response object.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {} + return mock_response + + +def setup_mock_frappe_environment(): + """Set up common mock frappe environment.""" + mocks = { + 'local': MockFrappeLocal(), + 'session': Mock(), + 'flags': Mock(), + 'cache': Mock(), + 'db': Mock(), + 'utils': Mock() + } + + # Setup common return values + mocks['session'].get = Mock(return_value='Guest') + mocks['flags'].get = Mock(return_value=False) + mocks['cache'].return_value = Mock() + mocks['db'].get_value = Mock(return_value=None) + mocks['db'].exists = Mock(return_value=False) + mocks['db'].commit = Mock() + mocks['utils'].get_url = Mock(return_value='https://example.com') + + return mocks + + +def assert_cloudflare_urls(test_case, provider, team_name, aud_tag): + """Assert that provider generates correct Cloudflare URLs.""" + import unittest.mock + + # Test JWKS URL + jwks_url = provider.get_jwks_url() + expected_jwks = f'https://{team_name}.cloudflareaccess.com/cdn-cgi/access/certs' + test_case.assertEqual(jwks_url, expected_jwks) + + # Test login URL without redirect + login_url = provider.get_login_url() + expected_login = f'https://{team_name}.cloudflareaccess.com/cdn-cgi/access/login/{aud_tag}' + test_case.assertEqual(login_url, expected_login) + + # Test login URL with redirect + login_url_with_redirect = provider.get_login_url('/dashboard') + test_case.assertIn('redirect_url=', login_url_with_redirect) + test_case.assertIn('%2F', login_url_with_redirect) + + # Test logout URL + with unittest.mock.patch('jwt_auth.providers.frappe.utils.get_url', return_value='https://example.com'): + logout_url = provider.get_logout_url() + expected_logout_base = f'https://{team_name}.cloudflareaccess.com/cdn-cgi/access/logout' + test_case.assertIn(expected_logout_base, logout_url) + test_case.assertIn('redirect_url=', logout_url) + + +def create_test_jwt_settings(docname="JWT Auth Settings", **overrides): + """Create test JWT Auth Settings document.""" + import frappe + + try: + settings = frappe.get_doc(docname) + except frappe.DoesNotExistError: + settings = frappe.new_doc(docname) + + default_values = { + 'enabled': 1, + 'enable_user_reg': 1, + 'enable_login': 1, + 'jwt_header': 'Cf-Access-Token', + 'jwks_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/certs', + 'jwt_private_secret': 'test-secret', + 'login_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/login/test-aud', + 'logout_url': 'https://test-team.cloudflareaccess.com/cdn-cgi/access/logout', + 'redirect_param': 'redirect_url' + } + + # Apply overrides + default_values.update(overrides) + + # Set values + for key, value in default_values.items(): + setattr(settings, key, value) + + return settings + + +# Common test data +VALID_JWT_CLAIMS = { + "aud": ["test-secret"], + "email": "test@example.com", + "exp": int(time.time()) + 3600, + "iat": int(time.time()), + "iss": "https://test-team.cloudflareaccess.com", + "sub": "1234567890", + "common_name": "test@example.com" +} + +EXPIRED_JWT_CLAIMS = { + **VALID_JWT_CLAIMS, + "exp": int(time.time()) - 3600, + "iat": int(time.time()) - 7200 +} + +INVALID_AUD_JWT_CLAIMS = { + **VALID_JWT_CLAIMS, + "aud": ["wrong-audience"] +} + +CLOUDFLARE_JWKS_RESPONSE = { + "keys": [ + { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "x5c": ["MIIC+DCCAeCgAwIBAgIJAKZ7..."], + "n": "example_n_parameter", + "e": "AQAB", + "kid": "cloudflare-key-1", + "x5t": "example_thumbprint" + } + ] +} \ No newline at end of file From 580b566badf8f2c9c3e8223e773dc8a15cd47b97 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Jun 2025 13:37:21 +0000 Subject: [PATCH 3/3] Add comprehensive integration test scenarios Co-authored-by: batonac <4996285+batonac@users.noreply.github.com> --- tests/test_integration_scenarios.py | 382 ++++++++++++++++++++++++++++ 1 file changed, 382 insertions(+) create mode 100644 tests/test_integration_scenarios.py diff --git a/tests/test_integration_scenarios.py b/tests/test_integration_scenarios.py new file mode 100644 index 0000000..96bab67 --- /dev/null +++ b/tests/test_integration_scenarios.py @@ -0,0 +1,382 @@ +# Copyright (c) 2024, Avunu LLC and Contributors +# See license.txt + +""" +Integration test demonstrating the complete JWT Auth testing approach. +This file shows how all components work together in a realistic scenario. +""" + +import json +import jwt +import time +from unittest.mock import Mock, patch, MagicMock + +import frappe +from frappe.tests import IntegrationTestCase + +from jwt_auth.auth import JWTAuth, SessionJWTAuth +from jwt_auth.providers import CloudflareAccessProvider +from jwt_auth.tests.test_utils import MockCloudflareAccess, MockJWTAuthSettings, create_mock_request + + +class TestJWTAuthFullIntegration(IntegrationTestCase): + """Full integration test demonstrating complete JWT Auth workflow.""" + + def setUp(self): + """Set up a complete test environment.""" + # Create test settings + try: + self.settings = frappe.get_doc("JWT Auth Settings") + except frappe.DoesNotExistError: + self.settings = frappe.new_doc("JWT Auth Settings") + + # Configure settings for Cloudflare Access + self.team_name = 'integration-test-team' + self.aud_tag = 'integration-test-aud' + self.secret = 'integration-test-secret' + + self.settings.update({ + 'enabled': 1, + 'enable_user_reg': 1, + 'enable_login': 1, + 'jwt_header': 'Cf-Access-Token', + 'jwks_url': f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/certs', + 'jwt_private_secret': self.secret, + 'login_url': f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/login/{self.aud_tag}', + 'logout_url': f'https://{self.team_name}.cloudflareaccess.com/cdn-cgi/access/logout', + 'redirect_param': 'redirect_url', + 'team_name': self.team_name, + 'aud_tag': self.aud_tag + }) + self.settings.save() + + # Create mock Cloudflare Access helper + self.cf_mock = MockCloudflareAccess(self.team_name, self.aud_tag, self.secret) + + # Test user email + self.test_email = 'integration-test@example.com' + + def test_complete_authentication_workflow(self): + """Test the complete authentication workflow from start to finish.""" + # Step 1: User visits protected page without authentication + with patch('jwt_auth.auth.frappe.local') as mock_local: + mock_local.session.user = 'Guest' + mock_local.request = create_mock_request(path='/protected-page') + + auth = JWTAuth('/protected-page') + + # Should not be able to authenticate without token + self.assertFalse(auth.can_auth()) + + # Step 2: User gets redirected to Cloudflare Access login + provider = CloudflareAccessProvider(self.settings) + login_url = provider.get_login_url('/protected-page') + + expected_login_url = self.cf_mock.get_login_url('/protected-page') + self.assertEqual(login_url, expected_login_url) + + # Step 3: User authenticates with Cloudflare and returns with JWT token + jwt_token = 'mock.jwt.token.from.cloudflare' + jwt_claims = self.cf_mock.get_valid_jwt_payload(self.test_email) + + with patch('jwt_auth.auth.frappe.local') as mock_local: + with patch('jwt_auth.auth.frappe.qb') as mock_qb: + # Setup mock request with JWT token + mock_local.session.user = 'Guest' + mock_local.request = create_mock_request( + token=jwt_token, + token_in='cookie', + path='/protected-page' + ) + mock_local.login_manager = Mock() + + # Mock user lookup - no existing user + mock_contact = Mock() + mock_contact_email = Mock() + mock_qb.DocType.side_effect = [mock_contact, mock_contact_email] + mock_qb.from_.return_value.select.return_value.join.return_value.on.return_value.where.return_value.run.return_value = [] + + auth = JWTAuth('/protected-page') + + # Mock JWT validation + with patch.object(auth, 'get_public_keys', return_value=[Mock()]): + with patch('jwt_auth.auth.jwt.decode', return_value=jwt_claims): + # Should be able to authenticate with valid token + self.assertTrue(auth.can_auth()) + self.assertEqual(auth.token, jwt_token) + self.assertEqual(auth.claims, jwt_claims) + + # Step 4: User registration (since enable_user_reg is True) + with patch('jwt_auth.auth.frappe.db') as mock_db: + with patch('jwt_auth.auth.frappe.get_doc') as mock_get_doc: + with patch('jwt_auth.auth.frappe.db.commit'): + # Mock no existing contact + mock_db.get_value.return_value = None + + # Mock user document creation + mock_user = Mock() + mock_get_doc.return_value = mock_user + + # Trigger authentication + auth.auth() + + # Verify user registration was attempted + user_doc_args = mock_get_doc.call_args[0][0] + self.assertEqual(user_doc_args['doctype'], 'User') + self.assertEqual(user_doc_args['email'], self.test_email) + + # Verify login was attempted + mock_local.login_manager.login_as.assert_called_once_with(self.test_email) + + def test_existing_user_authentication_workflow(self): + """Test authentication workflow for existing user.""" + jwt_token = 'existing.user.jwt.token' + jwt_claims = self.cf_mock.get_valid_jwt_payload(self.test_email) + + with patch('jwt_auth.auth.frappe.local') as mock_local: + with patch('jwt_auth.auth.frappe.qb') as mock_qb: + # Setup mock request + mock_local.session.user = 'Guest' + mock_local.request = create_mock_request( + token=jwt_token, + token_in='header', + path='/dashboard' + ) + mock_local.login_manager = Mock() + + # Mock existing user lookup + mock_contact = Mock() + mock_contact_email = Mock() + mock_qb.DocType.side_effect = [mock_contact, mock_contact_email] + mock_qb.from_.return_value.select.return_value.join.return_value.on.return_value.where.return_value.run.return_value = [ + {'user': self.test_email} + ] + + auth = JWTAuth('/dashboard') + + # Mock JWT validation + with patch.object(auth, 'get_public_keys', return_value=[Mock()]): + with patch('jwt_auth.auth.jwt.decode', return_value=jwt_claims): + # Authenticate existing user + self.assertTrue(auth.can_auth()) + + # Trigger authentication + auth.auth() + + # Verify existing user was logged in directly + mock_local.login_manager.login_as.assert_called_once_with(self.test_email) + + def test_logout_workflow(self): + """Test the logout workflow.""" + # Test programmatic logout + with patch('jwt_auth.auth.SessionJWTAuth') as mock_session_auth: + with patch('jwt_auth.auth.frappe.local') as mock_local: + mock_auth = Mock() + mock_auth.settings.enabled = True + mock_auth.get_logout_url.return_value = self.cf_mock.get_logout_url() + mock_session_auth.return_value = mock_auth + + mock_local.login_manager = Mock() + + from jwt_auth.auth import jwt_logout + result = jwt_logout() + + # Verify logout and redirect + mock_local.login_manager.logout.assert_called_once() + expected_logout_url = self.cf_mock.get_logout_url() + self.assertEqual(result['redirect_url'], expected_logout_url) + + def test_error_scenarios(self): + """Test various error scenarios.""" + # Test expired token + expired_claims = self.cf_mock.get_expired_jwt_payload(self.test_email) + + with patch('jwt_auth.auth.frappe.local') as mock_local: + mock_local.session.user = 'Guest' + mock_local.request = create_mock_request(token='expired.token') + + auth = JWTAuth() + + with patch.object(auth, 'get_public_keys', return_value=[Mock()]): + with patch('jwt_auth.auth.jwt.decode', side_effect=jwt.ExpiredSignatureError): + # Should not be able to authenticate with expired token + self.assertFalse(auth.can_auth()) + + # Test invalid audience + with patch('jwt_auth.auth.frappe.local') as mock_local: + mock_local.session.user = 'Guest' + mock_local.request = create_mock_request(token='invalid.aud.token') + + auth = JWTAuth() + + with patch.object(auth, 'get_public_keys', return_value=[Mock()]): + with patch('jwt_auth.auth.jwt.decode', side_effect=jwt.InvalidAudienceError): + # Should not be able to authenticate with invalid audience + self.assertFalse(auth.can_auth()) + + def test_provider_integration(self): + """Test provider integration with settings.""" + provider = CloudflareAccessProvider(self.settings) + + # Test all URL generation methods + self.assertEqual(provider.get_jwks_url(), self.cf_mock.get_jwks_url()) + self.assertEqual(provider.get_login_url(), self.cf_mock.get_login_url()) + + # Test with redirect + login_with_redirect = provider.get_login_url('/app/dashboard') + expected_with_redirect = self.cf_mock.get_login_url('/app/dashboard') + self.assertEqual(login_with_redirect, expected_with_redirect) + + # Test public key retrieval + with patch('jwt_auth.providers.requests.get') as mock_requests: + mock_response = Mock() + mock_response.json.return_value = self.cf_mock.get_jwks_response() + mock_requests.return_value = mock_response + + with patch('jwt_auth.providers.jwt.algorithms.RSAAlgorithm.from_jwk') as mock_from_jwk: + mock_public_key = Mock() + mock_from_jwk.return_value = mock_public_key + + public_keys = provider.get_public_keys() + + self.assertEqual(len(public_keys), 1) + self.assertEqual(public_keys[0], mock_public_key) + + # Verify correct Cloudflare URL was called + mock_requests.assert_called_once_with(self.cf_mock.get_jwks_url()) + + def test_session_jwt_auth_wrapper(self): + """Test SessionJWTAuth wrapper functionality.""" + with patch('jwt_auth.auth.frappe.local') as mock_local: + # Test first initialization + mock_local.jwt_auth = None + + with patch('jwt_auth.auth.JWTAuth') as mock_jwt_auth: + mock_jwt_auth_instance = Mock() + mock_jwt_auth.return_value = mock_jwt_auth_instance + + session_auth = SessionJWTAuth('/test', 200) + + # Verify JWTAuth was created + mock_jwt_auth.assert_called_once_with('/test', 200) + self.assertEqual(mock_local.jwt_auth, mock_jwt_auth_instance) + + # Test attribute delegation + mock_jwt_auth_instance.test_method.return_value = 'test_result' + result = session_auth.test_method() + self.assertEqual(result, 'test_result') + + def tearDown(self): + """Clean up after tests.""" + # Reset settings + self.settings.enabled = 0 + self.settings.enable_user_reg = 0 + self.settings.enable_login = 0 + self.settings.save() + + # Clean up any test users/contacts + if frappe.db.exists('User', self.test_email): + frappe.delete_doc('User', self.test_email, ignore_permissions=True) + + +class TestRealisticCloudflareScenarios(IntegrationTestCase): + """Test realistic Cloudflare Access scenarios.""" + + def setUp(self): + """Set up for realistic scenarios.""" + self.cf_mock = MockCloudflareAccess('mycompany', 'production-app', 'my-app-audience-secret') + + def test_production_like_jwt_validation(self): + """Test JWT validation with production-like claims.""" + # Realistic Cloudflare Access JWT claims + realistic_claims = { + "aud": ["my-app-audience-secret"], + "email": "john.doe@mycompany.com", + "exp": int(time.time()) + 3600, + "iat": int(time.time()), + "iss": "https://mycompany.cloudflareaccess.com", + "sub": "f47ac10b-58cc-4372-a567-0e02b2c3d479", + "custom": { + "name": "John Doe", + "department": "Engineering", + "groups": ["Employees", "Developers", "Admin"] + }, + "identity_nonce": "12345abcdef", + "common_name": "john.doe@mycompany.com", + "country": "US", + "ip": "192.168.1.100" + } + + # Mock settings for production-like environment + mock_settings = MockJWTAuthSettings( + team_name='mycompany', + aud_tag='production-app', + jwt_private_secret='my-app-audience-secret', + jwks_url='https://mycompany.cloudflareaccess.com/cdn-cgi/access/certs' + ) + + with patch('jwt_auth.auth.frappe.get_cached_doc', return_value=mock_settings): + auth = JWTAuth() + + # Mock successful JWT decode with realistic claims + with patch.object(auth, 'get_public_keys', return_value=[Mock()]): + with patch('jwt_auth.auth.jwt.decode', return_value=realistic_claims): + result = auth.is_valid_token('realistic.production.token') + + self.assertTrue(result) + self.assertEqual(auth.claims, realistic_claims) + self.assertEqual(auth.claims['email'], 'john.doe@mycompany.com') + self.assertEqual(auth.claims['custom']['department'], 'Engineering') + + def test_multiple_keys_rotation_scenario(self): + """Test key rotation scenario with multiple JWKS keys.""" + # Realistic JWKS response with multiple keys (key rotation) + jwks_with_rotation = { + "keys": [ + { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "x5c": ["MIIC...old_cert"], + "n": "old_key_n_parameter", + "e": "AQAB", + "kid": "mycompany-2023-key", + "x5t": "old_thumbprint" + }, + { + "alg": "RS256", + "kty": "RSA", + "use": "sig", + "x5c": ["MIIC...new_cert"], + "n": "new_key_n_parameter", + "e": "AQAB", + "kid": "mycompany-2024-key", + "x5t": "new_thumbprint" + } + ] + } + + provider = CloudflareAccessProvider( + MockJWTAuthSettings(team_name='mycompany', aud_tag='production-app') + ) + + with patch('jwt_auth.providers.requests.get') as mock_requests: + mock_response = Mock() + mock_response.json.return_value = jwks_with_rotation + mock_requests.return_value = mock_response + + with patch('jwt_auth.providers.jwt.algorithms.RSAAlgorithm.from_jwk') as mock_from_jwk: + mock_old_key = Mock() + mock_new_key = Mock() + mock_from_jwk.side_effect = [mock_old_key, mock_new_key] + + public_keys = provider.get_public_keys() + + # Should have both keys available for validation + self.assertEqual(len(public_keys), 2) + self.assertEqual(public_keys[0], mock_old_key) + self.assertEqual(public_keys[1], mock_new_key) + + def tearDown(self): + """Clean up.""" + pass \ No newline at end of file