-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
119 lines (93 loc) · 3.66 KB
/
server.py
File metadata and controls
119 lines (93 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""Colony webhook receiver server.
Receives webhook POSTs from The Colony, verifies signatures, and dispatches
events to handlers defined in handlers.py.
"""
import json
import logging
import os
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from webhook import webhook, verify_signature
# Import handlers to register them
import handlers # noqa: F401
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
LOG_FILE = os.environ.get("LOG_FILE", "webhook.log")
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "")
WEBHOOK_PORT = int(os.environ.get("WEBHOOK_PORT", "8000"))
WEBHOOK_PATH = os.environ.get("WEBHOOK_PATH", "/webhook")
# Logging setup
log_handlers = [logging.StreamHandler(sys.stdout)]
if LOG_FILE:
log_handlers.append(logging.FileHandler(LOG_FILE))
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s — %(message)s",
handlers=log_handlers,
)
logger = logging.getLogger("colony-webhook")
class WebhookHandler(BaseHTTPRequestHandler):
def do_POST(self):
if self.path.rstrip("/") != WEBHOOK_PATH.rstrip("/"):
self._respond(404, {"error": "not found"})
return
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
self._respond(400, {"error": "empty body"})
return
body = self.rfile.read(content_length)
# Verify signature
if WEBHOOK_SECRET:
signature = self.headers.get("X-Colony-Signature", "")
if not signature:
logger.warning("Rejected request: missing X-Colony-Signature header")
self._respond(403, {"error": "missing signature"})
return
if not verify_signature(body, WEBHOOK_SECRET, signature):
logger.warning("Rejected request: invalid signature")
self._respond(403, {"error": "invalid signature"})
return
# Parse payload
try:
data = json.loads(body)
except json.JSONDecodeError:
self._respond(400, {"error": "invalid JSON"})
return
event = data.get("event", "")
payload = data.get("payload", data)
logger.info("Received event: %s", event)
# Dispatch to handlers
webhook.dispatch(event, payload)
self._respond(200, {"status": "ok"})
def do_GET(self):
if self.path.rstrip("/") == "/health":
self._respond(200, {
"status": "healthy",
"events": sorted(webhook.registered_events),
})
return
self._respond(404, {"error": "not found"})
def _respond(self, status, body):
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(body).encode())
def log_message(self, format, *args):
# Suppress default access logs, we use our own logging
pass
def main():
if not WEBHOOK_SECRET:
logger.warning(
"WEBHOOK_SECRET is not set — signature verification is DISABLED. "
"Set WEBHOOK_SECRET in your .env file for production use."
)
events = sorted(webhook.registered_events)
logger.info("Registered handlers for events: %s", ", ".join(events) or "(none)")
logger.info("Starting server on port %d at %s", WEBHOOK_PORT, WEBHOOK_PATH)
server = HTTPServer(("0.0.0.0", WEBHOOK_PORT), WebhookHandler)
try:
server.serve_forever()
except KeyboardInterrupt:
logger.info("Shutting down")
server.server_close()
if __name__ == "__main__":
main()