diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f80a43b..6b896aa 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,6 +10,8 @@ jobs: github-release: name: Create GitHub Release runs-on: ubuntu-latest + environment: + name: release permissions: contents: write steps: diff --git a/README.md b/README.md index 8defa5c..279b4bf 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,80 @@ headers = { +### 会话管理(实验性功能) + +`SessionManager` 用于模拟真实用户会话,维护状态化的签名参数,可能有助于提升长期稳定性。 + +**注意**:此功能基于 [#86](https://github.com/Cloxl/xhshow/issues/86) 理论分析,实际效果待验证。建议先在测试环境使用。 + +#### 基本使用 + +```python +from xhshow import Xhshow, SessionManager +import requests + +client = Xhshow() +session = SessionManager() # 创建会话管理器 + +cookies = {"a1": "...", "web_session": "...", "webId": "..."} + +# 使用 session 参数进行签名 +headers = client.sign_headers_get( + uri="/api/sns/web/v1/user_posted", + cookies=cookies, + params={"num": "30"}, + session=session # 传入 session +) + +response = requests.get( + "https://edith.xiaohongshu.com/api/sns/web/v1/user_posted", + params={"num": "30"}, + headers=headers, + cookies=cookies +) + +# 同一个 session 可以在多次请求中复用 +headers2 = client.sign_headers_get( + uri="/api/sns/web/v1/homefeed", + cookies=cookies, + params={"page": "1"}, + session=session # 复用同一个 session +) +``` + +#### 账户池管理 + +如果你有多个账户,需要为每个账户创建独立的 `SessionManager`: + +```python +accounts = [ + {"a1": "account1_a1", "web_session": "session1"}, + {"a1": "account2_a1", "web_session": "session2"}, +] + +# 为每个账户创建独立的 session +sessions = {} +for account in accounts: + sessions[account["a1"]] = SessionManager() + +# 使用时匹配账户和对应的 session +for account in accounts: + headers = client.sign_headers_get( + uri="/api/sns/web/v1/user_posted", + cookies=account, + params={"num": "30"}, + session=sessions[account["a1"]] # 使用对应账户的 session + ) +``` + +#### 工作原理 + +- **无 Session**:每次请求生成随机参数,可能被识别为机器人行为 +- **有 Session**:维护固定的页面加载时间戳和单调递增的计数器,模拟真实用户在同一页面中的连续操作 + +**适用场景:** +- 长期运行的爬虫或服务 + ### 解密签名 ```python diff --git a/src/xhshow/__init__.py b/src/xhshow/__init__.py index a139d42..249bd0f 100644 --- a/src/xhshow/__init__.py +++ b/src/xhshow/__init__.py @@ -1,6 +1,7 @@ from .client import Xhshow from .config import CryptoConfig from .core.crypto import CryptoProcessor +from .session import SessionManager, SignState __version__ = "0.1.0" -__all__ = ["CryptoConfig", "CryptoProcessor", "Xhshow"] +__all__ = ["CryptoConfig", "CryptoProcessor", "SessionManager", "SignState", "Xhshow"] diff --git a/src/xhshow/client.py b/src/xhshow/client.py index a44d1c0..5a419fc 100644 --- a/src/xhshow/client.py +++ b/src/xhshow/client.py @@ -7,6 +7,7 @@ from .config import CryptoConfig from .core.common_sign import XsCommonSigner from .core.crypto import CryptoProcessor +from .session import SessionManager, SignState from .utils.random_gen import RandomGenerator from .utils.url_utils import build_url, extract_uri from .utils.validators import ( @@ -16,7 +17,7 @@ validate_xs_common_params, ) -__all__ = ["Xhshow"] +__all__ = ["Xhshow", "SessionManager", "SignState"] class Xhshow: @@ -111,6 +112,7 @@ def sign_xs( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, + session: SessionManager | None = None, ) -> str: """ Generate request signature (supports GET and POST) @@ -127,6 +129,7 @@ def sign_xs( - GET request: params value - POST request: payload value timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: str: Complete signature string @@ -136,15 +139,20 @@ def sign_xs( ValueError: Parameter value error """ uri = extract_uri(uri) - - signature_data = self.crypto_processor.config.SIGNATURE_DATA_TEMPLATE.copy() - content_string = self._build_content_string(method, uri, payload) - d_value = self._generate_d_value(content_string) - signature_data["x3"] = self.crypto_processor.config.X3_PREFIX + self._build_signature( - d_value, a1_value, xsec_appid, content_string, timestamp + + sign_state = session.get_current_state(uri) if session else None + + payload_array = self.crypto_processor.build_payload_array( + d_value, a1_value, xsec_appid, content_string, timestamp, sign_state=sign_state ) + xor_result = self.crypto_processor.bit_ops.xor_transform_array(payload_array) + x3_signature = self.crypto_processor.b64encoder.encode_x3(xor_result[:124]) + + signature_data = self.crypto_processor.config.SIGNATURE_DATA_TEMPLATE.copy() + signature_data["x3"] = self.crypto_processor.config.X3_PREFIX + x3_signature + return self.crypto_processor.config.XYS_PREFIX + self.crypto_processor.b64encoder.encode( json.dumps(signature_data, separators=(",", ":"), ensure_ascii=False) ) @@ -174,6 +182,7 @@ def sign_xs_get( xsec_appid: str = "xhs-pc-web", params: dict[str, Any] | None = None, timestamp: float | None = None, + session: SessionManager | None = None, ) -> str: """ Generate GET request signature (convenience method) @@ -186,6 +195,7 @@ def sign_xs_get( xsec_appid: Application identifier, defaults to `xhs-pc-web` params: GET request parameters timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: str: Complete signature string @@ -194,7 +204,7 @@ def sign_xs_get( TypeError: Parameter type error ValueError: Parameter value error """ - return self.sign_xs("GET", uri, a1_value, xsec_appid, payload=params, timestamp=timestamp) + return self.sign_xs("GET", uri, a1_value, xsec_appid, payload=params, timestamp=timestamp, session=session) @validate_post_signature_params def sign_xs_post( @@ -204,6 +214,7 @@ def sign_xs_post( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, + session: SessionManager | None = None, ) -> str: """ Generate POST request signature (convenience method) @@ -216,6 +227,7 @@ def sign_xs_post( xsec_appid: Application identifier, defaults to `xhs-pc-web` payload: POST request body data timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: str: Complete signature string @@ -224,7 +236,7 @@ def sign_xs_post( TypeError: Parameter type error ValueError: Parameter value error """ - return self.sign_xs("POST", uri, a1_value, xsec_appid, payload=payload, timestamp=timestamp) + return self.sign_xs("POST", uri, a1_value, xsec_appid, payload=payload, timestamp=timestamp, session=session) @validate_xs_common_params def sign_xsc( @@ -406,6 +418,7 @@ def sign_headers( params: dict[str, Any] | None = None, payload: dict[str, Any] | None = None, timestamp: float | None = None, + session: SessionManager | None = None, ) -> dict[str, str]: """ Generate complete request headers with signature and trace IDs @@ -418,6 +431,7 @@ def sign_headers( params: GET request parameters (only used when method="GET") payload: POST request body data (only used when method="POST") timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid @@ -447,7 +461,6 @@ def sign_headers( method_upper = method.upper() - # Validate method and parameters if method_upper == "GET": if payload is not None: raise ValueError("GET requests must use 'params', not 'payload'") @@ -460,12 +473,11 @@ def sign_headers( raise ValueError(f"Unsupported method: {method}") cookie_dict = self._parse_cookies(cookies) - a1_value = cookie_dict.get("a1") if not a1_value: raise ValueError("Missing 'a1' in cookies") - x_s = self.sign_xs(method_upper, uri, a1_value, xsec_appid, request_data, timestamp) + x_s = self.sign_xs(method_upper, uri, a1_value, xsec_appid, request_data, timestamp, session) x_s_common = self.sign_xs_common(cookie_dict) x_t = self.get_x_t(timestamp) x_b3_traceid = self.get_b3_trace_id() @@ -486,6 +498,7 @@ def sign_headers_get( xsec_appid: str = "xhs-pc-web", params: dict[str, Any] | None = None, timestamp: float | None = None, + session: SessionManager | None = None, ) -> dict[str, str]: """ Generate complete request headers for GET request (convenience method) @@ -496,20 +509,12 @@ def sign_headers_get( xsec_appid: Application identifier, defaults to `xhs-pc-web` params: GET request parameters timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid - - Examples: - >>> client = Xhshow() - >>> cookies = {"a1": "your_a1_value", "web_session": "..."} - >>> headers = client.sign_headers_get( - ... uri="/api/sns/web/v1/user_posted", - ... cookies=cookies, - ... params={"num": "30"} - ... ) """ - return self.sign_headers("GET", uri, cookies, xsec_appid, params=params, timestamp=timestamp) + return self.sign_headers("GET", uri, cookies, xsec_appid, params=params, timestamp=timestamp, session=session) def sign_headers_post( self, @@ -518,6 +523,7 @@ def sign_headers_post( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, + session: SessionManager | None = None, ) -> dict[str, str]: """ Generate complete request headers for POST request (convenience method) @@ -528,17 +534,11 @@ def sign_headers_post( xsec_appid: Application identifier, defaults to `xhs-pc-web` payload: POST request body data timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid - - Examples: - >>> client = Xhshow() - >>> cookies = {"a1": "your_a1_value", "web_session": "..."} - >>> headers = client.sign_headers_post( - ... uri="/api/sns/web/v1/login", - ... cookies=cookies, - ... payload={"username": "test", "password": "123456"} - ... ) """ - return self.sign_headers("POST", uri, cookies, xsec_appid, payload=payload, timestamp=timestamp) + return self.sign_headers( + "POST", uri, cookies, xsec_appid, payload=payload, timestamp=timestamp, session=session + ) diff --git a/src/xhshow/core/crypto.py b/src/xhshow/core/crypto.py index 8d3be5e..8da590e 100644 --- a/src/xhshow/core/crypto.py +++ b/src/xhshow/core/crypto.py @@ -1,5 +1,6 @@ import struct import time +from typing import TYPE_CHECKING from ..config import CryptoConfig from ..utils.bit_ops import BitOperations @@ -7,6 +8,10 @@ from ..utils.hex_utils import HexProcessor from ..utils.random_gen import RandomGenerator +if TYPE_CHECKING: + from ..session import SignState + + __all__ = ["CryptoProcessor"] @@ -57,6 +62,7 @@ def build_payload_array( app_identifier: str = "xhs-pc-web", string_param: str = "", timestamp: float | None = None, + sign_state: "SignState | None" = None, ) -> list[int]: """ Build payload array (t.js version - exact match) @@ -67,6 +73,7 @@ def build_payload_array( app_identifier (str): Application identifier, default "xhs-pc-web" string_param (str): String parameter (used for URI length calculation) timestamp (float | None): Unix timestamp in seconds (defaults to current time) + sign_state (SignState | None): Optional state for realistic signature generation. Returns: list[int]: Complete payload byte array (124 bytes) @@ -84,24 +91,27 @@ def build_payload_array( timestamp = time.time() payload.extend(self.env_fingerprint_a(int(timestamp * 1000), self.config.ENV_FINGERPRINT_XOR_KEY)) - time_offset = self.random_gen.generate_random_byte_in_range( - self.config.ENV_FINGERPRINT_TIME_OFFSET_MIN, - self.config.ENV_FINGERPRINT_TIME_OFFSET_MAX, - ) - payload.extend(self.env_fingerprint_b(int((timestamp - time_offset) * 1000))) + if sign_state: + payload.extend(self.env_fingerprint_b(sign_state.page_load_timestamp)) + sequence_value = sign_state.sequence_value + window_props_length = sign_state.window_props_length + uri_length = sign_state.uri_length + else: + time_offset = self.random_gen.generate_random_byte_in_range( + self.config.ENV_FINGERPRINT_TIME_OFFSET_MIN, + self.config.ENV_FINGERPRINT_TIME_OFFSET_MAX, + ) + payload.extend(self.env_fingerprint_b(int((timestamp - time_offset) * 1000))) + sequence_value = self.random_gen.generate_random_byte_in_range( + self.config.SEQUENCE_VALUE_MIN, self.config.SEQUENCE_VALUE_MAX + ) + window_props_length = self.random_gen.generate_random_byte_in_range( + self.config.WINDOW_PROPS_LENGTH_MIN, self.config.WINDOW_PROPS_LENGTH_MAX + ) + uri_length = len(string_param) - sequence_value = self.random_gen.generate_random_byte_in_range( - self.config.SEQUENCE_VALUE_MIN, self.config.SEQUENCE_VALUE_MAX - ) payload.extend(self._int_to_le_bytes(sequence_value, 4)) - - window_props_length = self.random_gen.generate_random_byte_in_range( - self.config.WINDOW_PROPS_LENGTH_MIN, self.config.WINDOW_PROPS_LENGTH_MAX - ) payload.extend(self._int_to_le_bytes(window_props_length, 4)) - - # URI length - uri_length = len(string_param) payload.extend(self._int_to_le_bytes(uri_length, 4)) # MD5 XOR segment diff --git a/src/xhshow/session.py b/src/xhshow/session.py new file mode 100644 index 0000000..eb4f9f3 --- /dev/null +++ b/src/xhshow/session.py @@ -0,0 +1,56 @@ +import random +import time +from typing import NamedTuple + + +class SignState(NamedTuple): + """Immutable state for a single signing operation.""" + + page_load_timestamp: int + sequence_value: int + window_props_length: int + uri_length: int + + +class SessionManager: + """ + Manages the state for a simulated user session to generate more realistic signatures. + + This class maintains counters that should persist and evolve across multiple requests + within the same logical session. + """ + + def __init__(self): + self.page_load_timestamp: int = int(time.time() * 1000) + self.sequence_value: int = random.randint(15, 17) + self.window_props_length: int = random.randint(1000, 2000) + + def update_state(self): + """ + Updates the session state to simulate user activity between requests. + + This method should be called before each signing operation. + """ + self.sequence_value += random.randint(0, 1) + self.window_props_length += random.randint(1, 10) + + def get_current_state(self, uri: str) -> SignState: + """ + Get the current signing state for a request. + + This method automatically updates the session state counters and calculates + the URI length from the provided URI string. + + Args: + uri (str): The URI string for the current request. + + Returns: + SignState: An immutable tuple with the current state for signing. + """ + self.update_state() + return SignState( + page_load_timestamp=self.page_load_timestamp, + sequence_value=self.sequence_value, + window_props_length=self.window_props_length, + uri_length=len(uri), + ) diff --git a/src/xhshow/utils/validators.py b/src/xhshow/utils/validators.py index 6570dce..c79e1b7 100644 --- a/src/xhshow/utils/validators.py +++ b/src/xhshow/utils/validators.py @@ -110,6 +110,7 @@ def wrapper( xsec_appid: Any = "xhs-pc-web", payload: Any = None, timestamp: float | None = None, + session: Any = None, ): validator = RequestSignatureValidator() @@ -127,6 +128,7 @@ def wrapper( validated_xsec_appid, validated_payload, timestamp, + session, ) return wrapper # type: ignore @@ -151,6 +153,7 @@ def wrapper( xsec_appid: Any = "xhs-pc-web", params: Any = None, timestamp: float | None = None, + session: Any = None, ): validator = RequestSignatureValidator() @@ -166,6 +169,7 @@ def wrapper( validated_xsec_appid, validated_params, timestamp, + session, ) return wrapper # type: ignore @@ -190,6 +194,7 @@ def wrapper( xsec_appid: Any = "xhs-pc-web", payload: Any = None, timestamp: float | None = None, + session: Any = None, ): validator = RequestSignatureValidator() @@ -205,6 +210,7 @@ def wrapper( validated_xsec_appid, validated_payload, timestamp, + session, ) return wrapper # type: ignore diff --git a/tests/test_session.py b/tests/test_session.py new file mode 100644 index 0000000..6e3cbed --- /dev/null +++ b/tests/test_session.py @@ -0,0 +1,242 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from xhshow.client import Xhshow +from xhshow.session import SessionManager, SignState + + +@pytest.fixture +def mock_crypto_processor(): + """Fixture to mock the CryptoProcessor and its methods.""" + with patch("xhshow.client.CryptoProcessor") as mock_proc_class: + mock_instance = mock_proc_class.return_value + mock_instance.build_payload_array = MagicMock(return_value=[]) + mock_instance.bit_ops.xor_transform_array = MagicMock(return_value=b"") + mock_instance.b64encoder.encode_x3 = MagicMock(return_value="encoded_x3") + mock_instance.b64encoder.encode = MagicMock(return_value="encoded_xs") + + # Set up the config mock to return a real dictionary + mock_instance.config.SIGNATURE_DATA_TEMPLATE = { + "x0": "4.2.6", + "x1": "xhs-pc-web", + "x2": "Windows", + "x3": "", + "x4": "", + } + mock_instance.config.X3_PREFIX = "mns0301_" + mock_instance.config.XYS_PREFIX = "XYS_" + + yield mock_instance + + +def test_signing_with_session(mock_crypto_processor): + """ + Verify that when a session object is provided, its state is used for signing. + """ + client = Xhshow() + session = SessionManager() + + # Update state to ensure counters are not zero + session.update_state() + session.update_state() + + uri = "/api/sns/web/v1/user/posted" + cookies = {"a1": "test_a1", "web_session": "test_session"} + + # Perform signing + client.sign_headers_get(uri=uri, cookies=cookies, session=session) + + # Assert that build_payload_array was called + mock_crypto_processor.build_payload_array.assert_called_once() + + # Get the actual arguments passed to the mock + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + # Verify the state matches + assert actual_state is not None + assert isinstance(actual_state, SignState) + assert actual_state.page_load_timestamp == session.page_load_timestamp + assert actual_state.sequence_value == session.sequence_value + assert actual_state.window_props_length == session.window_props_length + assert actual_state.uri_length == len(uri) + + +def test_signing_without_session(mock_crypto_processor): + """ + Verify that signing falls back to the old method when no session is provided. + """ + client = Xhshow() + uri = "/api/sns/web/v1/user/posted" + cookies = {"a1": "test_a1", "web_session": "test_session"} + + # Perform signing without a session + client.sign_headers_get(uri=uri, cookies=cookies) + + # Assert that build_payload_array was called + mock_crypto_processor.build_payload_array.assert_called_once() + + # Get the actual arguments passed to the mock + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + # Verify that no state was passed (it should be None) + assert actual_state is None + + +def test_signing_with_session_post(mock_crypto_processor): + """ + Verify that POST requests use session state when provided. + """ + client = Xhshow() + session = SessionManager() + + uri = "/api/sns/web/v1/login" + cookies = {"a1": "test_a1", "web_session": "test_session"} + payload = {"username": "test_user", "password": "test_pass"} + + # Perform POST signing with session + client.sign_headers_post(uri=uri, cookies=cookies, payload=payload, session=session) + + # Assert that build_payload_array was called + mock_crypto_processor.build_payload_array.assert_called_once() + + # Get the actual arguments passed to the mock + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + # Verify the state matches + assert actual_state is not None + assert isinstance(actual_state, SignState) + assert actual_state.page_load_timestamp == session.page_load_timestamp + assert actual_state.sequence_value == session.sequence_value + assert actual_state.window_props_length == session.window_props_length + assert actual_state.uri_length == len(uri) + + +def test_signing_without_session_post(mock_crypto_processor): + """ + Verify that POST requests fall back to stateless mode when no session is provided. + """ + client = Xhshow() + uri = "/api/sns/web/v1/login" + cookies = {"a1": "test_a1", "web_session": "test_session"} + payload = {"username": "test_user", "password": "test_pass"} + + # Perform POST signing without session + client.sign_headers_post(uri=uri, cookies=cookies, payload=payload) + + # Assert that build_payload_array was called + mock_crypto_processor.build_payload_array.assert_called_once() + + # Get the actual arguments passed to the mock + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + # Verify that no state was passed + assert actual_state is None + + +def test_session_state_evolution(): + """ + Verify that SessionManager state evolves correctly across multiple calls. + """ + session = SessionManager() + + initial_sequence = session.sequence_value + initial_window_props = session.window_props_length + initial_timestamp = session.page_load_timestamp + + # Call get_current_state multiple times + uri1 = "/api/test1" + state1 = session.get_current_state(uri1) + + uri2 = "/api/test2" + state2 = session.get_current_state(uri2) + + uri3 = "/api/test3" + state3 = session.get_current_state(uri3) + + # Verify page_load_timestamp remains constant + assert state1.page_load_timestamp == initial_timestamp + assert state2.page_load_timestamp == initial_timestamp + assert state3.page_load_timestamp == initial_timestamp + + # Verify sequence_value increases (0 or 1 per call) + assert state1.sequence_value >= initial_sequence + assert state2.sequence_value >= state1.sequence_value + assert state3.sequence_value >= state2.sequence_value + + # Verify window_props_length increases (1-10 per call) + assert state1.window_props_length > initial_window_props + assert state2.window_props_length > state1.window_props_length + assert state3.window_props_length > state2.window_props_length + + # Verify uri_length uses actual URI length + assert state1.uri_length == len(uri1) + assert state2.uri_length == len(uri2) + assert state3.uri_length == len(uri3) + + +def test_session_uri_length_accuracy(): + """ + Verify that uri_length in SignState always matches actual URI length. + """ + session = SessionManager() + + test_cases = [ + "/api/short", + "/api/medium/path/to/resource", + "/api/very/long/path/to/some/deeply/nested/resource/endpoint", + ] + + for uri in test_cases: + state = session.get_current_state(uri) + assert state.uri_length == len(uri) + + +def test_sign_xs_get_with_session(mock_crypto_processor): + """ + Verify sign_xs_get passes session state correctly. + """ + client = Xhshow() + session = SessionManager() + + uri = "/api/sns/web/v1/homefeed" + params = {"page": "1", "limit": "20"} + + signature = client.sign_xs_get(uri=uri, a1_value="test_a1", params=params, session=session) + + # Verify signature was generated + assert signature.startswith("XYS_") + + # Verify build_payload_array was called with sign_state + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + assert actual_state is not None + assert actual_state.uri_length == len(uri) + + +def test_sign_xs_post_with_session(mock_crypto_processor): + """ + Verify sign_xs_post passes session state correctly. + """ + client = Xhshow() + session = SessionManager() + + uri = "/api/sns/web/v1/comment/post" + payload = {"note_id": "12345", "content": "Great post!"} + + signature = client.sign_xs_post(uri=uri, a1_value="test_a1", payload=payload, session=session) + + # Verify signature was generated + assert signature.startswith("XYS_") + + # Verify build_payload_array was called with sign_state + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + assert actual_state is not None + assert actual_state.uri_length == len(uri) diff --git a/tests/test_url_utils.py b/tests/test_url_utils.py index 697a55c..461d872 100644 --- a/tests/test_url_utils.py +++ b/tests/test_url_utils.py @@ -1,6 +1,7 @@ import pytest from xhshow import Xhshow +from xhshow.session import SessionManager from xhshow.utils.url_utils import build_url, extract_uri @@ -149,3 +150,16 @@ def test_client_sign_with_invalid_params(self): a1_value="test_a1_value", params="invalid", # type: ignore ) + + def test_client_sign_with_session(self): + client = Xhshow() + session = SessionManager() + for _ in range(10): + signature = client.sign_xs_get( + uri="/api/sns/web/v1/user_posted", + a1_value="test_a1_value", + params={"num": "30"}, + session=session, + ) + + assert signature.startswith("XYS_")