diff --git a/docs/configuration.md b/docs/configuration.md index 9a12834..9fe9a3a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -14,6 +14,9 @@ options = KsefClientOptions( proxy=None, custom_headers={"X-Custom-Header": "value"}, follow_redirects=False, + strict_presigned_url_validation=True, + allowed_presigned_hosts=None, + allow_private_network_presigned_urls=False, base_qr_url=None, ) ``` @@ -62,6 +65,18 @@ Opcja zwykle nie jest potrzebna. Włączenie ma uzasadnienie wyłącznie w środ Domyślnie `True`. Wyłączenie ma uzasadnienie wyłącznie w specyficznych środowiskach testowych (np. z własnym MITM/proxy). +### `strict_presigned_url_validation` + +Domyślnie `True`. Dla absolutnych URL używanych z `skip_auth=True` wymusza `https`. Przy wyłączeniu możliwe są URL `http`, ale nadal działa walidacja hosta/IP. + +### `allowed_presigned_hosts` + +Domyślnie `None` (brak allowlisty). Jeśli ustawione, host pre-signed URL musi pasować dokładnie albo jako subdomena (np. `a.uploads.example.com` pasuje do `uploads.example.com`). + +### `allow_private_network_presigned_urls` + +Domyślnie `False`. Gdy `False`, blokowane są hosty IP prywatne/link-local/reserved dla żądań `skip_auth=True`. Ustaw `True` wyłącznie w kontrolowanym środowisku. + ## Przekazywanie `access_token` Dostępne są dwa sposoby przekazywania `access_token`: diff --git a/docs/errors.md b/docs/errors.md index e299817..d996a1c 100644 --- a/docs/errors.md +++ b/docs/errors.md @@ -4,6 +4,10 @@ Obsługa błędów jest oparta o kody HTTP (>= 400). Biblioteka nie interpretuje ## Typy wyjątków +### `ValueError` (walidacja pre-signed URL) + +Dla żądań z `skip_auth=True` i absolutnym URL biblioteka wykonuje walidację bezpieczeństwa. W przypadku niespełnienia reguł (np. host `localhost`, loopback/private IP bez opt-in, host poza allowlistą, albo `http` przy `strict_presigned_url_validation=True`) rzucany jest `ValueError` z komunikatem bezpieczeństwa. + ### `KsefHttpError` Bazowy błąd HTTP. diff --git a/src/ksef_client/config.py b/src/ksef_client/config.py index 4f5e00c..e384a84 100644 --- a/src/ksef_client/config.py +++ b/src/ksef_client/config.py @@ -40,6 +40,9 @@ class KsefClientOptions: custom_headers: dict[str, str] | None = None follow_redirects: bool = False verify_ssl: bool = True + strict_presigned_url_validation: bool = True + allowed_presigned_hosts: list[str] | None = None + allow_private_network_presigned_urls: bool = False user_agent: str = field(default_factory=_default_user_agent) def normalized_base_url(self) -> str: diff --git a/src/ksef_client/http.py b/src/ksef_client/http.py index dc28681..906c8ac 100644 --- a/src/ksef_client/http.py +++ b/src/ksef_client/http.py @@ -1,7 +1,9 @@ from __future__ import annotations +import ipaddress from dataclasses import dataclass from typing import Any +from urllib.parse import urlparse import httpx @@ -17,6 +19,75 @@ def _merge_headers(base: dict[str, str], extra: dict[str, str] | None) -> dict[s return merged +def _is_absolute_http_url(url: str) -> bool: + return url.startswith("http://") or url.startswith("https://") + + +def _host_allowed(host: str, allowed_hosts: list[str]) -> bool: + normalized_host = host.lower().rstrip(".") + for allowed in allowed_hosts: + normalized_allowed = allowed.lower().strip().rstrip(".") + if not normalized_allowed: + continue + if normalized_host == normalized_allowed: + return True + try: + ipaddress.ip_address(normalized_allowed) + continue + except ValueError: + pass + if normalized_host.endswith("." + normalized_allowed): + return True + return False + + +def _validate_presigned_url_security(options: KsefClientOptions, url: str) -> None: + parsed = urlparse(url) + host = parsed.hostname + if not host: + raise ValueError("Rejected insecure presigned URL: host is missing.") + + normalized_host = host.lower().rstrip(".") + if normalized_host == "localhost" or normalized_host.endswith(".localhost"): + raise ValueError( + "Rejected insecure presigned URL: localhost hosts are not allowed " + "for skip_auth requests." + ) + + if options.strict_presigned_url_validation and parsed.scheme != "https": + raise ValueError( + "Rejected insecure presigned URL: https is required for skip_auth requests." + ) + + try: + host_ip = ipaddress.ip_address(normalized_host) + except ValueError: + host_ip = None + + if host_ip is not None: + if host_ip.is_loopback: + raise ValueError( + "Rejected insecure presigned URL: loopback addresses are not allowed " + "for skip_auth requests." + ) + if ( + not options.allow_private_network_presigned_urls + and (host_ip.is_private or host_ip.is_link_local or host_ip.is_reserved) + ): + raise ValueError( + "Rejected insecure presigned URL: private, link-local, and reserved " + "IP hosts are blocked for skip_auth requests." + ) + + if options.allowed_presigned_hosts and not _host_allowed( + normalized_host, options.allowed_presigned_hosts + ): + raise ValueError( + "Rejected insecure presigned URL: host is not in allowed_presigned_hosts " + "for skip_auth requests." + ) + + @dataclass class HttpResponse: status_code: int @@ -60,8 +131,10 @@ def request( expected_status: set[int] | None = None, ) -> HttpResponse: url = path - if not url.startswith("http://") and not url.startswith("https://"): + if not _is_absolute_http_url(url): url = self._options.normalized_base_url().rstrip("/") + "/" + path.lstrip("/") + elif skip_auth: + _validate_presigned_url_security(self._options, url) base_headers = { "User-Agent": self._options.user_agent, @@ -166,8 +239,10 @@ async def request( expected_status: set[int] | None = None, ) -> HttpResponse: url = path - if not url.startswith("http://") and not url.startswith("https://"): + if not _is_absolute_http_url(url): url = self._options.normalized_base_url().rstrip("/") + "/" + path.lstrip("/") + elif skip_auth: + _validate_presigned_url_security(self._options, url) base_headers = { "User-Agent": self._options.user_agent, diff --git a/tests/test_http.py b/tests/test_http.py index 95b42f9..b397b61 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -5,7 +5,14 @@ from ksef_client.config import KsefClientOptions from ksef_client.exceptions import KsefApiError, KsefHttpError, KsefRateLimitError -from ksef_client.http import AsyncBaseHttpClient, BaseHttpClient, HttpResponse, _merge_headers +from ksef_client.http import ( + AsyncBaseHttpClient, + BaseHttpClient, + HttpResponse, + _host_allowed, + _merge_headers, + _validate_presigned_url_security, +) class HttpTests(unittest.TestCase): @@ -119,6 +126,93 @@ def test_default_status_error(self): ): client.request("GET", "/path") + def test_skip_auth_presigned_url_accepts_valid_https(self): + options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl") + client = BaseHttpClient(options) + response = httpx.Response(200, json={"ok": True}) + with patch.object(client._client, "request", Mock(return_value=response)) as request_mock: + client.request("GET", "https://files.example.com/upload", skip_auth=True) + _, kwargs = request_mock.call_args + self.assertEqual(kwargs["url"], "https://files.example.com/upload") + self.assertNotIn("Authorization", kwargs["headers"]) + + def test_skip_auth_presigned_url_rejects_http_when_strict(self): + options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl") + client = BaseHttpClient(options) + with self.assertRaisesRegex(ValueError, "https is required"): + client.request("GET", "http://files.example.com/upload", skip_auth=True) + + def test_skip_auth_presigned_url_allows_http_when_not_strict(self): + options = KsefClientOptions( + base_url="https://api-test.ksef.mf.gov.pl", + strict_presigned_url_validation=False, + ) + client = BaseHttpClient(options) + response = httpx.Response(200, json={"ok": True}) + with patch.object(client._client, "request", Mock(return_value=response)) as request_mock: + client.request("GET", "http://files.example.com/upload", skip_auth=True) + _, kwargs = request_mock.call_args + self.assertEqual(kwargs["url"], "http://files.example.com/upload") + + def test_skip_auth_presigned_url_rejects_localhost_and_loopback(self): + options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl") + client = BaseHttpClient(options) + with self.assertRaisesRegex(ValueError, "localhost"): + client.request("GET", "https://localhost/upload", skip_auth=True) + with self.assertRaisesRegex(ValueError, "loopback"): + client.request("GET", "https://127.0.0.1/upload", skip_auth=True) + + def test_skip_auth_presigned_url_rejects_private_ip_by_default(self): + options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl") + client = BaseHttpClient(options) + with self.assertRaisesRegex(ValueError, "private, link-local, and reserved IP"): + client.request("GET", "https://10.1.2.3/upload", skip_auth=True) + + def test_skip_auth_presigned_url_allows_private_ip_when_opted_in(self): + options = KsefClientOptions( + base_url="https://api-test.ksef.mf.gov.pl", + allow_private_network_presigned_urls=True, + ) + client = BaseHttpClient(options) + response = httpx.Response(200, json={"ok": True}) + with patch.object(client._client, "request", Mock(return_value=response)) as request_mock: + client.request("GET", "https://10.1.2.3/upload", skip_auth=True) + _, kwargs = request_mock.call_args + self.assertEqual(kwargs["url"], "https://10.1.2.3/upload") + + def test_skip_auth_presigned_url_allowlist_exact_and_subdomain(self): + options = KsefClientOptions( + base_url="https://api-test.ksef.mf.gov.pl", + allowed_presigned_hosts=["uploads.example.com"], + ) + client = BaseHttpClient(options) + response = httpx.Response(200, json={"ok": True}) + with patch.object(client._client, "request", Mock(return_value=response)): + client.request("GET", "https://uploads.example.com/path", skip_auth=True) + client.request("GET", "https://sub.uploads.example.com/path", skip_auth=True) + + def test_skip_auth_presigned_url_allowlist_rejects_other_hosts(self): + options = KsefClientOptions( + base_url="https://api-test.ksef.mf.gov.pl", + allowed_presigned_hosts=["uploads.example.com"], + ) + client = BaseHttpClient(options) + with self.assertRaisesRegex(ValueError, "allowed_presigned_hosts"): + client.request("GET", "https://other.example.com/path", skip_auth=True) + + def test_host_allowed_skips_empty_and_ip_allowlist_entries(self): + self.assertTrue( + _host_allowed( + "sub.uploads.example.com", + ["", "10.0.0.1", "uploads.example.com"], + ) + ) + + def test_validate_presigned_url_security_rejects_missing_host(self): + options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl") + with self.assertRaisesRegex(ValueError, "host is missing"): + _validate_presigned_url_security(options, "https:///no-host") + class AsyncHttpTests(unittest.IsolatedAsyncioTestCase): async def test_async_request(self): @@ -193,6 +287,12 @@ async def test_async_raise_for_status_paths(self): with self.assertRaises(KsefHttpError): client._raise_for_status(response_http) + async def test_async_skip_auth_presigned_validation_rejects_localhost(self): + options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl") + client = AsyncBaseHttpClient(options) + with self.assertRaisesRegex(ValueError, "localhost"): + await client.request("GET", "https://localhost/upload", skip_auth=True) + if __name__ == "__main__": unittest.main()