Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

*
* Rate limiting for WebSocket connections (configurable via `PUSHI_RATE_LIMIT_MESSAGES` and `PUSHI_RATE_LIMIT_WINDOW`)
* Connection limits: global (`PUSHI_MAX_CONNECTIONS_GLOBAL`), per-IP (`PUSHI_MAX_CONNECTIONS_PER_IP`), and per-app (`PUSHI_MAX_CONNECTIONS_PER_APP`)
* Message size limits (`PUSHI_MAX_MESSAGE_SIZE`)
* Channel and event name length limits (`PUSHI_MAX_CHANNEL_NAME_LENGTH`, `PUSHI_MAX_EVENT_NAME_LENGTH`)
* Subscription limits: channels per socket (`PUSHI_MAX_CHANNELS_PER_SOCKET`) and sockets per channel (`PUSHI_MAX_SOCKETS_PER_CHANNEL`)
* Webhook timeout configuration (`PUSHI_WEBHOOK_TIMEOUT`) and concurrent request limits (`PUSHI_WEBHOOK_MAX_CONCURRENT`)
* Proper error messages sent to clients when limits are exceeded

### Changed

*
* Webhook calls are now asynchronous and non-blocking with configurable timeout
* Improved error handling in disconnect flow to ensure proper cleanup even on failures

### Fixed

*
* Added `@appier.private` decorator to app creation endpoint to require authentication
* Fixed missing input validation in `handle_event` for required fields
* Fixed potential KeyError when event data is missing required fields
* Improved security by validating event names before dynamic method dispatch

## [0.3.1] - 2024-01-17

Expand Down
1 change: 1 addition & 0 deletions src/pushi/app/controllers/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def list(self):
apps = pushi.App.find(map=True)
return dict(apps=apps)

@appier.private
@appier.route("/apps", "POST")
def create(self):
app = pushi.App.new()
Expand Down
38 changes: 36 additions & 2 deletions src/pushi/base/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
from pushi.base import apn
from pushi.base import web

# subscription limit configuration
MAX_CHANNELS_PER_SOCKET = int(os.environ.get("PUSHI_MAX_CHANNELS_PER_SOCKET", "100"))
MAX_SOCKETS_PER_CHANNEL = int(os.environ.get("PUSHI_MAX_SOCKETS_PER_CHANNEL", "10000"))


class AppState(object):
"""
Expand Down Expand Up @@ -254,11 +258,29 @@ def disconnect(self, connection, app_key, socket_id):
# then uses it to retrieve the complete set of channels that the
# socket is subscribed and then unsubscribe it from them then
# removes the reference of the socket in the socket channels map
state = self.get_state(app_key=app_key)
try:
state = self.get_state(app_key=app_key)
except Exception:
# if we can't get the state, nothing to clean up
return

channels = state.socket_channels.get(socket_id, [])
channels = copy.copy(channels)

# unsubscribe from each channel with error handling to ensure
# all channels are attempted even if one fails
for channel in channels:
self.unsubscribe(connection, app_key, socket_id, channel)
try:
self.unsubscribe(connection, app_key, socket_id, channel)
except Exception as exception:
# log the error but continue cleaning up other channels
if self.app:
self.app.logger.warning(
"Error unsubscribing from channel '%s' during disconnect: %s"
% (channel, appier.legacy.UNICODE(exception))
)

# ensure socket is removed from socket_channels even if unsubscribe failed
if socket_id in state.socket_channels:
del state.socket_channels[socket_id]

Expand Down Expand Up @@ -329,6 +351,11 @@ def subscribe(
subscribed = channel in channels
if subscribed:
raise RuntimeError("Channel already subscribed")

# check if socket has reached the maximum number of channels
if len(channels) >= MAX_CHANNELS_PER_SOCKET:
raise RuntimeError("Maximum channels per socket exceeded")

channels.append(channel)
state.socket_channels[socket_id] = channels

Expand All @@ -339,6 +366,13 @@ def subscribe(
subscribed = socket_id in sockets
if subscribed:
raise RuntimeError("Socket already subscribed")

# check if channel has reached the maximum number of sockets
if len(sockets) >= MAX_SOCKETS_PER_CHANNEL:
# remove the channel from the socket's channel list since we can't complete the subscription
channels.remove(channel)
raise RuntimeError("Maximum sockets per channel exceeded")

sockets.append(socket_id)
state.channel_sockets[channel] = sockets

Expand Down
110 changes: 87 additions & 23 deletions src/pushi/base/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@
__license__ = "Apache License, Version 2.0"
""" The license for the module """

import os
import json

import netius
import netius.clients

import pushi

from . import handler

# webhook configuration
WEBHOOK_TIMEOUT = int(os.environ.get("PUSHI_WEBHOOK_TIMEOUT", "10"))
WEBHOOK_MAX_CONCURRENT = int(os.environ.get("PUSHI_WEBHOOK_MAX_CONCURRENT", "100"))


class WebHandler(handler.Handler):
"""
Expand All @@ -51,6 +57,7 @@ class WebHandler(handler.Handler):
def __init__(self, owner):
handler.Handler.__init__(self, owner, name="web")
self.subs = {}
self._active_requests = 0

def send(self, app_id, event, json_d, invalid={}):
# retrieves the reference to the app structure associated with the
Expand Down Expand Up @@ -96,17 +103,6 @@ def send(self, app_id, event, json_d, invalid={}):
data = json.dumps(json_d)
headers = {"content-type": "application/json"}

# creates the on message function that is going to be used at the end of
# the request to be able to close the protocol, this is a closure and so
# the current local variables will be exposed to the method
def on_message(protocol, parser, message):
protocol.close()

# creates the on close function that will be responsible for the stopping
# of the loop as defined by the web implementation
def on_finish(protocol):
netius.compat_loop(loop).stop()

# iterates over the complete set of URLs that are going to
# be notified about the message, each of them is going to
# received an HTTP post request with the data
Expand All @@ -117,23 +113,91 @@ def on_finish(protocol):
if url in invalid:
continue

# prints a debug message about the web message that
# is going to be sent (includes URL)
self.logger.debug("Sending POST request to '%s'" % url)
# check if we have reached the maximum concurrent requests
if self._active_requests >= WEBHOOK_MAX_CONCURRENT:
self.logger.warning(
"Maximum concurrent webhook requests reached, skipping '%s'" % url
)
continue

# adds the current URL to the list of invalid items for
# the current message sending stream (do this before sending
# to prevent duplicate sends in case of rapid events)
invalid[url] = True

# send the webhook using netius async callback pattern
self._send_webhook(url, headers, data)

Comment on lines 116 to 130
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Race condition in concurrency limit check.

The check at lines 118-124 reads _active_requests under lock, but by the time _send_webhook_async spawns a thread and increments the counter (lines 141-142), other threads may have already passed the same check. This can allow more concurrent requests than WEBHOOK_MAX_CONCURRENT.

Consider moving the increment and check into a single atomic operation:

🔎 Proposed fix
-            # check if we have reached the maximum concurrent requests
-            with self._lock:
-                if self._active_requests >= WEBHOOK_MAX_CONCURRENT:
-                    self.logger.warning(
-                        "Maximum concurrent webhook requests reached, skipping '%s'"
-                        % url
-                    )
-                    continue
+            # check and reserve a slot atomically
+            with self._lock:
+                if self._active_requests >= WEBHOOK_MAX_CONCURRENT:
+                    self.logger.warning(
+                        "Maximum concurrent webhook requests reached, skipping '%s'"
+                        % url
+                    )
+                    continue
+                self._active_requests += 1

Then in _send_webhook_async, remove the increment in do_send since it's already done:

         def do_send():
-            with self._lock:
-                self._active_requests += 1
-
             try:

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/pushi/base/web.py around lines 117-133, the concurrency check reads
_active_requests under the lock but delays incrementing until the spawned thread
runs, causing a race that can exceed WEBHOOK_MAX_CONCURRENT; modify the code so
the increment and check are done atomically while holding self._lock (i.e.,
acquire the lock, if _active_requests >= WEBHOOK_MAX_CONCURRENT then log and
continue, otherwise increment _active_requests and release the lock) before
marking invalid[url] and spawning the thread; then remove the increment from
inside do_send (leave the decrement in its finally block so the counter is
properly decremented when the send completes).

def _send_webhook(self, url, headers, data):
"""
Sends a webhook POST request using netius async event loop with timeout.
Uses protocol.delay() for timeout handling instead of threading.
"""
self._active_requests += 1
self.logger.debug("Sending POST request to '%s'" % url)

# creates the HTTP protocol to be used in the POST request and
# sets the headers and the data then registers for the message
# event so that the loop and protocol may be closed
# track completion state for timeout handling
completed = [False]

try:
# creates the HTTP protocol to be used in the POST request
loop, protocol = netius.clients.HTTPClient.post_s(
url, headers=headers, data=data
)
protocol.bind("message", on_message)
protocol.bind("finish", on_finish)
loop.run_forever()
except Exception as exception:
self.logger.warning(
"Error creating HTTP request to '%s': %s" % (url, str(exception))
)
self._active_requests -= 1
return

# adds the current URL to the list of invalid items for
# the current message sending stream
invalid[url] = True
def on_message(protocol, parser, message):
protocol.close()

def on_finish(protocol):
completed[0] = True
self._active_requests -= 1
try:
netius.compat_loop(loop).stop()
except Exception:
pass

def on_error(protocol, error):
self.logger.warning("Webhook error for '%s': %s" % (url, str(error)))

def on_timeout():
if not completed[0]:
self.logger.warning(
"Webhook request to '%s' timed out after %d seconds"
% (url, WEBHOOK_TIMEOUT)
)
try:
protocol.close()
netius.compat_loop(loop).stop()
except Exception:
pass
# decrement only if on_finish hasn't run
if not completed[0]:
completed[0] = True
self._active_requests -= 1

# bind event handlers
protocol.bind("message", on_message)
protocol.bind("finish", on_finish)
protocol.bind("error", on_error)

# schedule timeout using netius event loop delay
protocol.delay(on_timeout, timeout=WEBHOOK_TIMEOUT)

try:
loop.run_forever()
except Exception as exception:
self.logger.warning(
"Error sending webhook to '%s': %s" % (url, str(exception))
)
if not completed[0]:
completed[0] = True
self._active_requests -= 1
Comment on lines +192 to +200
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's see the actual code in web.py, especially around lines 131-200 and the send() method
head -250 src/pushi/base/web.py | tail -120

Repository: hivesolutions/pushi

Length of output: 4159


🏁 Script executed:

# Let's also look at the imports and class definition to understand the context better
head -50 src/pushi/base/web.py

Repository: hivesolutions/pushi

Length of output: 1567


🏁 Script executed:

# Find the send() method to see how it calls _send_webhook
rg -n "def send\(" src/pushi/base/web.py -A 15

Repository: hivesolutions/pushi

Length of output: 891


🏁 Script executed:

# Look for how the event loop is created and managed
rg -n "loop\s*=" src/pushi/base/web.py -B 2 -A 2

Repository: hivesolutions/pushi

Length of output: 45


🏁 Script executed:

# Get the full send() method to see how _send_webhook is called
sed -n '62,150p' src/pushi/base/web.py

Repository: hivesolutions/pushi

Length of output: 4109


🏁 Script executed:

# Check if there's any threading involved in calling _send_webhook
rg -n "threading\|Thread\|_send_webhook" src/pushi/base/web.py -B 2 -A 2

Repository: hivesolutions/pushi

Length of output: 45


🏁 Script executed:

# Check how netius is used - does it have its own event loop?
rg -n "netius" src/pushi/base/web.py

Repository: hivesolutions/pushi

Length of output: 503


🌐 Web query:

netius HTTPClient.post_s documentation event loop

💡 Result:

netius provides a synchronous "helper" API (methods named *_s, e.g. get_s / post_s) that runs requests via netius's internal event loop and supports both synchronous (blocking) and asynchronous (callback) usage. See the project docs and PyPI page for details. [1][2]

Quick notes:

  • post_s is the same pattern as get_s: you can call it with asynchronous=False to block and return the result, or provide callback/on_data handlers for non‑blocking operation. The request/callbacks run inside netius's event loop. Use HTTPClient.cleanup_s() when you want to stop/cleanup the loop after async calls. [1]
  • Installation and source are on PyPI; the site includes usage examples and links to the project. [2]

Sources:
[1] Netius documentation (HTTP Client — examples and async vs sync usage).
[2] Netius on PyPI (project description and releases).


🌐 Web query:

netius compat_loop what does it do

💡 Result:

I can help — can you paste the code or show where you saw "compat_loop" (file, library version, or a link)? I couldn't find an authoritative reference for "netius compat_loop" and need the exact context to give an accurate explanation.


🏁 Script executed:

# Check if netius imports any threading or if post_s spawns threads internally
grep -n "import" src/pushi/base/web.py | head -20

Repository: hivesolutions/pushi

Length of output: 178


🏁 Script executed:

# Look for any handler/callback thread spawning in the code
rg -n "thread\|Thread\|executor\|spawn" src/pushi/base/web.py

Repository: hivesolutions/pushi

Length of output: 45


🏁 Script executed:

# Check git history to understand the intent - was this intentionally made blocking?
git log --oneline src/pushi/base/web.py | head -5

Repository: hivesolutions/pushi

Length of output: 124


🏁 Script executed:

# Let's look at netius module info if available
python3 << 'EOF'
try:
    import netius
    import inspect
    # Try to understand post_s signature and behavior
    print("netius version:", getattr(netius, "__version__", "unknown"))
    print("\nHTTPClient.post_s signature:")
    print(inspect.signature(netius.clients.HTTPClient.post_s))
except Exception as e:
    print("Error:", e)
EOF

Repository: hivesolutions/pushi

Length of output: 149


loop.run_forever() blocks the caller, serializing all webhook deliveries.

Each call to _send_webhook blocks until completion or timeout. The send() method calls _send_webhook directly in a synchronous loop for each URL—there is no threading or background task execution. This means webhooks are delivered sequentially rather than concurrently. With 10 URLs timing out at 10 seconds each, the total delay is 100 seconds. The _active_requests counter and WEBHOOK_MAX_CONCURRENT limit are therefore ineffective at achieving concurrency.

To achieve true concurrency, consider:

  1. Spawning tasks without waiting (fire-and-forget with the existing netius event loop)
  2. Batching all protocols and running the loop once
  3. Using asyncio with gather() if the runtime supports it
🧰 Tools
🪛 Ruff (0.14.10)

194-194: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In src/pushi/base/web.py around lines 192 to 200, loop.run_forever() blocks the
caller and serializes webhook deliveries so WEBHOOK_MAX_CONCURRENT is
ineffective; instead spawn non-blocking tasks and let the event loop drive them:
create tasks for each send operation (for example with loop.create_task or
scheduling a coroutine) and avoid calling run_forever per webhook, or collect
all send coroutines and await them together (e.g., gather/run_until_complete
once) so deliveries run concurrently; ensure the completion handling still
decrements self._active_requests and logs errors within each task's callback or
exception handler.


def load(self):
subs = pushi.Web.find()
Expand Down
Loading