Skip to content
Merged
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ jobs:
github-release:
name: Create GitHub Release
runs-on: ubuntu-latest
environment:
name: release
permissions:
contents: write
steps:
Expand Down
74 changes: 74 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,80 @@ headers = {

</details>

### 会话管理(实验性功能)

`SessionManager` 用于模拟真实用户会话,维护状态化的签名参数,可能有助于提升长期稳定性。

**注意**:此功能基于 [#86](https://github.com/Cloxl/xhshow/issues/86) 理论分析,实际效果待验证。建议先在测试环境使用。

#### 基本使用

```python
from xhshow import Xhshow, SessionManager
import requests

client = Xhshow()
session = SessionManager() # 创建会话管理器

cookies = {"a1": "...", "web_session": "...", "webId": "..."}

# 使用 session 参数进行签名
headers = client.sign_headers_get(
uri="/api/sns/web/v1/user_posted",
cookies=cookies,
params={"num": "30"},
session=session # 传入 session
)

response = requests.get(
"https://edith.xiaohongshu.com/api/sns/web/v1/user_posted",
params={"num": "30"},
headers=headers,
cookies=cookies
)

# 同一个 session 可以在多次请求中复用
headers2 = client.sign_headers_get(
uri="/api/sns/web/v1/homefeed",
cookies=cookies,
params={"page": "1"},
session=session # 复用同一个 session
)
```

#### 账户池管理

如果你有多个账户,需要为每个账户创建独立的 `SessionManager`:

```python
accounts = [
{"a1": "account1_a1", "web_session": "session1"},
{"a1": "account2_a1", "web_session": "session2"},
]

# 为每个账户创建独立的 session
sessions = {}
for account in accounts:
sessions[account["a1"]] = SessionManager()

# 使用时匹配账户和对应的 session
for account in accounts:
headers = client.sign_headers_get(
uri="/api/sns/web/v1/user_posted",
cookies=account,
params={"num": "30"},
session=sessions[account["a1"]] # 使用对应账户的 session
)
```

#### 工作原理

- **无 Session**:每次请求生成随机参数,可能被识别为机器人行为
- **有 Session**:维护固定的页面加载时间戳和单调递增的计数器,模拟真实用户在同一页面中的连续操作

**适用场景:**
- 长期运行的爬虫或服务

### 解密签名

```python
Expand Down
3 changes: 2 additions & 1 deletion src/xhshow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .client import Xhshow
from .config import CryptoConfig
from .core.crypto import CryptoProcessor
from .session import SessionManager, SignState

__version__ = "0.1.0"
__all__ = ["CryptoConfig", "CryptoProcessor", "Xhshow"]
__all__ = ["CryptoConfig", "CryptoProcessor", "SessionManager", "SignState", "Xhshow"]
64 changes: 32 additions & 32 deletions src/xhshow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .config import CryptoConfig
from .core.common_sign import XsCommonSigner
from .core.crypto import CryptoProcessor
from .session import SessionManager, SignState
from .utils.random_gen import RandomGenerator
from .utils.url_utils import build_url, extract_uri
from .utils.validators import (
Expand All @@ -16,7 +17,7 @@
validate_xs_common_params,
)

__all__ = ["Xhshow"]
__all__ = ["Xhshow", "SessionManager", "SignState"]


class Xhshow:
Expand Down Expand Up @@ -111,6 +112,7 @@ def sign_xs(
xsec_appid: str = "xhs-pc-web",
payload: dict[str, Any] | None = None,
timestamp: float | None = None,
session: SessionManager | None = None,
) -> str:
"""
Generate request signature (supports GET and POST)
Expand All @@ -127,6 +129,7 @@ def sign_xs(
- GET request: params value
- POST request: payload value
timestamp: Unix timestamp in seconds (defaults to current time)
session: Optional session manager for stateful signing.

Returns:
str: Complete signature string
Expand All @@ -136,15 +139,20 @@ def sign_xs(
ValueError: Parameter value error
"""
uri = extract_uri(uri)

signature_data = self.crypto_processor.config.SIGNATURE_DATA_TEMPLATE.copy()

content_string = self._build_content_string(method, uri, payload)

d_value = self._generate_d_value(content_string)
signature_data["x3"] = self.crypto_processor.config.X3_PREFIX + self._build_signature(
d_value, a1_value, xsec_appid, content_string, timestamp

sign_state = session.get_current_state(uri) if session else None

payload_array = self.crypto_processor.build_payload_array(
d_value, a1_value, xsec_appid, content_string, timestamp, sign_state=sign_state
)
xor_result = self.crypto_processor.bit_ops.xor_transform_array(payload_array)
x3_signature = self.crypto_processor.b64encoder.encode_x3(xor_result[:124])

signature_data = self.crypto_processor.config.SIGNATURE_DATA_TEMPLATE.copy()
signature_data["x3"] = self.crypto_processor.config.X3_PREFIX + x3_signature

return self.crypto_processor.config.XYS_PREFIX + self.crypto_processor.b64encoder.encode(
json.dumps(signature_data, separators=(",", ":"), ensure_ascii=False)
)
Expand Down Expand Up @@ -174,6 +182,7 @@ def sign_xs_get(
xsec_appid: str = "xhs-pc-web",
params: dict[str, Any] | None = None,
timestamp: float | None = None,
session: SessionManager | None = None,
) -> str:
"""
Generate GET request signature (convenience method)
Expand All @@ -186,6 +195,7 @@ def sign_xs_get(
xsec_appid: Application identifier, defaults to `xhs-pc-web`
params: GET request parameters
timestamp: Unix timestamp in seconds (defaults to current time)
session: Optional session manager for stateful signing.

Returns:
str: Complete signature string
Expand All @@ -194,7 +204,7 @@ def sign_xs_get(
TypeError: Parameter type error
ValueError: Parameter value error
"""
return self.sign_xs("GET", uri, a1_value, xsec_appid, payload=params, timestamp=timestamp)
return self.sign_xs("GET", uri, a1_value, xsec_appid, payload=params, timestamp=timestamp, session=session)

@validate_post_signature_params
def sign_xs_post(
Expand All @@ -204,6 +214,7 @@ def sign_xs_post(
xsec_appid: str = "xhs-pc-web",
payload: dict[str, Any] | None = None,
timestamp: float | None = None,
session: SessionManager | None = None,
) -> str:
"""
Generate POST request signature (convenience method)
Expand All @@ -216,6 +227,7 @@ def sign_xs_post(
xsec_appid: Application identifier, defaults to `xhs-pc-web`
payload: POST request body data
timestamp: Unix timestamp in seconds (defaults to current time)
session: Optional session manager for stateful signing.

Returns:
str: Complete signature string
Expand All @@ -224,7 +236,7 @@ def sign_xs_post(
TypeError: Parameter type error
ValueError: Parameter value error
"""
return self.sign_xs("POST", uri, a1_value, xsec_appid, payload=payload, timestamp=timestamp)
return self.sign_xs("POST", uri, a1_value, xsec_appid, payload=payload, timestamp=timestamp, session=session)

@validate_xs_common_params
def sign_xsc(
Expand Down Expand Up @@ -406,6 +418,7 @@ def sign_headers(
params: dict[str, Any] | None = None,
payload: dict[str, Any] | None = None,
timestamp: float | None = None,
session: SessionManager | None = None,
) -> dict[str, str]:
"""
Generate complete request headers with signature and trace IDs
Expand All @@ -418,6 +431,7 @@ def sign_headers(
params: GET request parameters (only used when method="GET")
payload: POST request body data (only used when method="POST")
timestamp: Unix timestamp in seconds (defaults to current time)
session: Optional session manager for stateful signing.

Returns:
dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid
Expand Down Expand Up @@ -447,7 +461,6 @@ def sign_headers(

method_upper = method.upper()

# Validate method and parameters
if method_upper == "GET":
if payload is not None:
raise ValueError("GET requests must use 'params', not 'payload'")
Expand All @@ -460,12 +473,11 @@ def sign_headers(
raise ValueError(f"Unsupported method: {method}")

cookie_dict = self._parse_cookies(cookies)

a1_value = cookie_dict.get("a1")
if not a1_value:
raise ValueError("Missing 'a1' in cookies")

x_s = self.sign_xs(method_upper, uri, a1_value, xsec_appid, request_data, timestamp)
x_s = self.sign_xs(method_upper, uri, a1_value, xsec_appid, request_data, timestamp, session)
x_s_common = self.sign_xs_common(cookie_dict)
x_t = self.get_x_t(timestamp)
x_b3_traceid = self.get_b3_trace_id()
Expand All @@ -486,6 +498,7 @@ def sign_headers_get(
xsec_appid: str = "xhs-pc-web",
params: dict[str, Any] | None = None,
timestamp: float | None = None,
session: SessionManager | None = None,
) -> dict[str, str]:
"""
Generate complete request headers for GET request (convenience method)
Expand All @@ -496,20 +509,12 @@ def sign_headers_get(
xsec_appid: Application identifier, defaults to `xhs-pc-web`
params: GET request parameters
timestamp: Unix timestamp in seconds (defaults to current time)
session: Optional session manager for stateful signing.

Returns:
dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid

Examples:
>>> client = Xhshow()
>>> cookies = {"a1": "your_a1_value", "web_session": "..."}
>>> headers = client.sign_headers_get(
... uri="/api/sns/web/v1/user_posted",
... cookies=cookies,
... params={"num": "30"}
... )
"""
return self.sign_headers("GET", uri, cookies, xsec_appid, params=params, timestamp=timestamp)
return self.sign_headers("GET", uri, cookies, xsec_appid, params=params, timestamp=timestamp, session=session)

def sign_headers_post(
self,
Expand All @@ -518,6 +523,7 @@ def sign_headers_post(
xsec_appid: str = "xhs-pc-web",
payload: dict[str, Any] | None = None,
timestamp: float | None = None,
session: SessionManager | None = None,
) -> dict[str, str]:
"""
Generate complete request headers for POST request (convenience method)
Expand All @@ -528,17 +534,11 @@ def sign_headers_post(
xsec_appid: Application identifier, defaults to `xhs-pc-web`
payload: POST request body data
timestamp: Unix timestamp in seconds (defaults to current time)
session: Optional session manager for stateful signing.

Returns:
dict: Complete headers including x-s, x-s-common, x-t, x-b3-traceid, x-xray-traceid

Examples:
>>> client = Xhshow()
>>> cookies = {"a1": "your_a1_value", "web_session": "..."}
>>> headers = client.sign_headers_post(
... uri="/api/sns/web/v1/login",
... cookies=cookies,
... payload={"username": "test", "password": "123456"}
... )
"""
return self.sign_headers("POST", uri, cookies, xsec_appid, payload=payload, timestamp=timestamp)
return self.sign_headers(
"POST", uri, cookies, xsec_appid, payload=payload, timestamp=timestamp, session=session
)
40 changes: 25 additions & 15 deletions src/xhshow/core/crypto.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import struct
import time
from typing import TYPE_CHECKING

from ..config import CryptoConfig
from ..utils.bit_ops import BitOperations
from ..utils.encoder import Base64Encoder
from ..utils.hex_utils import HexProcessor
from ..utils.random_gen import RandomGenerator

if TYPE_CHECKING:
from ..session import SignState


__all__ = ["CryptoProcessor"]


Expand Down Expand Up @@ -57,6 +62,7 @@ def build_payload_array(
app_identifier: str = "xhs-pc-web",
string_param: str = "",
timestamp: float | None = None,
sign_state: "SignState | None" = None,
) -> list[int]:
"""
Build payload array (t.js version - exact match)
Expand All @@ -67,6 +73,7 @@ def build_payload_array(
app_identifier (str): Application identifier, default "xhs-pc-web"
string_param (str): String parameter (used for URI length calculation)
timestamp (float | None): Unix timestamp in seconds (defaults to current time)
sign_state (SignState | None): Optional state for realistic signature generation.

Returns:
list[int]: Complete payload byte array (124 bytes)
Expand All @@ -84,24 +91,27 @@ def build_payload_array(
timestamp = time.time()
payload.extend(self.env_fingerprint_a(int(timestamp * 1000), self.config.ENV_FINGERPRINT_XOR_KEY))

time_offset = self.random_gen.generate_random_byte_in_range(
self.config.ENV_FINGERPRINT_TIME_OFFSET_MIN,
self.config.ENV_FINGERPRINT_TIME_OFFSET_MAX,
)
payload.extend(self.env_fingerprint_b(int((timestamp - time_offset) * 1000)))
if sign_state:
payload.extend(self.env_fingerprint_b(sign_state.page_load_timestamp))
sequence_value = sign_state.sequence_value
window_props_length = sign_state.window_props_length
uri_length = sign_state.uri_length
else:
time_offset = self.random_gen.generate_random_byte_in_range(
self.config.ENV_FINGERPRINT_TIME_OFFSET_MIN,
self.config.ENV_FINGERPRINT_TIME_OFFSET_MAX,
)
payload.extend(self.env_fingerprint_b(int((timestamp - time_offset) * 1000)))
sequence_value = self.random_gen.generate_random_byte_in_range(
self.config.SEQUENCE_VALUE_MIN, self.config.SEQUENCE_VALUE_MAX
)
window_props_length = self.random_gen.generate_random_byte_in_range(
self.config.WINDOW_PROPS_LENGTH_MIN, self.config.WINDOW_PROPS_LENGTH_MAX
)
uri_length = len(string_param)

sequence_value = self.random_gen.generate_random_byte_in_range(
self.config.SEQUENCE_VALUE_MIN, self.config.SEQUENCE_VALUE_MAX
)
payload.extend(self._int_to_le_bytes(sequence_value, 4))

window_props_length = self.random_gen.generate_random_byte_in_range(
self.config.WINDOW_PROPS_LENGTH_MIN, self.config.WINDOW_PROPS_LENGTH_MAX
)
payload.extend(self._int_to_le_bytes(window_props_length, 4))

# URI length
uri_length = len(string_param)
payload.extend(self._int_to_le_bytes(uri_length, 4))

# MD5 XOR segment
Expand Down
Loading