fix: add security hardening for production stability #5
Conversation
- Add rate limiting for WebSocket connections
- Add connection limits (global, per-IP, per-app)
- Add message size limits
- Add channel/subscription limits
- Fix unauthenticated app creation endpoint
- Add input validation for events and channels
- Make webhook calls asynchronous with timeouts
- Improve disconnect error handling for proper cleanup
📝 Walkthrough

Adds comprehensive connection and message limits, per-connection rate limiting, per-IP/per-app connection tracking, stricter validation for channels/events/messages, resilient state cleanup, and refactors webhook delivery into an asynchronous, timeout- and concurrency-controlled flow. Also marks the app creation endpoint as private.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client as WebSocket Client
    participant Server as PushiServer
    participant State as StateStore
    participant WebHandler as WebHandler
    participant HTTP as HTTP Client / netius
    participant Target as External Webhook
    Client->>Server: sends event/message
    Server->>Server: validate size, names, rate limit
    alt validation fails
        Server->>Client: _send_error(error)
    else valid
        Server->>State: record/lookup subscriptions
        State-->>Server: subscription list
        Server->>WebHandler: dispatch webhooks (per URL)
        rect rgb(220,240,220)
            Note over WebHandler: Concurrency & in-flight guard
            WebHandler->>WebHandler: if _active_requests >= WEBHOOK_MAX_CONCURRENT -> skip URL
        end
        alt proceed with send
            WebHandler->>HTTP: _send_webhook(url, headers, data)
            HTTP->>Target: POST (with timeout)
            par success / timeout / error
                Target-->>HTTP: response
                HTTP-->>WebHandler: on_finish/on_message
            and
                HTTP-->>WebHandler: on_error or on_timeout
            end
            WebHandler->>WebHandler: decrement _active_requests, mark URL handled/invalid
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 3
🧹 Nitpick comments (5)
src/pushi/base/web.py (1)
159-175: Consider logging exceptions in cleanup handlers for debugging.

The `on_finish` and `on_timeout` handlers silently swallow exceptions (lines 161-162 and 174-175). While this prevents cleanup failures from propagating, logging these exceptions at debug level would help diagnose issues in production.

🔎 Proposed improvement

```diff
 def on_finish(protocol):
     completed.set()
     try:
         netius.compat_loop(loop).stop()
-    except Exception:
-        pass
+    except Exception as e:
+        self.logger.debug("Error stopping loop in on_finish: %s" % e)

 # creates a timeout function to stop the request if it takes too long
 def on_timeout():
     if not completed.is_set():
         self.logger.warning(
             "Webhook request to '%s' timed out after %d seconds"
             % (url, WEBHOOK_TIMEOUT)
         )
         try:
             protocol.close()
             netius.compat_loop(loop).stop()
-        except Exception:
-            pass
+        except Exception as e:
+            self.logger.debug("Error during timeout cleanup: %s" % e)
```

src/pushi/base/state.py (2)
369-374: Rollback logic is correct but could leave partial state on exception.

The rollback at line 373 correctly removes the channel from the socket's channel list when the per-channel socket limit is exceeded. However, if `channels.remove(channel)` raises an exception (e.g., if the channel was somehow removed by another thread), the state would be inconsistent.

Consider wrapping the rollback in a try-except for additional safety, though this is a low-risk edge case in practice.
52-55: Duplicate constant definitions with `server.py`.

`MAX_CHANNELS_PER_SOCKET` and `MAX_SOCKETS_PER_CHANNEL` are independently defined in both `src/pushi/net/server.py` (lines 43-44) and `src/pushi/base/state.py` (lines 53-54). Both read from identical environment variables with identical defaults, but neither file imports from the other. This creates maintenance risk: future edits to one definition could inadvertently diverge from the other.

Consolidate these constants in a single shared module and import from there.
src/pushi/net/server.py (2)
228-232: Chain exception for better debugging.

Per static analysis, the exception at line 232 should chain from the original exception to preserve the traceback.
🔎 Proposed fix
```diff
 try:
     data = data.decode("utf-8")
     json_d = json.loads(data)
 except Exception as exception:
-    raise netius.DataError("Invalid message received '%s'" % data)
+    raise netius.DataError("Invalid message received '%s'" % data) from exception
```
185-193: Consider logging exceptions in `_send_error` for debugging.

The silent exception handling at lines 192-193 is appropriate since the connection may already be closed, but logging at debug level would help diagnose issues.
🔎 Proposed improvement
```diff
 def _send_error(self, connection, message):
     """
     Sends an error message to the connection.
     """

     json_d = dict(event="pusher:error", data=json.dumps(dict(message=message)))
     try:
         connection.send_pushi(json_d)
-    except Exception:
-        pass
+    except Exception as e:
+        self.debug("Failed to send error to connection: %s" % e)
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- CHANGELOG.md
- src/pushi/app/controllers/app.py
- src/pushi/base/state.py
- src/pushi/base/web.py
- src/pushi/net/server.py
🧰 Additional context used
🧬 Code graph analysis (1)
src/pushi/base/web.py (1)
src/pushi/net/client.py (2)
`on_message` (316-318), `timer` (339-341)
🪛 Ruff (0.14.10)
src/pushi/base/web.py
152-152: Unused function argument: parser
(ARG001)
152-152: Unused function argument: message
(ARG001)
157-157: Unused function argument: protocol
(ARG001)
161-162: try-except-pass detected, consider logging the exception
(S110)
161-161: Do not catch blind exception: Exception
(BLE001)
174-175: try-except-pass detected, consider logging the exception
(S110)
174-174: Do not catch blind exception: Exception
(BLE001)
193-193: Do not catch blind exception: Exception
(BLE001)
200-200: Do not catch blind exception: Exception
(BLE001)
src/pushi/base/state.py
263-263: Do not catch blind exception: Exception
(BLE001)
275-275: Do not catch blind exception: Exception
(BLE001)
357-357: Avoid specifying long messages outside the exception class
(TRY003)
374-374: Avoid specifying long messages outside the exception class
(TRY003)
src/pushi/net/server.py
192-193: try-except-pass detected, consider logging the exception
(S110)
192-192: Do not catch blind exception: Exception
(BLE001)
231-231: Do not catch blind exception: Exception
(BLE001)
232-232: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
🔇 Additional comments (7)
src/pushi/app/controllers/app.py (1)
43-48: Good security fix: App creation endpoint now requires authentication.

Adding `@appier.private` to the `create` method ensures the `POST /apps` endpoint requires authentication, closing a security gap where apps could be created without proper authorization.

CHANGELOG.md (1)
10-30: Comprehensive changelog documentation.

The changelog entries properly document all the new configuration options, behavioral changes, and security fixes introduced in this PR. The environment variable names are consistently documented, which aids deployment configuration.
src/pushi/base/web.py (1)
208-211: Daemon threads will terminate on shutdown.

Marking webhook threads as daemon (line 210) means in-flight webhook requests will be aborted if the server shuts down. This is acceptable for non-blocking behavior but worth documenting. If webhook delivery guarantees are needed, consider adding a graceful shutdown mechanism that waits for active requests.
src/pushi/base/state.py (1)
261-285: Good defensive error handling in disconnect flow.

The enhanced disconnect logic properly:

- Handles failures in `get_state` gracefully (lines 261-265)
- Continues cleanup even if individual channel unsubscriptions fail (lines 272-281)
- Ensures `socket_channels` cleanup regardless of unsubscribe errors (lines 283-285)

This prevents a single failure from leaving the system in an inconsistent state.
src/pushi/net/server.py (3)
67-86: Rate limiting implementation is correct.

The sliding window rate limiter properly filters expired timestamps and enforces the message limit. For the default configuration (60 messages/60 seconds), the list size is bounded and performance is acceptable.

For very high-throughput scenarios, consider using `collections.deque` with `maxlen` for automatic bounding, but the current approach is sufficient for typical WebSocket usage.
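As a hedged illustration of that suggestion, a sliding-window limiter built on a `collections.deque` of timestamps might look like this. The class and parameter names are invented for the example and are not the project's actual implementation:

```python
import collections
import time


class SlidingWindowLimiter:
    # illustrative sketch: allows at most `limit` events per rolling
    # `window` seconds, pruning expired timestamps on each check
    def __init__(self, limit=60, window=60.0):
        self.limit = limit
        self.window = window
        self.hits = collections.deque()

    def allow(self, now=None):
        now = time.time() if now is None else now
        # drop timestamps that have fallen out of the window
        while self.hits and now - self.hits[0] >= self.window:
            self.hits.popleft()
        if len(self.hits) >= self.limit:
            return False
        self.hits.append(now)
        return True
```

Because expired entries are pruned on every call, the deque never grows past `limit` entries, giving the automatic bounding the review mentions.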
157-183: Connection disconnect cleanup is thorough.

The cleanup logic properly:
- Removes socket from tracking (lines 161-162)
- Decrements and removes IP tracking entries (lines 165-169)
- Decrements and removes app tracking entries (lines 172-176)
- Triggers disconnect event for state cleanup (lines 178-183)
The defensive checks before decrementing prevent KeyError exceptions.
349-376: Good input validation for `handle_event`.

The validation properly checks:
- Presence of required fields (data, event, channel)
- Type validation for channel and event (must be strings)
- Length limits for channel and event names
The `data` field is not type-checked, which is correct since event data can be any JSON-serializable value.
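The checks listed above can be sketched as a small standalone validator. The limit constant, function name, and error strings here are illustrative assumptions, not the project's exact code:

```python
# assumed limit for illustration; the real project reads its limits
# from environment variables
MAX_NAME_LENGTH = 200


def validate_event(payload):
    # returns an error message string, or None when the payload is valid
    for field in ("data", "event", "channel"):
        if field not in payload:
            return "Missing required field '%s'" % field
    for field in ("event", "channel"):
        value = payload[field]
        if not isinstance(value, str):
            return "Field '%s' must be a string" % field
        if len(value) > MAX_NAME_LENGTH:
            return "Field '%s' exceeds maximum length" % field
    # the data field is intentionally not type-checked: event data
    # may be any JSON-serializable value
    return None
```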
```python
# check if we have reached the maximum concurrent requests
with self._lock:
    if self._active_requests >= WEBHOOK_MAX_CONCURRENT:
        self.logger.warning(
            "Maximum concurrent webhook requests reached, skipping '%s'"
            % url
        )
        continue

# adds the current URL to the list of invalid items for
# the current message sending stream (do this before sending
# to prevent duplicate sends in case of rapid events)
invalid[url] = True

# send the webhook in a separate thread to avoid blocking
self._send_webhook_async(url, headers, data)
```
Race condition in concurrency limit check.
The check at lines 118-124 reads _active_requests under lock, but by the time _send_webhook_async spawns a thread and increments the counter (lines 141-142), other threads may have already passed the same check. This can allow more concurrent requests than WEBHOOK_MAX_CONCURRENT.
Consider moving the increment and check into a single atomic operation:
🔎 Proposed fix
- # check if we have reached the maximum concurrent requests
- with self._lock:
- if self._active_requests >= WEBHOOK_MAX_CONCURRENT:
- self.logger.warning(
- "Maximum concurrent webhook requests reached, skipping '%s'"
- % url
- )
- continue
+ # check and reserve a slot atomically
+ with self._lock:
+ if self._active_requests >= WEBHOOK_MAX_CONCURRENT:
+ self.logger.warning(
+ "Maximum concurrent webhook requests reached, skipping '%s'"
+ % url
+ )
+ continue
+            self._active_requests += 1

Then in `_send_webhook_async`, remove the increment in `do_send` since it's already done:
def do_send():
- with self._lock:
- self._active_requests += 1
-
      try:

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/pushi/base/web.py around lines 117-133, the concurrency check reads
_active_requests under the lock but delays incrementing until the spawned thread
runs, causing a race that can exceed WEBHOOK_MAX_CONCURRENT; modify the code so
the increment and check are done atomically while holding self._lock (i.e.,
acquire the lock, if _active_requests >= WEBHOOK_MAX_CONCURRENT then log and
continue, otherwise increment _active_requests and release the lock) before
marking invalid[url] and spawning the thread; then remove the increment from
inside do_send (leave the decrement in its finally block so the counter is
properly decremented when the send completes).
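A minimal standalone sketch of the atomic check-and-increment pattern the fix describes. Class and attribute names here are illustrative, not the project's exact code:

```python
import threading


class WebhookDispatcher:
    # sketch: the reservation (check + increment) happens under a single
    # lock acquisition, so no two callers can both pass the limit check
    def __init__(self, max_concurrent=100):
        self.max_concurrent = max_concurrent
        self._active = 0
        self._lock = threading.Lock()

    def try_reserve(self):
        with self._lock:
            if self._active >= self.max_concurrent:
                return False
            self._active += 1
            return True

    def release(self):
        # called from the worker's finally block when the send completes
        with self._lock:
            self._active -= 1
```

Callers would only spawn the send thread when `try_reserve()` returns `True`, and the worker calls `release()` in its `finally` block.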
```python
# check per-app connection limit
app_key = connection.app_key
if app_key:
    app_count = self.connections_by_app.get(app_key, 0)
    if app_count >= MAX_CONNECTIONS_PER_APP:
        self._send_error(connection, "Per-app connection limit exceeded")
        connection.close()
        return
    self.connections_by_app[app_key] = app_count + 1
```
Per-app connection limit check may be ineffective at connection time.
At line 137, connection.app_key is checked for per-app limits, but the app key is only set in load_app() which is called during on_handshake() (line 200). At on_connection_c() time, app_key will still be None, so per-app limits won't be enforced here.
Consider moving the per-app limit check to on_handshake() after load_app() is called, or document that per-app limits are enforced elsewhere.
🔎 Proposed fix - move per-app check to on_handshake
def on_handshake(self, connection):
netius.servers.WSServer.on_handshake(self, connection)
connection.load_app()
+
+ # check per-app connection limit now that app_key is loaded
+ app_key = connection.app_key
+ if app_key:
+ app_count = self.connections_by_app.get(app_key, 0)
+ if app_count >= MAX_CONNECTIONS_PER_APP:
+ self._send_error(connection, "Per-app connection limit exceeded")
+ connection.close()
+ return
+ self.connections_by_app[app_key] = app_count + 1
     json_d = dict(
         event="pusher:connection_established",

And remove the per-app check from `on_connection_c()`.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/pushi/net/server.py around lines 136-144, the per-app connection limit is
checked using connection.app_key but that attribute is only populated in
load_app() during on_handshake(), so the check is ineffective at
on_connection_c(); remove the per-app check from on_connection_c and instead
perform the per-app limit check in on_handshake immediately after load_app() has
set connection.app_key (reject and close the connection if the app_count >=
MAX_CONNECTIONS_PER_APP), and ensure you update connections_by_app increment on
successful accept and decrement on connection close (and guard these updates
with existing locking/synchronization used for connections_by_app).
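A standalone sketch of the per-key connection accounting the prompt describes, with guarded decrement so unknown keys and zero counts cannot underflow. The class and method names are invented for the example:

```python
class ConnectionTracker:
    # sketch mirroring the connections_by_app dict: increment on accept,
    # decrement on close, dropping the entry when the count reaches zero
    def __init__(self, limit):
        self.limit = limit
        self.counts = {}

    def try_add(self, key):
        count = self.counts.get(key, 0)
        if count >= self.limit:
            return False
        self.counts[key] = count + 1
        return True

    def remove(self, key):
        # guard against decrementing a missing or already-zero entry
        count = self.counts.get(key, 0)
        if count <= 1:
            self.counts.pop(key, None)
        else:
            self.counts[key] = count - 1
```

In the real server these updates would also need the same locking/synchronization already used around `connections_by_app`.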
```python
# check channel subscription limit per socket
if len(connection.channels) >= MAX_CHANNELS_PER_SOCKET:
    self._send_error(connection, "Maximum channels per socket exceeded")
    return
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if connection.channels is populated anywhere
echo "=== Searching for connection.channels updates ==="
rg -n "connection\.channels" --type py -C 3
echo ""
echo "=== Searching for .channels.append or similar ==="
rg -n "\.channels\.(append|extend|add)" --type py
echo ""
echo "=== Check the structure of the connection object and channels attribute ==="
rg -n "\.channels\s*=" --type py -C 2Repository: hivesolutions/pushi
Length of output: 1576
🏁 Script executed:
# Read the handle_pusher_subscribe method
sed -n '240,300p' src/pushi/net/server.py | cat -n
🏁 Script executed:
# Search for handlers of the "subscribe" event
echo "=== Searching for subscribe event handler ==="
rg -n "subscribe" --type py -C 2 | head -100
echo ""
echo "=== Looking for where connection.channels should be appended ==="
rg -n "append|add.*channel" --type py -C 2Repository: hivesolutions/pushi
Length of output: 11410
🏁 Script executed:
# Find where state.subscribe is called
echo "=== Searching for state.subscribe calls ==="
rg -n "state\.subscribe" --type py -C 3
echo ""
echo "=== Read the full subscribe method in state.py ==="
sed -n '340,380p' src/pushi/base/state.py | cat -n
Remove or fix the ineffective channel limit check at line 276.
The check len(connection.channels) >= MAX_CHANNELS_PER_SOCKET is broken because connection.channels is initialized as an empty list but never populated during subscription. This means the limit check always passes, offering no actual protection. The real limit enforcement happens in state.subscribe() using state.socket_channels, making this check redundant. Either remove it or update connection.channels when subscriptions occur.
🤖 Prompt for AI Agents
In src/pushi/net/server.py around lines 275-278, the check using
len(connection.channels) is ineffective because connection.channels is
initialized but never updated during subscription; either remove this redundant
check entirely (since state.subscribe enforces the limit via
state.socket_channels) or make it consistent by updating connection.channels
whenever a subscription/unsubscription happens (i.e., append the channel on
successful subscribe and remove it on unsubscribe) so the local check becomes
valid; if you choose to update connection.channels, ensure updates occur at the
same points state.subscribe/state.unsubscribe mutate state.socket_channels and
add corresponding error handling and tests.
Replace threading-based webhook handling with native netius async event loop patterns:

- Use protocol.delay() for timeout instead of threading.Timer
- Remove threading dependency from web handler
- Bind error event handler for better error reporting
- Simpler concurrency tracking without locks
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/pushi/base/web.py (1)
116-121: Race condition between concurrency check and increment persists.

The check at line 117 and increment at line 136 are not atomic. Multiple concurrent calls to `send()` could pass the check before any increments `_active_requests`, allowing more than `WEBHOOK_MAX_CONCURRENT` requests. The previous review's suggestion to atomically check and increment was not applied.

Note: If `send()` is only ever called from a single event loop thread, this may be safe. However, if multiple coroutines or threads can invoke `send()` concurrently, the limit can be exceeded.
🧹 Nitpick comments (2)
src/pushi/base/web.py (2)
168-182: Timeout handler has correct double-check pattern but could be cleaner.

The double-check of `completed[0]` at lines 169 and 180 correctly prevents double-decrement. However, the outer check at line 169 could still race with `on_finish` setting `completed[0] = True` between lines 169 and 180. Since line 180 re-verifies before decrementing, this is safe.

Consider simplifying by removing the redundant outer check or adding a brief comment explaining the race-safe design.
🔎 Suggested simplification
```diff
 def on_timeout():
-    if not completed[0]:
-        self.logger.warning(
-            "Webhook request to '%s' timed out after %d seconds"
-            % (url, WEBHOOK_TIMEOUT)
-        )
-        try:
-            protocol.close()
-            netius.compat_loop(loop).stop()
-        except Exception:
-            pass
-    # decrement only if on_finish hasn't run
-    if not completed[0]:
-        completed[0] = True
-        self._active_requests -= 1
+    # Race-safe: only decrement if on_finish hasn't completed
+    if completed[0]:
+        return
+    self.logger.warning(
+        "Webhook request to '%s' timed out after %d seconds"
+        % (url, WEBHOOK_TIMEOUT)
+    )
+    try:
+        protocol.close()
+        netius.compat_loop(loop).stop()
+    except Exception:
+        pass
+    # Final check before decrement (on_finish may have run during close)
+    if not completed[0]:
+        completed[0] = True
+        self._active_requests -= 1
```
160-163: Consider logging suppressed exceptions at debug level.

The silent `except Exception: pass` when stopping the loop makes debugging harder. Even a debug-level log would help trace issues.

🔎 Proposed change
```diff
 try:
     netius.compat_loop(loop).stop()
-except Exception:
-    pass
+except Exception as e:
+    self.logger.debug("Error stopping loop in on_finish: %s" % e)
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/pushi/base/web.py
🧰 Additional context used
🪛 Ruff (0.14.10)
src/pushi/base/web.py
147-147: Do not catch blind exception: Exception
(BLE001)
154-154: Unused function argument: parser
(ARG001)
154-154: Unused function argument: message
(ARG001)
157-157: Unused function argument: protocol
(ARG001)
162-163: try-except-pass detected, consider logging the exception
(S110)
162-162: Do not catch blind exception: Exception
(BLE001)
165-165: Unused function argument: protocol
(ARG001)
177-178: try-except-pass detected, consider logging the exception
(S110)
177-177: Do not catch blind exception: Exception
(BLE001)
194-194: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (3)
src/pushi/base/web.py (3)
41-43: Configuration constants look reasonable.

The environment-based configuration with sensible defaults (10s timeout, 100 concurrent) is appropriate. Module-load-time failure on invalid values is acceptable behavior.
57-60: LGTM!

Counter initialization for tracking active requests is straightforward.
154-156: Unused callback parameters are acceptable.

The `parser` and `message` parameters in `on_message` are required by the netius callback signature. Static analysis flags these (ARG001), but they're necessary for the event binding contract.
```python
try:
    loop.run_forever()
except Exception as exception:
    self.logger.warning(
        "Error sending webhook to '%s': %s" % (url, str(exception))
    )

if not completed[0]:
    completed[0] = True
    self._active_requests -= 1
```
🧩 Analysis chain
🏁 Script executed:
# First, let's see the actual code in web.py, especially around lines 131-200 and the send() method
head -250 src/pushi/base/web.py | tail -120
🏁 Script executed:
# Let's also look at the imports and class definition to understand the context better
head -50 src/pushi/base/web.py
🏁 Script executed:
# Find the send() method to see how it calls _send_webhook
rg -n "def send\(" src/pushi/base/web.py -A 15Repository: hivesolutions/pushi
Length of output: 891
🏁 Script executed:
# Look for how the event loop is created and managed
rg -n "loop\s*=" src/pushi/base/web.py -B 2 -A 2Repository: hivesolutions/pushi
Length of output: 45
🏁 Script executed:
# Get the full send() method to see how _send_webhook is called
sed -n '62,150p' src/pushi/base/web.py
🏁 Script executed:
# Check if there's any threading involved in calling _send_webhook
rg -n "threading\|Thread\|_send_webhook" src/pushi/base/web.py -B 2 -A 2Repository: hivesolutions/pushi
Length of output: 45
🏁 Script executed:
# Check how netius is used - does it have its own event loop?
rg -n "netius" src/pushi/base/web.pyRepository: hivesolutions/pushi
Length of output: 503
🌐 Web query:
netius HTTPClient.post_s documentation event loop
💡 Result:
netius provides a synchronous "helper" API (methods named *_s, e.g. get_s / post_s) that runs requests via netius's internal event loop and supports both synchronous (blocking) and asynchronous (callback) usage. See the project docs and PyPI page for details. [1][2]
Quick notes:
- post_s is the same pattern as get_s: you can call it with asynchronous=False to block and return the result, or provide callback/on_data handlers for non‑blocking operation. The request/callbacks run inside netius's event loop. Use HTTPClient.cleanup_s() when you want to stop/cleanup the loop after async calls. [1]
- Installation and source are on PyPI; the site includes usage examples and links to the project. [2]
Sources:
[1] Netius documentation (HTTP Client — examples and async vs sync usage).
[2] Netius on PyPI (project description and releases).
🌐 Web query:
netius compat_loop what does it do
💡 Result:
I can help — can you paste the code or show where you saw "compat_loop" (file, library version, or a link)? I couldn't find an authoritative reference for "netius compat_loop" and need the exact context to give an accurate explanation.
🏁 Script executed:
# Check if netius imports any threading or if post_s spawns threads internally
grep -n "import" src/pushi/base/web.py | head -20Repository: hivesolutions/pushi
Length of output: 178
🏁 Script executed:
# Look for any handler/callback thread spawning in the code
rg -n "thread\|Thread\|executor\|spawn" src/pushi/base/web.pyRepository: hivesolutions/pushi
Length of output: 45
🏁 Script executed:
# Check git history to understand the intent - was this intentionally made blocking?
git log --oneline src/pushi/base/web.py | head -5
🏁 Script executed:
# Let's look at netius module info if available
python3 << 'EOF'
try:
import netius
import inspect
# Try to understand post_s signature and behavior
print("netius version:", getattr(netius, "__version__", "unknown"))
print("\nHTTPClient.post_s signature:")
print(inspect.signature(netius.clients.HTTPClient.post_s))
except Exception as e:
print("Error:", e)
EOF
loop.run_forever() blocks the caller, serializing all webhook deliveries.
Each call to _send_webhook blocks until completion or timeout. The send() method calls _send_webhook directly in a synchronous loop for each URL—there is no threading or background task execution. This means webhooks are delivered sequentially rather than concurrently. With 10 URLs timing out at 10 seconds each, the total delay is 100 seconds. The _active_requests counter and WEBHOOK_MAX_CONCURRENT limit are therefore ineffective at achieving concurrency.
To achieve true concurrency, consider:
- Spawning tasks without waiting (fire-and-forget with the existing netius event loop)
- Batching all protocols and running the loop once
- Using `asyncio` with `gather()` if the runtime supports it
🧰 Tools
🪛 Ruff (0.14.10)
194-194: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In src/pushi/base/web.py around lines 192 to 200, loop.run_forever() blocks the
caller and serializes webhook deliveries so WEBHOOK_MAX_CONCURRENT is
ineffective; instead spawn non-blocking tasks and let the event loop drive them:
create tasks for each send operation (for example with loop.create_task or
scheduling a coroutine) and avoid calling run_forever per webhook, or collect
all send coroutines and await them together (e.g., gather/run_until_complete
once) so deliveries run concurrently; ensure the completion handling still
decrements self._active_requests and logs errors within each task's callback or
exception handler.
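A minimal `asyncio`-based sketch of the concurrent, per-request-timeout delivery suggested above. Note the project itself is built on netius, so this illustrates the pattern rather than being a drop-in fix; `send_one` is an assumed async callable taking a URL:

```python
import asyncio


async def deliver_all(urls, send_one, timeout=10.0):
    # dispatches every webhook concurrently; each send is bounded by its
    # own timeout, and failures are collected instead of raised so one
    # slow or broken endpoint cannot block the others
    async def guarded(url):
        try:
            await asyncio.wait_for(send_one(url), timeout=timeout)
            return (url, None)
        except Exception as exception:
            return (url, exception)

    return await asyncio.gather(*(guarded(u) for u in urls))
```

With this shape, 10 endpoints each taking up to the timeout complete in roughly one timeout interval total, instead of serially accumulating delays.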
Summary by CodeRabbit
New Features
Bug Fixes