
Feat/hyperscaled proxy to hyperliquid via decodo #669

Open
brokespace wants to merge 62 commits into feat/hyperscaled from feat/hyperscaled_proxy

Conversation


@brokespace brokespace commented Feb 18, 2026

Taoshi Pull Request

Description

This introduces proxy support to the Hyperliquid tracker, bypassing the 10-unique-address-per-IP tracking limit that Hyperliquid enforces. I have created a test script in runnable that finds some random addresses, then starts tracking the trades they make.

This merges into the other hyperscaled branch.

Related Issues (JIRA)

feat/hyperscaled

Checklist

  • I have tested my changes on testnet.
  • I have updated any necessary documentation.
  • I have added unit tests for my changes (if applicable).
  • If there are breaking changes for validators, I have (or will) notify the community in Discord of the release.

Reviewer Instructions

[Provide any specific instructions or areas you would like the reviewer to focus on.]

Definition of Done

  • Code has been reviewed.
  • All checks and tests pass.
  • Documentation is up to date.
  • Approved by at least one reviewer.

Checklist (for the reviewer)

  • Code follows project conventions.
  • Code is well-documented.
  • Changes are necessary and align with the project's goals.
  • No breaking changes introduced.

Optional: Deploy Notes

[Any instructions or notes related to deployment, if applicable.]

/cc @mention_reviewer

@github-actions

🤖 Claude AI Code Review

Last reviewed on: 13:42:17


Summary

This PR implements proxy sharding for Hyperliquid WebSocket connections via Decodo SOCKS5 proxies, enabling validators to track >10 addresses by routing connections through multiple IPs. The changes introduce a _WebSocketShard inner class, orchestration logic, and comprehensive test tooling. The implementation is well-structured but has several concerns around resource management and error handling.
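For reviewers unfamiliar with the sharding model, the core idea can be sketched as follows. This is an illustrative simplification, not the PR's actual implementation: the constant name `HL_MAX_TRACKED_ADDRESSES_PER_IP` follows the PR, but `assign_addresses` is a hypothetical helper.

```python
# Illustrative sketch: distribute tracked addresses across proxy-backed
# shards so that no single IP exceeds Hyperliquid's per-IP limit.
HL_MAX_TRACKED_ADDRESSES_PER_IP = 10

def assign_addresses(addresses: list[str], ports: list[int]) -> dict[int, list[str]]:
    """Map addresses onto proxy ports, at most 10 addresses per port."""
    shards: dict[int, list[str]] = {}
    for i, addr in enumerate(addresses):
        port_idx = i // HL_MAX_TRACKED_ADDRESSES_PER_IP
        if port_idx >= len(ports):
            raise ValueError("Not enough proxy ports for the address count")
        shards.setdefault(ports[port_idx], []).append(addr)
    return shards
```

With 25 addresses and 3 ports, this yields shards of 10, 10, and 5 addresses; each shard then owns one WebSocket connection routed through its port.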


✅ Strengths

  • Well-architected sharding model: The _WebSocketShard inner class cleanly encapsulates connection state, making the code modular and testable
  • Graceful degradation: Falls back to direct connection when proxy config is missing (lines 286-293 in hyperliquid_tracker.py)
  • Comprehensive test script: test_hl_proxy_sharding.py is excellent: standalone, well-documented, with clear phases and colored output
  • Health monitoring: Tracks consecutive failures and marks unhealthy shards (lines 113-119)
  • Bounded deduplication: Uses OrderedDict with MAX_FILL_HASHES to prevent memory growth
  • Documentation: Excellent updates to validator.md with clear setup instructions

⚠️ Concerns

CRITICAL: WebSocket Cleanup Race Condition

Lines 123-142 in hyperliquid_tracker.py (_WebSocketShard.run)

finally:
    self.connected = False
    if ws is not None:
        try:
            await asyncio.wait_for(ws.close(), timeout=2.0)
        except RuntimeError:
            pass
        except Exception:
            transport = getattr(ws, "transport", None)
            if transport is not None:
                try:
                    loop = asyncio.get_running_loop()
                    if not loop.is_closed():
                        transport.abort()
                except RuntimeError:
                    pass

Issue: The nested try-except with RuntimeError catching suggests you're fighting event loop lifecycle issues. The get_running_loop() call can fail if the loop is already stopping.

Fix:

finally:
    self.connected = False
    if ws is not None:
        try:
            await asyncio.wait_for(ws.close(), timeout=2.0)
        except asyncio.TimeoutError:
            # Force close on timeout
            if hasattr(ws, 'transport') and ws.transport:
                ws.transport.abort()
        except (RuntimeError, asyncio.CancelledError):
            # Loop closing or task cancelled - cleanup already handled
            pass
        except Exception as e:
            bt.logging.debug(f"[HL_{self.label}] Error during ws close: {e}")

CRITICAL: Unprotected Loop Stop

Lines 324-331 in hyperliquid_tracker.py

def stop(self):
    """Signal the tracker to stop."""
    self._stop_event.set()
    if self._thread:
        self._thread.join(timeout=5.0)
    bt.logging.info("[HL_TRACKER] Stopped")

Issue: There is no guarantee the shard tasks will clean up within the 5-second join timeout. If shards are reconnecting with backoff, they may still be running when the thread terminates.

Fix:

def stop(self):
    """Signal the tracker to stop and wait for cleanup."""
    self._stop_event.set()
    
    # Cancel all shard tasks if loop is accessible
    if self._loop and not self._loop.is_closed():
        for shard in self._shards.values():
            if shard.task and not shard.task.done():
                self._loop.call_soon_threadsafe(shard.task.cancel)
    
    if self._thread:
        self._thread.join(timeout=10.0)  # Increased timeout
        if self._thread.is_alive():
            bt.logging.warning("[HL_TRACKER] Thread did not stop cleanly")
    bt.logging.info("[HL_TRACKER] Stopped")

HIGH: Memory Leak in Address Assignment

Lines 478-482 in hyperliquid_tracker.py

# 1. Remove addresses no longer active
stale = set(self._address_to_shard.keys()) - active_addresses
for addr in stale:
    sid = self._address_to_shard.pop(addr, None)
    if sid is not None and sid in self._shards:
        self._shards[sid].addresses.discard(addr)

Issue: If a shard is removed between the check and discard (race condition), _address_to_shard still references the old shard ID but the shard's address set won't be updated.

Fix: Add lock or ensure shard removal happens only in orchestrator:

stale = set(self._address_to_shard.keys()) - active_addresses
for addr in stale:
    sid = self._address_to_shard.pop(addr, None)
    if sid is not None:
        # Always attempt cleanup even if shard is gone
        shard = self._shards.get(sid)
        if shard:
            shard.addresses.discard(addr)

HIGH: Unbounded Port Exhaustion

Lines 494-503 in hyperliquid_tracker.py

When unhealthy shards are removed, ports go to _unhealthy_ports but are never returned to _available_ports. Once a port fails 5 times, it's permanently lost.

Fix: Add recovery mechanism or document this as intentional:

# Option 1: Time-based recovery (add to ValiConfig)
HL_UNHEALTHY_PORT_RECOVERY_TIME_S = 3600  # 1 hour

# Track when ports became unhealthy. Record
# self._unhealthy_since[port] = time.time() wherever a port is moved
# into _unhealthy_ports, or recovery will never trigger.
self._unhealthy_since: Dict[int, float] = {}

# In _assign_addresses_to_shards, after removing unhealthy shards:
now = time.time()
recovered = []
for port in list(self._unhealthy_ports):
    if now - self._unhealthy_since.get(port, now) > ValiConfig.HL_UNHEALTHY_PORT_RECOVERY_TIME_S:
        recovered.append(port)

for port in recovered:
    self._unhealthy_ports.discard(port)
    self._available_ports.append(port)
    self._unhealthy_since.pop(port, None)
    bt.logging.info(f"[HL_TRACKER] Recovered unhealthy port {port}")

MEDIUM: SSL Context Reuse

Lines 54-62 in hyperliquid_tracker.py (_open_ws_connection)

ssl_ctx = ssl.create_default_context()
return websockets.connect(
    ValiConfig.HL_MAINNET_WS,
    sock=sock,
    ssl=ssl_ctx,
    server_hostname="api.hyperliquid.xyz",
    ping_interval=None,
)

Issue: Creating a new SSL context on every reconnect is inefficient. SSL contexts are thread-safe and should be reused.

Fix: Create once at shard initialization:

class _WebSocketShard:
    def __init__(self, shard_id: int, proxy_url: Optional[str], tracker: 'HyperliquidTracker'):
        # ... existing code ...
        self._ssl_context = ssl.create_default_context()
    
    async def _open_ws_connection(self):
        if self.proxy_url and SocksProxy:
            proxy = SocksProxy.from_url(self.proxy_url)
            sock = await asyncio.wait_for(
                proxy.connect(dest_host="api.hyperliquid.xyz", dest_port=443),
                timeout=15,
            )
            return websockets.connect(
                ValiConfig.HL_MAINNET_WS,
                sock=sock,
                ssl=self._ssl_context,  # Reuse
                server_hostname="api.hyperliquid.xyz",
                ping_interval=None,
            )

MEDIUM: Exception Swallowing

Lines 150-153 in hyperliquid_tracker.py

try:
    await ws.send(json.dumps(msg))
    bt.logging.info(f"[HL_{self.label}] Subscribed to userFills for {addr}")
except Exception as e:
    bt.logging.error(f"[HL_{self.label}] Failed to subscribe for {addr}: {e}")

Issue: Subscription failures are logged but not acted upon. The shard continues with incomplete subscriptions.

Fix: Track failed subscriptions and retry or mark shard unhealthy:

except Exception as e:
    bt.logging.error(f"[HL_{self.label}] Failed to subscribe for {addr}: {e}")
    self._consecutive_failures += 1
    if self._consecutive_failures >= ValiConfig.HL_SHARD_MAX_CONSECUTIVE_FAILURES:
        raise  # Trigger shard reconnection

💡 Suggestions

1. Add Connection Pooling Metrics

Track connection attempts, successes, and failures per shard for monitoring:

class _WebSocketShard:
    def __init__(self, ...):
        # ... existing ...
        self.connection_attempts = 0
        self.connection_successes = 0
        
    async def run(self):
        while not self.tracker._stop_event.is_set():
            self.connection_attempts += 1
            try:
                ws_ctx = await self._open_ws_connection()
                ws = await ws_ctx
                self.connection_successes += 1
                # ... rest of connection handling ...

Add to get_status():

"connection_success_rate": shard.connection_successes / max(shard.connection_attempts, 1)

2. Optimize Port Parsing

Line 639 in hyperliquid_tracker.py

The _parse_ports method is called every time _get_hl_max_addresses() is invoked. Cache the result:

def _load_proxy_config(self):
    # ... existing code ...
    self._available_ports = self._parse_ports(ports_str)
    self._total_configured_ports = len(self._available_ports)  # Cache count
    
def _get_hl_max_addresses(self) -> int:
    if self._proxy_base_url:
        return self._total_configured_ports * ValiConfig.HL_MAX_TRACKED_ADDRESSES_PER_IP
    return ValiConfig.HL_MAX_TRACKED_ADDRESSES_PER_IP

3. Add Backoff Jitter

Line 147 in hyperliquid_tracker.py

backoff_s = min(backoff_s * 2.0, ValiConfig.HL_WS_RECONNECT_BACKOFF_MAX_S)

Add jitter to prevent thundering herd when all shards reconnect simultaneously:

import random
jitter = random.uniform(0.0, 0.2 * backoff_s)
await asyncio.sleep(backoff_s + jitter)
backoff_s = min(backoff_s * 2.0, ValiConfig.HL_WS_RECONNECT_BACKOFF_MAX_S)

4. Improve Test Script Resilience

Lines 265-272 in test_hl_proxy_sharding.py

The proxy close handling has the same issues as the main code:

finally:
    if ws is not None:
        try:
            await asyncio.wait_for(ws.close(), timeout=2.0)
        except (asyncio.TimeoutError, asyncio.CancelledError):
            # Abort transport on timeout/cancel
            if hasattr(ws, 'transport') and ws.transport:
                ws.transport.abort()
        except Exception:
            pass  # Already closing

5. Add Configuration Validation

Lines 325-363 in hyperliquid_tracker.py

Validate proxy config early and fail fast:

def _load_proxy_config(self):
    # ... existing code ...
    
    if self._proxy_base_url:
        # Validate format
        if not self._proxy_base_url.startswith("socks5://"):
            bt.logging.error("[HL_TRACKER] Invalid proxy URL format - must start with socks5://")
            self._proxy_base_url = None
            self._available_ports = []
            return
            
        # Validate port count
        if not self._available_ports:
            bt.logging.error("[HL_TRACKER] No valid ports parsed from hl_proxy_ports")
            self._proxy_base_url = None
            return

🔒 Security Notes

1. Credential Exposure in Logs

The proxy URL contains username/password. Ensure it's never logged:

Line 77 in hyperliquid_tracker.py:

bt.logging.info(
    f"[HL_{self.label}] Connected to {ValiConfig.HL_MAINNET_WS}"
    + (f" via {self.proxy_url}" if self.proxy_url else " (direct)")  # ⚠️ Contains credentials
)

Fix:

proxy_info = f" via proxy port {self.port}" if self.proxy_url else " (direct)"
bt.logging.info(f"[HL_{self.label}] Connected to {ValiConfig.HL_MAINNET_WS}{proxy_info}")

Similarly in test script at lines 104, 221.
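A small redaction helper makes this fix reusable across the tracker and the test script. This is a sketch (not code from the PR), using only the standard library; the hostname below is a placeholder:

```python
from urllib.parse import urlsplit, urlunsplit

def redact_proxy_url(url: str) -> str:
    """Strip userinfo (user:pass@) from a proxy URL so it is safe to log."""
    parts = urlsplit(url)
    if parts.username is None and parts.password is None:
        return url  # nothing sensitive to remove
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    return urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))
```

Logging `redact_proxy_url(self.proxy_url)` keeps the host and port visible for debugging while dropping the credentials.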


2. secrets.json Validation

Line 341 in hyperliquid_tracker.py:

try:
    secrets = ValiUtils.get_secrets()
except Exception:
    secrets = {}

Silently failing here could lead to confusion. Consider:

try:
    secrets = ValiUtils.get_secrets()
except Exception as e:
    bt.logging.warning(f"[HL_TRACKER] Could not load secrets: {e}")
    secrets = {}

3. Dependency Version Pinning

requirements.txt line 20-21:

websockets==16.0
python-socks>=2.7.1

Good: websockets is pinned.
Issue: python-socks uses >=, which could pull in breaking major versions.

Fix:

python-socks==2.7.1

Or use upper bound:

python-socks>=2.7.1,<3.0.0

4. Test Script Mock Data

Lines 367-378 in test_hl_proxy_sharding.py:

The mock _patch_tracker_dependencies replaces OrderProcessor.process_order globally. Guard against the script being imported under a test framework (it shouldn't be, but defensive coding helps):

def _patch_tracker_dependencies(secrets: dict):
    """... docstring ..."""
    import sys
    if 'pytest' in sys.modules or 'unittest' in sys.modules:
        raise RuntimeError("Cannot patch dependencies when running under test framework")
    # ... rest of function ...

📋 Additional Notes

Testing Checklist Items Not Addressed

The PR description checkboxes are empty:

  • Has this been tested on testnet?
  • Are there unit tests for shard orchestration logic?
  • Has the proxy failover been tested with intentionally bad ports?

Recommendation: Add unit tests for:

  1. _parse_ports() with edge cases (empty string, invalid ranges)
  2. _assign_addresses_to_shards() logic with various shard health states
  3. Port recovery mechanism (if implemented per suggestion above)
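Since _parse_ports is not shown in the diff excerpt, the sketch below assumes a comma-separated list with optional `start-end` ranges (a common convention) and skips invalid tokens rather than raising. It illustrates the edge cases the recommended unit tests should cover; the function name here is illustrative:

```python
def parse_ports(ports_str: str) -> list[int]:
    """Parse '10001,10003-10005' into [10001, 10003, 10004, 10005].

    Invalid or empty tokens and inverted ranges are skipped silently,
    matching the tracker's fail-soft handling of bad config.
    """
    ports: list[int] = []
    for token in ports_str.split(","):
        token = token.strip()
        if not token:
            continue  # empty token, e.g. trailing comma
        try:
            if "-" in token:
                start, end = token.split("-", 1)
                lo, hi = int(start), int(end)
                if lo > hi:
                    continue  # inverted range, skip
                ports.extend(range(lo, hi + 1))
            else:
                ports.append(int(token))
        except ValueError:
            continue  # non-numeric token, skip
    return ports
```

Unit tests should at minimum assert the empty-string, mixed-list, garbage-token, and inverted-range cases behave as intended.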

Documentation Gap

The validator.md docs are excellent but missing:

  • What happens when ports are exhausted?
  • How to monitor shard health?
  • Troubleshooting guide for common proxy issues

Suggested addition to docs/validator.md:

### Monitoring Proxy Health

Check tracker status via the validator API:
```bash
curl http://localhost:8000/hl_tracker_status
```

Unhealthy ports are automatically detected and their addresses redistributed.
If all ports become unhealthy, the tracker will log errors and stop accepting new addresses.

To reset unhealthy ports, restart the validator.


🎯 Verdict

Overall Assessment: This is solid work with good architecture, but it has several resource management issues that could cause memory leaks or connection exhaustion in production. The test script is particularly impressive.

Recommended Actions Before Merge:

  1. ✅ Fix the critical WebSocket cleanup race condition
  2. ✅ Implement port recovery or document the permanent exclusion behavior
  3. ✅ Remove credential exposure from logs
  4. ✅ Add unit tests for core orchestration logic
  5. ⚠️ Consider implementing the SSL context reuse optimization

Risk Level: MEDIUM - Works as designed but may degrade under load or after extended operation due to resource accumulation issues.

@brokespace force-pushed the feat/hyperscaled branch 4 times, most recently from 48461d5 to 3b5630d on March 13, 2026 19:41
@sli-tao force-pushed the feat/hyperscaled branch 2 times, most recently from c2d55af to 595b86c on April 2, 2026 11:07
@sli-tao force-pushed the feat/hyperscaled branch 2 times, most recently from b077a4b to be3cb98 on April 3, 2026 08:15