From 2ac2bb751b75b18587dbac596424d46f6263d305 Mon Sep 17 00:00:00 2001 From: dev Date: Wed, 14 Jan 2026 16:46:05 +0800 Subject: [PATCH 1/9] =?UTF-8?q?feat(signature):=20=E5=BC=95=E5=85=A5=20Ses?= =?UTF-8?q?sionManager=20=E4=BB=A5=E4=BC=98=E5=8C=96=E9=A3=8E=E6=8E=A7?= =?UTF-8?q?=E5=8F=82=E6=95=B0=20-=20=E5=BC=95=E5=85=A5=20SessionManager=20?= =?UTF-8?q?=E6=9D=A5=E7=AE=A1=E7=90=86=E5=92=8C=E7=BB=B4=E6=8A=A4=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E6=A8=A1=E6=8B=9F=E4=BC=9A=E8=AF=9D=E7=9A=84=E7=8A=B6?= =?UTF-8?q?=E6=80=81=EF=BC=8C=E5=8C=85=E6=8B=AC=E9=A1=B5=E9=9D=A2=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD=E6=97=B6=E9=97=B4=E6=88=B3=E5=92=8C=E5=90=84=E7=A7=8D?= =?UTF-8?q?=E8=AE=A1=E6=95=B0=E5=99=A8=E3=80=82=20-=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E7=AD=BE=E5=90=8D=E6=96=B9=E6=B3=95=E4=BB=A5=E6=8E=A5=E5=8F=97?= =?UTF-8?q?=E5=8F=AF=E9=80=89=E7=9A=84=20SessionManager=20=E5=AE=9E?= =?UTF-8?q?=E4=BE=8B=EF=BC=8C=E4=BB=8E=E8=80=8C=E7=94=9F=E6=88=90=E6=9B=B4?= =?UTF-8?q?=E9=80=BC=E7=9C=9F=E7=9A=84=E3=80=81=E5=8C=85=E5=90=AB=E5=8D=95?= =?UTF-8?q?=E8=B0=83=E9=80=92=E5=A2=9E=E8=AE=A1=E6=95=B0=E5=99=A8=E7=9A=84?= =?UTF-8?q?=E7=AD=BE=E5=90=8D=E3=80=82=20-=20=E6=B7=BB=E5=8A=A0=E4=BA=86?= =?UTF-8?q?=E7=9B=B8=E5=BA=94=E7=9A=84=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E6=9D=A5=E9=AA=8C=E8=AF=81=E6=96=B0=E4=BC=9A=E8=AF=9D=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E5=99=A8=E7=9A=84=E5=8A=9F=E8=83=BD=E5=92=8C=E5=90=91?= =?UTF-8?q?=E5=90=8E=E5=85=BC=E5=AE=B9=E6=80=A7=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/xhshow/client.py | 71 ++++++++++++++------------- src/xhshow/core/crypto.py | 40 ++++++++++------ src/xhshow/session.py | 59 +++++++++++++++++++++++ src/xhshow/utils/validators.py | 6 +++ tests/test_session.py | 88 ++++++++++++++++++++++++++++++++++ tests/test_url_utils.py | 16 +++++++ 6 files changed, 231 insertions(+), 49 deletions(-) create mode 100644 src/xhshow/session.py create mode 100644 tests/test_session.py diff --git a/src/xhshow/client.py b/src/xhshow/client.py index a44d1c0..bea0aa8 100644 --- a/src/xhshow/client.py +++ b/src/xhshow/client.py @@ -2,11 +2,12 @@ import json import time import urllib.parse -from typing import Any, Literal +from typing import Any, Literal, TYPE_CHECKING from .config import CryptoConfig from .core.common_sign import XsCommonSigner from .core.crypto import CryptoProcessor +from .session import SessionManager, SignState from .utils.random_gen import RandomGenerator from .utils.url_utils import build_url, extract_uri from .utils.validators import ( @@ -16,7 +17,10 @@ validate_xs_common_params, ) -__all__ = ["Xhshow"] +if TYPE_CHECKING: + from .session import SessionManager + +__all__ = ["Xhshow", "SessionManager", "SignState"] class Xhshow: @@ -111,6 +115,7 @@ def sign_xs( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, + session: "SessionManager | None" = None, ) -> str: """ Generate request signature (supports GET and POST) @@ -127,6 +132,7 @@ def sign_xs( - GET request: params value - POST request: payload value timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: str: Complete signature string @@ -136,15 +142,20 @@ def sign_xs( ValueError: Parameter value error """ uri = extract_uri(uri) - - signature_data = self.crypto_processor.config.SIGNATURE_DATA_TEMPLATE.copy() - content_string = self._build_content_string(method, uri, payload) - d_value = self._generate_d_value(content_string) - signature_data["x3"] = self.crypto_processor.config.X3_PREFIX + self._build_signature( - d_value, a1_value, xsec_appid, content_string, timestamp + + sign_state = session.get_current_state(content_string) if session else None + + payload_array = self.crypto_processor.build_payload_array( + d_value, a1_value, xsec_appid, content_string, timestamp, sign_state=sign_state ) + xor_result = self.crypto_processor.bit_ops.xor_transform_array(payload_array) + x3_signature = self.crypto_processor.b64encoder.encode_x3(xor_result[:124]) + + signature_data = self.crypto_processor.config.SIGNATURE_DATA_TEMPLATE.copy() + signature_data["x3"] = self.crypto_processor.config.X3_PREFIX + x3_signature + return self.crypto_processor.config.XYS_PREFIX + self.crypto_processor.b64encoder.encode( json.dumps(signature_data, separators=(",", ":"), ensure_ascii=False) ) @@ -174,6 +185,7 @@ def sign_xs_get( xsec_appid: str = "xhs-pc-web", params: dict[str, Any] | None = None, timestamp: float | None = None, + session: "SessionManager | None" = None, ) -> str: """ Generate GET request signature (convenience method) @@ -186,6 +198,7 @@ def sign_xs_get( xsec_appid: Application identifier, defaults to `xhs-pc-web` params: GET request parameters timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: str: Complete signature string @@ -194,7 +207,7 @@ def sign_xs_get( TypeError: Parameter type error ValueError: Parameter value error """ - return self.sign_xs("GET", uri, a1_value, xsec_appid, payload=params, timestamp=timestamp) + return self.sign_xs("GET", uri, a1_value, xsec_appid, payload=params, timestamp=timestamp, session=session) @validate_post_signature_params def sign_xs_post( @@ -204,6 +217,7 @@ def sign_xs_post( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, + session: "SessionManager | None" = None, ) -> str: """ Generate POST request signature (convenience method) @@ -216,6 +230,7 @@ def sign_xs_post( xsec_appid: Application identifier, defaults to `xhs-pc-web` payload: POST request body data timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: str: Complete signature string @@ -224,7 +239,7 @@ def sign_xs_post( TypeError: Parameter type error ValueError: Parameter value error """ - return self.sign_xs("POST", uri, a1_value, xsec_appid, payload=payload, timestamp=timestamp) + return self.sign_xs("POST", uri, a1_value, xsec_appid, payload=payload, timestamp=timestamp, session=session) @validate_xs_common_params def sign_xsc( @@ -406,6 +421,7 @@ def sign_headers( params: dict[str, Any] | None = None, payload: dict[str, Any] | None = None, timestamp: float | None = None, + session: "SessionManager | None" = None, ) -> dict[str, str]: """ Generate complete request headers with signature and trace IDs @@ -418,6 +434,7 @@ def sign_headers( params: GET request parameters (only used when method="GET") payload: POST request body data (only used when method="POST") timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid @@ -447,7 +464,6 @@ def sign_headers( method_upper = method.upper() - # Validate method and parameters if method_upper == "GET": if payload is not None: raise ValueError("GET requests must use 'params', not 'payload'") @@ -460,12 +476,11 @@ def sign_headers( raise ValueError(f"Unsupported method: {method}") cookie_dict = self._parse_cookies(cookies) - a1_value = cookie_dict.get("a1") if not a1_value: raise ValueError("Missing 'a1' in cookies") - x_s = self.sign_xs(method_upper, uri, a1_value, xsec_appid, request_data, timestamp) + x_s = self.sign_xs(method_upper, uri, a1_value, xsec_appid, request_data, timestamp, session) x_s_common = self.sign_xs_common(cookie_dict) x_t = self.get_x_t(timestamp) x_b3_traceid = self.get_b3_trace_id() @@ -486,6 +501,7 @@ def sign_headers_get( xsec_appid: str = "xhs-pc-web", params: dict[str, Any] | None = None, timestamp: float | None = None, + session: "SessionManager | None" = None, ) -> dict[str, str]: """ Generate complete request headers for GET request (convenience method) @@ -496,20 +512,12 @@ def sign_headers_get( xsec_appid: Application identifier, defaults to `xhs-pc-web` params: GET request parameters timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid - - Examples: - >>> client = Xhshow() - >>> cookies = {"a1": "your_a1_value", "web_session": "..."} - >>> headers = client.sign_headers_get( - ... uri="/api/sns/web/v1/user_posted", - ... cookies=cookies, - ... params={"num": "30"} - ... ) """ - return self.sign_headers("GET", uri, cookies, xsec_appid, params=params, timestamp=timestamp) + return self.sign_headers("GET", uri, cookies, xsec_appid, params=params, timestamp=timestamp, session=session) def sign_headers_post( self, @@ -518,6 +526,7 @@ def sign_headers_post( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, + session: "SessionManager | None" = None, ) -> dict[str, str]: """ Generate complete request headers for POST request (convenience method) @@ -528,17 +537,11 @@ def sign_headers_post( xsec_appid: Application identifier, defaults to `xhs-pc-web` payload: POST request body data timestamp: Unix timestamp in seconds (defaults to current time) + session: Optional session manager for stateful signing. Returns: - dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid - - Examples: - >>> client = Xhshow() - >>> cookies = {"a1": "your_a1_value", "web_session": "..."} - >>> headers = client.sign_headers_post( - ... uri="/api/sns/web/v1/login", - ... cookies=cookies, - ... payload={"username": "test", "password": "123456"} - ... ) + dict: Complete headers including x-s, x-s-common, x-t, b3-traceid, x-xray-traceid """ - return self.sign_headers("POST", uri, cookies, xsec_appid, payload=payload, timestamp=timestamp) + return self.sign_headers( + "POST", uri, cookies, xsec_appid, payload=payload, timestamp=timestamp, session=session + ) diff --git a/src/xhshow/core/crypto.py b/src/xhshow/core/crypto.py index 8d3be5e..8da590e 100644 --- a/src/xhshow/core/crypto.py +++ b/src/xhshow/core/crypto.py @@ -1,5 +1,6 @@ import struct import time +from typing import TYPE_CHECKING from ..config import CryptoConfig from ..utils.bit_ops import BitOperations @@ -7,6 +8,10 @@ from ..utils.hex_utils import HexProcessor from ..utils.random_gen import RandomGenerator +if TYPE_CHECKING: + from ..session import SignState + + __all__ = ["CryptoProcessor"] @@ -57,6 +62,7 @@ def build_payload_array( app_identifier: str = "xhs-pc-web", string_param: str = "", timestamp: float | None = None, + sign_state: "SignState | None" = None, ) -> list[int]: """ Build payload array (t.js version - exact match) @@ -67,6 +73,7 @@ def build_payload_array( app_identifier (str): Application identifier, default "xhs-pc-web" string_param (str): String parameter (used for URI length calculation) timestamp (float | None): Unix timestamp in seconds (defaults to current time) + sign_state (SignState | None): Optional state for realistic signature generation. Returns: list[int]: Complete payload byte array (124 bytes) @@ -84,24 +91,27 @@ def build_payload_array( timestamp = time.time() payload.extend(self.env_fingerprint_a(int(timestamp * 1000), self.config.ENV_FINGERPRINT_XOR_KEY)) - time_offset = self.random_gen.generate_random_byte_in_range( - self.config.ENV_FINGERPRINT_TIME_OFFSET_MIN, - self.config.ENV_FINGERPRINT_TIME_OFFSET_MAX, - ) - payload.extend(self.env_fingerprint_b(int((timestamp - time_offset) * 1000))) + if sign_state: + payload.extend(self.env_fingerprint_b(sign_state.page_load_timestamp)) + sequence_value = sign_state.sequence_value + window_props_length = sign_state.window_props_length + uri_length = sign_state.uri_length + else: + time_offset = self.random_gen.generate_random_byte_in_range( + self.config.ENV_FINGERPRINT_TIME_OFFSET_MIN, + self.config.ENV_FINGERPRINT_TIME_OFFSET_MAX, + ) + payload.extend(self.env_fingerprint_b(int((timestamp - time_offset) * 1000))) + sequence_value = self.random_gen.generate_random_byte_in_range( + self.config.SEQUENCE_VALUE_MIN, self.config.SEQUENCE_VALUE_MAX + ) + window_props_length = self.random_gen.generate_random_byte_in_range( + self.config.WINDOW_PROPS_LENGTH_MIN, self.config.WINDOW_PROPS_LENGTH_MAX + ) + uri_length = len(string_param) - sequence_value = self.random_gen.generate_random_byte_in_range( - self.config.SEQUENCE_VALUE_MIN, self.config.SEQUENCE_VALUE_MAX - ) payload.extend(self._int_to_le_bytes(sequence_value, 4)) - - window_props_length = self.random_gen.generate_random_byte_in_range( - self.config.WINDOW_PROPS_LENGTH_MIN, self.config.WINDOW_PROPS_LENGTH_MAX - ) payload.extend(self._int_to_le_bytes(window_props_length, 4)) - - # URI length - uri_length = len(string_param) payload.extend(self._int_to_le_bytes(uri_length, 4)) # MD5 XOR segment diff --git a/src/xhshow/session.py b/src/xhshow/session.py new file mode 100644 index 0000000..f50c084 --- /dev/null +++ b/src/xhshow/session.py @@ -0,0 +1,59 @@ +import random +import time +from typing import NamedTuple + + +class SignState(NamedTuple): + """Immutable state for a single signing operation.""" + + page_load_timestamp: int + sequence_value: int + window_props_length: int + uri_length: int + + +class SessionManager: + """ + Manages the state for a simulated user session to generate more realistic signatures. + + This class maintains counters that should persist and evolve across multiple requests + within the same logical session. + """ + + def __init__(self): + self.page_load_timestamp: int = int(time.time() * 1000) + self.sequence_value: int = random.randint(15, 17) + self.window_props_length: int = random.randint(1000, 2000) + self.uri_length: int = random.randint(200, 400) + + def update_state(self): + """ + Updates the session state to simulate user activity between requests. + + This method should be called before each signing operation. + """ + # Simulate realistic counter increments + # self.sequence_value += random.randint(0, 1) + self.window_props_length += random.randint(1, 10) + self.uri_length += random.randint(0, 2) + + def get_current_state(self, uri: str) -> SignState: + """ + Get the current signing state, with the option to use real URI length. + + For maximum realism, the actual URI length is used, but the internal + counter is still maintained for other purposes. + + Args: + uri (str): The URI string for the current request. + + Returns: + SignState: An immutable tuple with the current state for signing. + """ + self.update_state() + return SignState( + page_load_timestamp=self.page_load_timestamp, + sequence_value=self.sequence_value, + window_props_length=self.window_props_length, + uri_length=len(uri), # Use the real URI length for the signature + ) diff --git a/src/xhshow/utils/validators.py b/src/xhshow/utils/validators.py index 6570dce..c79e1b7 100644 --- a/src/xhshow/utils/validators.py +++ b/src/xhshow/utils/validators.py @@ -110,6 +110,7 @@ def wrapper( xsec_appid: Any = "xhs-pc-web", payload: Any = None, timestamp: float | None = None, + session: Any = None, ): validator = RequestSignatureValidator() @@ -127,6 +128,7 @@ def wrapper( validated_xsec_appid, validated_payload, timestamp, + session, ) return wrapper # type: ignore @@ -151,6 +153,7 @@ def wrapper( xsec_appid: Any = "xhs-pc-web", params: Any = None, timestamp: float | None = None, + session: Any = None, ): validator = RequestSignatureValidator() @@ -166,6 +169,7 @@ def wrapper( validated_xsec_appid, validated_params, timestamp, + session, ) return wrapper # type: ignore @@ -190,6 +194,7 @@ def wrapper( xsec_appid: Any = "xhs-pc-web", payload: Any = None, timestamp: float | None = None, + session: Any = None, ): validator = RequestSignatureValidator() @@ -205,6 +210,7 @@ def wrapper( validated_xsec_appid, validated_payload, timestamp, + session, ) return wrapper # type: ignore diff --git a/tests/test_session.py b/tests/test_session.py new file mode 100644 index 0000000..74feb83 --- /dev/null +++ b/tests/test_session.py @@ -0,0 +1,88 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from xhshow.client import Xhshow +from xhshow.session import SessionManager + + +@pytest.fixture +def mock_crypto_processor(): + """Fixture to mock the CryptoProcessor and its methods.""" + with patch("xhshow.client.CryptoProcessor") as mock_proc_class: + mock_instance = mock_proc_class.return_value + mock_instance.build_payload_array = MagicMock(return_value=[]) + mock_instance.bit_ops.xor_transform_array = MagicMock(return_value=b"") + mock_instance.b64encoder.encode_x3 = MagicMock(return_value="encoded_x3") + mock_instance.b64encoder.encode = MagicMock(return_value="encoded_xs") + + # Set up the config mock to return a real dictionary + mock_instance.config.SIGNATURE_DATA_TEMPLATE = { + "x0": "4.2.6", + "x1": "xhs-pc-web", + "x2": "Windows", + "x3": "", + "x4": "", + } + mock_instance.config.X3_PREFIX = "mns0301_" + mock_instance.config.XYS_PREFIX = "XYS_" + + yield mock_instance + + +def test_signing_with_session(mock_crypto_processor): + """ + Verify that when a session object is provided, its state is used for signing. + """ + client = Xhshow() + session = SessionManager() + + # Update state to ensure counters are not zero + session.update_state() + session.update_state() + + uri = "/api/sns/web/v1/user/posted" + cookies = {"a1": "test_a1", "web_session": "test_session"} + content_string = uri # Simplified for this test + + # Perform signing + client.sign_headers_get(uri=uri, cookies=cookies, session=session) + + # Get the expected state from the session + expected_state = session.get_current_state(content_string) + + # Assert that build_payload_array was called + mock_crypto_processor.build_payload_array.assert_called_once() + + # Get the actual arguments passed to the mock + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + # Verify the state matches + assert actual_state is not None + assert actual_state.page_load_timestamp == expected_state.page_load_timestamp + assert actual_state.sequence_value == expected_state.sequence_value + assert actual_state.window_props_length == expected_state.window_props_length + assert actual_state.uri_length == len(content_string) + + +def test_signing_without_session(mock_crypto_processor): + """ + Verify that signing falls back to the old method when no session is provided. + """ + client = Xhshow() + uri = "/api/sns/web/v1/user/posted" + cookies = {"a1": "test_a1", "web_session": "test_session"} + + # Perform signing without a session + client.sign_headers_get(uri=uri, cookies=cookies) + + # Assert that build_payload_array was called + mock_crypto_processor.build_payload_array.assert_called_once() + + # Get the actual arguments passed to the mock + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + # Verify that no state was passed (it should be None) + assert actual_state is None diff --git a/tests/test_url_utils.py b/tests/test_url_utils.py index 697a55c..1314959 100644 --- a/tests/test_url_utils.py +++ b/tests/test_url_utils.py @@ -1,6 +1,7 @@ import pytest from xhshow import Xhshow +from xhshow.session import SessionManager from xhshow.utils.url_utils import build_url, extract_uri @@ -149,3 +150,18 @@ def test_client_sign_with_invalid_params(self): a1_value="test_a1_value", params="invalid", # type: ignore ) + + def test_client_sign_with_session(self): + client = Xhshow() + session = SessionManager() + for i in range(10): + signature = client.sign_xs_get( + uri="/api/sns/web/v1/user_posted", + a1_value="test_a1_value", + params={"num": "30"}, + session=session, + ) + session.update_state() + print(signature) + + assert signature.startswith("XYS_") From 0d95ab381eb61b86f3b06ae2254c0704d2781938 Mon Sep 17 00:00:00 2001 From: cloxl Date: Thu, 15 Jan 2026 03:35:36 +0800 Subject: [PATCH 2/9] fix(session): remove redundant uri_length field and fix state passing --- src/xhshow/client.py | 21 +++++++++------------ src/xhshow/session.py | 13 +++++-------- tests/test_session.py | 15 ++++++--------- tests/test_url_utils.py | 1 - 4 files changed, 20 insertions(+), 30 deletions(-) diff --git a/src/xhshow/client.py b/src/xhshow/client.py index bea0aa8..5a419fc 100644 --- a/src/xhshow/client.py +++ b/src/xhshow/client.py @@ -2,7 +2,7 @@ import json import time import urllib.parse -from typing import Any, Literal, TYPE_CHECKING +from typing import Any, Literal from .config import CryptoConfig from .core.common_sign import XsCommonSigner @@ -17,9 +17,6 @@ validate_xs_common_params, ) -if TYPE_CHECKING: - from .session import SessionManager - __all__ = ["Xhshow", "SessionManager", "SignState"] @@ -115,7 +112,7 @@ def sign_xs( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, - session: "SessionManager | None" = None, + session: SessionManager | None = None, ) -> str: """ Generate request signature (supports GET and POST) @@ -145,7 +142,7 @@ def sign_xs( content_string = self._build_content_string(method, uri, payload) d_value = self._generate_d_value(content_string) - sign_state = session.get_current_state(content_string) if session else None + sign_state = session.get_current_state(uri) if session else None payload_array = self.crypto_processor.build_payload_array( d_value, a1_value, xsec_appid, content_string, timestamp, sign_state=sign_state @@ -185,7 +182,7 @@ def sign_xs_get( xsec_appid: str = "xhs-pc-web", params: dict[str, Any] | None = None, timestamp: float | None = None, - session: "SessionManager | None" = None, + session: SessionManager | None = None, ) -> str: """ Generate GET request signature (convenience method) @@ -217,7 +214,7 @@ def sign_xs_post( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, - session: "SessionManager | None" = None, + session: SessionManager | None = None, ) -> str: """ Generate POST request signature (convenience method) @@ -421,7 +418,7 @@ def sign_headers( params: dict[str, Any] | None = None, payload: dict[str, Any] | None = None, timestamp: float | None = None, - session: "SessionManager | None" = None, + session: SessionManager | None = None, ) -> dict[str, str]: """ Generate complete request headers with signature and trace IDs @@ -501,7 +498,7 @@ def sign_headers_get( xsec_appid: str = "xhs-pc-web", params: dict[str, Any] | None = None, timestamp: float | None = None, - session: "SessionManager | None" = None, + session: SessionManager | None = None, ) -> dict[str, str]: """ Generate complete request headers for GET request (convenience method) @@ -526,7 +523,7 @@ def sign_headers_post( xsec_appid: str = "xhs-pc-web", payload: dict[str, Any] | None = None, timestamp: float | None = None, - session: "SessionManager | None" = None, + session: SessionManager | None = None, ) -> dict[str, str]: """ Generate complete request headers for POST request (convenience method) @@ -540,7 +537,7 @@ def sign_headers_post( session: Optional session manager for stateful signing. Returns: - dict: Complete headers including x-s, x-s-common, x-t, b3-traceid, x-xray-traceid + dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid """ return self.sign_headers( "POST", uri, cookies, xsec_appid, payload=payload, timestamp=timestamp, session=session diff --git a/src/xhshow/session.py b/src/xhshow/session.py index f50c084..eb4f9f3 100644 --- a/src/xhshow/session.py +++ b/src/xhshow/session.py @@ -24,7 +24,6 @@ def __init__(self): self.page_load_timestamp: int = int(time.time() * 1000) self.sequence_value: int = random.randint(15, 17) self.window_props_length: int = random.randint(1000, 2000) - self.uri_length: int = random.randint(200, 400) def update_state(self): """ @@ -32,17 +31,15 @@ def update_state(self): This method should be called before each signing operation. """ - # Simulate realistic counter increments - # self.sequence_value += random.randint(0, 1) + self.sequence_value += random.randint(0, 1) self.window_props_length += random.randint(1, 10) - self.uri_length += random.randint(0, 2) def get_current_state(self, uri: str) -> SignState: """ - Get the current signing state, with the option to use real URI length. + Get the current signing state for a request. - For maximum realism, the actual URI length is used, but the internal - counter is still maintained for other purposes. + This method automatically updates the session state counters and calculates + the URI length from the provided URI string. Args: uri (str): The URI string for the current request. @@ -55,5 +52,5 @@ def get_current_state(self, uri: str) -> SignState: page_load_timestamp=self.page_load_timestamp, sequence_value=self.sequence_value, window_props_length=self.window_props_length, - uri_length=len(uri), # Use the real URI length for the signature + uri_length=len(uri), ) diff --git a/tests/test_session.py b/tests/test_session.py index 74feb83..20fff4e 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -3,7 +3,7 @@ import pytest from xhshow.client import Xhshow -from xhshow.session import SessionManager +from xhshow.session import SessionManager, SignState @pytest.fixture @@ -43,14 +43,10 @@ def test_signing_with_session(mock_crypto_processor): uri = "/api/sns/web/v1/user/posted" cookies = {"a1": "test_a1", "web_session": "test_session"} - content_string = uri # Simplified for this test # Perform signing client.sign_headers_get(uri=uri, cookies=cookies, session=session) - # Get the expected state from the session - expected_state = session.get_current_state(content_string) - # Assert that build_payload_array was called mock_crypto_processor.build_payload_array.assert_called_once() @@ -60,10 +56,11 @@ def test_signing_with_session(mock_crypto_processor): # Verify the state matches assert actual_state is not None - assert actual_state.page_load_timestamp == expected_state.page_load_timestamp - assert actual_state.sequence_value == expected_state.sequence_value - assert actual_state.window_props_length == expected_state.window_props_length - assert actual_state.uri_length == len(content_string) + assert isinstance(actual_state, SignState) + assert actual_state.page_load_timestamp == session.page_load_timestamp + assert actual_state.sequence_value == session.sequence_value + assert actual_state.window_props_length == session.window_props_length + assert actual_state.uri_length == len(uri) def test_signing_without_session(mock_crypto_processor): diff --git a/tests/test_url_utils.py b/tests/test_url_utils.py index 1314959..dd6e9d3 100644 --- a/tests/test_url_utils.py +++ b/tests/test_url_utils.py @@ -162,6 +162,5 @@ def test_client_sign_with_session(self): session=session, ) session.update_state() - print(signature) assert signature.startswith("XYS_") From c413ba5de4f4ddd70c0e139169583fae37a1a7af Mon Sep 17 00:00:00 2001 From: cloxl Date: Thu, 15 Jan 2026 03:36:55 +0800 Subject: [PATCH 3/9] test(session): add comprehensive tests for POST and state evolution --- tests/test_session.py | 168 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/tests/test_session.py b/tests/test_session.py index 20fff4e..44f97dc 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -83,3 +83,171 @@ def test_signing_without_session(mock_crypto_processor): # Verify that no state was passed (it should be None) assert actual_state is None + + +def test_signing_with_session_post(mock_crypto_processor): + """ + Verify that POST requests use session state when provided. + """ + client = Xhshow() + session = SessionManager() + + uri = "/api/sns/web/v1/login" + cookies = {"a1": "test_a1", "web_session": "test_session"} + payload = {"username": "test_user", "password": "test_pass"} + + # Perform POST signing with session + client.sign_headers_post(uri=uri, cookies=cookies, payload=payload, session=session) + + # Assert that build_payload_array was called + mock_crypto_processor.build_payload_array.assert_called_once() + + # Get the actual arguments passed to the mock + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + # Verify the state matches + assert actual_state is not None + assert isinstance(actual_state, SignState) + assert actual_state.page_load_timestamp == session.page_load_timestamp + assert actual_state.sequence_value == session.sequence_value + assert actual_state.window_props_length == session.window_props_length + assert actual_state.uri_length == len(uri) + + +def test_signing_without_session_post(mock_crypto_processor): + """ + Verify that POST requests fall back to stateless mode when no session is provided. + """ + client = Xhshow() + uri = "/api/sns/web/v1/login" + cookies = {"a1": "test_a1", "web_session": "test_session"} + payload = {"username": "test_user", "password": "test_pass"} + + # Perform POST signing without session + client.sign_headers_post(uri=uri, cookies=cookies, payload=payload) + + # Assert that build_payload_array was called + mock_crypto_processor.build_payload_array.assert_called_once() + + # Get the actual arguments passed to the mock + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + # Verify that no state was passed + assert actual_state is None + + +def test_session_state_evolution(): + """ + Verify that SessionManager state evolves correctly across multiple calls. + """ + session = SessionManager() + + initial_sequence = session.sequence_value + initial_window_props = session.window_props_length + initial_timestamp = session.page_load_timestamp + + # Call get_current_state multiple times + uri1 = "/api/test1" + state1 = session.get_current_state(uri1) + + uri2 = "/api/test2" + state2 = session.get_current_state(uri2) + + uri3 = "/api/test3" + state3 = session.get_current_state(uri3) + + # Verify page_load_timestamp remains constant + assert state1.page_load_timestamp == initial_timestamp + assert state2.page_load_timestamp == initial_timestamp + assert state3.page_load_timestamp == initial_timestamp + + # Verify sequence_value increases (0 or 1 per call) + assert state1.sequence_value >= initial_sequence + assert state2.sequence_value >= state1.sequence_value + assert state3.sequence_value >= state2.sequence_value + + # Verify window_props_length increases (1-10 per call) + assert state1.window_props_length > initial_window_props + assert state2.window_props_length > state1.window_props_length + assert state3.window_props_length > state2.window_props_length + + # Verify uri_length uses actual URI length + assert state1.uri_length == len(uri1) + assert state2.uri_length == len(uri2) + assert state3.uri_length == len(uri3) + + +def test_session_uri_length_accuracy(): + """ + Verify that uri_length in SignState always matches actual URI length. + """ + session = SessionManager() + + test_cases = [ + "/api/short", + "/api/medium/path/to/resource", + "/api/very/long/path/to/some/deeply/nested/resource/endpoint", + ] + + for uri in test_cases: + state = session.get_current_state(uri) + assert state.uri_length == len(uri) + + +def test_sign_xs_get_with_session(mock_crypto_processor): + """ + Verify sign_xs_get passes session state correctly. + """ + client = Xhshow() + session = SessionManager() + + uri = "/api/sns/web/v1/homefeed" + params = {"page": "1", "limit": "20"} + + signature = client.sign_xs_get( + uri=uri, + a1_value="test_a1", + params=params, + session=session + ) + + # Verify signature was generated + assert signature.startswith("XYS_") + + # Verify build_payload_array was called with sign_state + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + assert actual_state is not None + assert actual_state.uri_length == len(uri) + + +def test_sign_xs_post_with_session(mock_crypto_processor): + """ + Verify sign_xs_post passes session state correctly. + """ + client = Xhshow() + session = SessionManager() + + uri = "/api/sns/web/v1/comment/post" + payload = {"note_id": "12345", "content": "Great post!"} + + signature = client.sign_xs_post( + uri=uri, + a1_value="test_a1", + payload=payload, + session=session + ) + + # Verify signature was generated + assert signature.startswith("XYS_") + + # Verify build_payload_array was called with sign_state + _, kwargs = mock_crypto_processor.build_payload_array.call_args + actual_state = kwargs.get("sign_state") + + assert actual_state is not None + assert actual_state.uri_length == len(uri) + From 1459a921d477f4c695d96d537162fb8d40848ea5 Mon Sep 17 00:00:00 2001 From: cloxl Date: Thu, 15 Jan 2026 03:42:11 +0800 Subject: [PATCH 4/9] style(test): rename unused loop variable to underscore --- tests/test_url_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_url_utils.py b/tests/test_url_utils.py index dd6e9d3..a5b24aa 100644 --- a/tests/test_url_utils.py +++ b/tests/test_url_utils.py @@ -154,7 +154,7 @@ def test_client_sign_with_invalid_params(self): def test_client_sign_with_session(self): client = Xhshow() session = SessionManager() - for i in range(10): + for _ in range(10): signature = client.sign_xs_get( uri="/api/sns/web/v1/user_posted", a1_value="test_a1_value", From 2281e27099b51fced633a98a36ad7a2d98587e95 Mon Sep 17 00:00:00 2001 From: cloxl Date: Thu, 15 Jan 2026 04:42:01 +0800 Subject: [PATCH 5/9] ci(release): add environment protection to github-release job --- .github/workflows/release.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f80a43b..6b896aa 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,6 +10,8 @@ jobs: github-release: name: Create GitHub Release runs-on: ubuntu-latest + environment: + name: release permissions: contents: write steps: From 310168cccf352971946c6daf554251ca687dad1c Mon Sep 17 00:00:00 2001 From: cloxl Date: Thu, 15 Jan 2026 04:42:11 +0800 Subject: [PATCH 6/9] style(test): format function calls and remove trailing newline --- tests/test_session.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/tests/test_session.py b/tests/test_session.py index 44f97dc..6e3cbed 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -206,12 +206,7 @@ def test_sign_xs_get_with_session(mock_crypto_processor): uri = "/api/sns/web/v1/homefeed" params = {"page": "1", "limit": "20"} - signature = client.sign_xs_get( - uri=uri, - a1_value="test_a1", - params=params, - session=session - ) + signature = client.sign_xs_get(uri=uri, a1_value="test_a1", params=params, session=session) # Verify signature was generated assert signature.startswith("XYS_") @@ -234,12 +229,7 @@ def test_sign_xs_post_with_session(mock_crypto_processor): uri = "/api/sns/web/v1/comment/post" payload = {"note_id": "12345", "content": "Great post!"} - signature = client.sign_xs_post( - uri=uri, - a1_value="test_a1", - payload=payload, - session=session - ) + signature = client.sign_xs_post(uri=uri, a1_value="test_a1", payload=payload, session=session) # Verify signature was generated assert signature.startswith("XYS_") @@ -250,4 +240,3 @@ def test_sign_xs_post_with_session(mock_crypto_processor): assert actual_state is not None assert actual_state.uri_length == len(uri) - From a36572bf48c5c1fef25e6bbfe0822108aee19f87 Mon Sep 17 00:00:00 2001 From: cloxl Date: Thu, 15 Jan 2026 18:13:49 +0800 Subject: [PATCH 7/9] feat(session): export SessionManager and SignState from package --- src/xhshow/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/xhshow/__init__.py b/src/xhshow/__init__.py index a139d42..249bd0f 100644 --- a/src/xhshow/__init__.py +++ b/src/xhshow/__init__.py @@ -1,6 +1,7 @@ from .client import Xhshow from .config import CryptoConfig from .core.crypto import CryptoProcessor +from .session import SessionManager, SignState __version__ = "0.1.0" -__all__ = ["CryptoConfig", "CryptoProcessor", "Xhshow"] +__all__ = ["CryptoConfig", "CryptoProcessor", "SessionManager", "SignState", "Xhshow"] From 92138e83f1f94c27892a78ad9624cfb83b085285 Mon Sep 17 00:00:00 2001 From: cloxl Date: Thu, 15 Jan 2026 18:14:02 +0800 Subject: [PATCH 8/9] test(session): remove redundant update_state call in test --- tests/test_url_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_url_utils.py b/tests/test_url_utils.py index a5b24aa..461d872 100644 --- a/tests/test_url_utils.py +++ b/tests/test_url_utils.py @@ -161,6 +161,5 @@ def test_client_sign_with_session(self): params={"num": "30"}, session=session, ) - session.update_state() assert signature.startswith("XYS_") From 2fe8f0c781b4d2358af7a611270b373c89d17c78 Mon Sep 17 00:00:00 2001 From: cloxl Date: Thu, 15 Jan 2026 18:14:09 +0800 Subject: [PATCH 9/9] docs(session): add SessionManager usage guide --- README.md | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/README.md b/README.md index 8defa5c..279b4bf 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,80 @@ headers = { +### 会话管理(实验性功能) + +`SessionManager` 用于模拟真实用户会话,维护状态化的签名参数,可能有助于提升长期稳定性。 + +**注意**:此功能基于 [#86](https://github.com/Cloxl/xhshow/issues/86) 理论分析,实际效果待验证。建议先在测试环境使用。 + +#### 基本使用 + +```python +from xhshow import Xhshow, SessionManager +import requests + +client = Xhshow() +session = SessionManager() # 创建会话管理器 + +cookies = {"a1": "...", "web_session": "...", "webId": "..."} + +# 使用 session 参数进行签名 +headers = client.sign_headers_get( + uri="/api/sns/web/v1/user_posted", + cookies=cookies, + params={"num": "30"}, + session=session # 传入 session +) + +response = requests.get( + "https://edith.xiaohongshu.com/api/sns/web/v1/user_posted", + params={"num": "30"}, + headers=headers, + cookies=cookies +) + +# 同一个 session 可以在多次请求中复用 +headers2 = client.sign_headers_get( + uri="/api/sns/web/v1/homefeed", + cookies=cookies, + params={"page": "1"}, + session=session # 复用同一个 session +) +``` + +#### 账户池管理 + +如果你有多个账户,需要为每个账户创建独立的 `SessionManager`: + +```python +accounts = [ + {"a1": "account1_a1", "web_session": "session1"}, + {"a1": "account2_a1", "web_session": "session2"}, +] + +# 为每个账户创建独立的 session +sessions = {} +for account in accounts: + sessions[account["a1"]] = SessionManager() + +# 使用时匹配账户和对应的 session +for account in accounts: + headers = client.sign_headers_get( + uri="/api/sns/web/v1/user_posted", + cookies=account, + params={"num": "30"}, + session=sessions[account["a1"]] # 使用对应账户的 session + ) +``` + +#### 工作原理 + +- **无 Session**:每次请求生成随机参数,可能被识别为机器人行为 +- **有 Session**:维护固定的页面加载时间戳和单调递增的计数器,模拟真实用户在同一页面中的连续操作 + +**适用场景:** +- 长期运行的爬虫或服务 + ### 解密签名 ```python