From 893e344c0c96582e3f0c818e3aecfc723f713244 Mon Sep 17 00:00:00 2001 From: KHA Entertaiment Date: Sat, 21 Mar 2026 22:00:56 -0700 Subject: [PATCH 1/2] fix: use proper /hooks/wake endpoint format for Gateway notifications - Add notifications.py module with correct webhook payload format - Use /hooks/wake endpoint with 'text' field instead of incorrect /hooks/ocbs-proceed endpoint that required custom hook mapping - Add NotificationManager class with proper authentication handling - Add file-based fallback when webhook fails - Update serve.py to use new notifications module - Add notification config methods to integration.py Fixes #2 - 'hook mapping requires message' error Co-Authored-By: Claude Opus 4.6 --- src/ocbs/integration.py | 56 ++++++- src/ocbs/notifications.py | 320 ++++++++++++++++++++++++++++++++++++++ src/ocbs/serve.py | 42 ++--- 3 files changed, 386 insertions(+), 32 deletions(-) create mode 100644 src/ocbs/notifications.py diff --git a/src/ocbs/integration.py b/src/ocbs/integration.py index 7666244..71b7702 100644 --- a/src/ocbs/integration.py +++ b/src/ocbs/integration.py @@ -147,7 +147,7 @@ def should_auto_restore(self) -> bool: def get_integration_status(self) -> dict: """Get full integration status.""" config = self.get_config() - + return { 'auto_backup': { 'enabled': config.get('auto_backup_enabled', False), @@ -160,5 +160,59 @@ def get_integration_status(self) -> dict: }, 'heartbeat': { 'enabled': config.get('heartbeat_check_enabled', False) + }, + 'notifications': { + 'enabled': config.get('notification_enabled', False), + 'webhook_url': config.get('webhook_url', 'http://127.0.0.1:18789/hooks/wake'), + 'webhook_host': config.get('webhook_host', '127.0.0.1'), } } + + def get_notification_config(self) -> dict: + """Get notification configuration.""" + config = self.get_config() + return { + 'notification_enabled': config.get('notification_enabled', False), + 'webhook_url': config.get('webhook_url', 'http://127.0.0.1:18789/hooks/wake'), + 'webhook_host': config.get('webhook_host', '127.0.0.1'), + 'webhook_token': config.get('webhook_token', ''), + } + + def save_notification_config(self, notification_config: dict) -> bool: + """Save notification configuration. + + Args: + notification_config: Dict with notification settings + + Returns: + True if saved successfully + """ + config = self.get_config() + config['notification_enabled'] = notification_config.get('notification_enabled', False) + config['webhook_url'] = notification_config.get('webhook_url', 'http://127.0.0.1:18789/hooks/wake') + config['webhook_host'] = notification_config.get('webhook_host', '127.0.0.1') + config['webhook_token'] = notification_config.get('webhook_token', '') + self.save_config(config) + return True + + def setup_notifications(self, enabled: bool = True, webhook_host: str = '127.0.0.1') -> str: + """Configure notification settings. + + Args: + enabled: Enable webhook notifications + webhook_host: Host where OpenClaw Gateway is running + + Returns: + Status message + """ + config = self.get_config() + config['notification_enabled'] = enabled + config['webhook_host'] = webhook_host + # Default port is 18789 (OpenClaw default) + config['webhook_url'] = f'http://{webhook_host}:18789/hooks/wake' + self.save_config(config) + + if enabled: + return f'Notifications enabled - Gateway webhook at {config["webhook_url"]}' + else: + return 'Notifications disabled' diff --git a/src/ocbs/notifications.py b/src/ocbs/notifications.py new file mode 100644 index 0000000..21dd3ed --- /dev/null +++ b/src/ocbs/notifications.py @@ -0,0 +1,320 @@ +""" +Notification module for OCBS to send webhook notifications to OpenClaw Gateway. + +Uses the /hooks/wake endpoint which requires a simple 'text' field. +Does NOT require custom hook mapping configuration. +""" + +import json +import logging +import os +import sys +from datetime import datetime +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + + +class NotificationManager: + """Manages notifications to OpenClaw Gateway via webhook or file-based fallback.""" + + # Default Gateway port for OpenClaw + DEFAULT_GATEWAY_PORT = 18789 + + def __init__(self, state_dir: Optional[Path] = None): + self.state_dir = state_dir or Path.home() / ".config" / "ocbs" + self.state_dir.mkdir(parents=True, exist_ok=True) + + def _get_gateway_url(self) -> str: + """Get the Gateway webhook URL from environment or default.""" + # Check for explicit URL first + if os.environ.get("OCBS_WEBHOOK_URL"): + return os.environ["OCBS_WEBHOOK_URL"] + + # Check for custom port + port = os.environ.get("OPENCLAW_GATEWAY_PORT", self.DEFAULT_GATEWAY_PORT) + + # Check for localhost vs remote + host = os.environ.get("OCBS_WEBHOOK_HOST", "127.0.0.1") + + return f"http://{host}:{port}/hooks/wake" + + def _get_auth_token(self) -> Optional[str]: + """Get authentication token from environment or config.""" + # Check environment variable first + token = os.environ.get("OCBS_WEBHOOK_TOKEN") + if token: + return token + + # Try to get from OpenClaw config + openclaw_config = Path.home() / ".openclaw" / "openclaw.json" + if openclaw_config.exists(): + try: + with open(openclaw_config) as f: + config = json.load(f) + # Try gateway auth token + token = config.get("gateway", {}).get("auth", {}).get("token") + if token: + return token + except (json.JSONDecodeError, OSError): + pass + + return None + + def _send_webhook(self, payload: dict) -> bool: + """Send webhook to OpenClaw Gateway /hooks/wake endpoint. + + Payload format for /hooks/wake: + { + "text": "Notification message", # required + "mode": "now" # optional: "now" or "next-heartbeat" + } + + Returns True if successful, False otherwise. + """ + import urllib.request + import urllib.error + + url = self._get_gateway_url() + token = self._get_auth_token() + + data = json.dumps(payload).encode("utf-8") + + headers = { + "Content-Type": "application/json", + } + + if token: + headers["Authorization"] = f"Bearer {token}" + + try: + req = urllib.request.Request(url, data=data, headers=headers, method="POST") + with urllib.request.urlopen(req, timeout=5) as response: + if 200 <= response.status < 300: + logger.info(f"Webhook notification sent successfully: {response.status}") + return True + else: + logger.warning(f"Webhook returned non-success status: {response.status}") + return False + except urllib.error.HTTPError as e: + logger.warning(f"Webhook HTTP error: {e.code} - {e.reason}") + return False + except urllib.error.URLError as e: + logger.warning(f"Webhook connection failed: {e.reason}") + return False + except Exception as e: + logger.warning(f"Webhook unexpected error: {e}") + return False + + def _write_file_notification(self, notification: dict) -> bool: + """Write notification to file as fallback when webhook fails.""" + try: + notify_dir = self.state_dir / "notifications" + notify_dir.mkdir(parents=True, exist_ok=True) + + # Use timestamp-based filename to avoid collisions + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + notify_file = notify_dir / f"ocbs_{timestamp}.json" + + notification["_ocbs_meta"] = { + "created_at": datetime.now().isoformat(), + "type": "webhook_fallback", + } + + with open(notify_file, "w") as f: + json.dump(notification, f, indent=2) + + logger.info(f"File notification written: {notify_file}") + return True + except Exception as e: + logger.error(f"Failed to write file notification: {e}") + return False + + def notify( + self, + text: str, + mode: str = "now", + allow_fallback: bool = True, + ) -> bool: + """Send a notification to OpenClaw Gateway. + + Args: + text: The notification message (required for /hooks/wake endpoint) + mode: "now" for immediate wake, "next-heartbeat" for deferred + allow_fallback: Whether to write file notification if webhook fails + + Returns: + True if notification was sent successfully (via webhook or file) + """ + # Validate text is not empty + if not text or not text.strip(): + logger.warning("Cannot send notification with empty text") + return False + + # Send to /hooks/wake with proper payload format + payload = { + "text": text.strip(), + "mode": mode, + } + + if self._send_webhook(payload): + return True + + # Fallback to file-based notification + if allow_fallback: + logger.info("Falling back to file notification") + return self._write_file_notification({"text": text, "mode": mode}) + + return False + + def notify_backup_complete( + self, + backup_id: str, + scope: str, + file_count: int, + reason: str = "", + ) -> bool: + """Send backup completion notification. + + Args: + backup_id: The backup identifier + scope: Backup scope (minimal, config, etc.) + file_count: Number of files backed up + reason: Optional reason for backup + + Returns: + True if notification sent successfully + """ + text = f"OCBS Backup completed - ID: {backup_id}, Scope: {scope}, Files: {file_count}" + if reason: + text = f"{text}, Reason: {reason}" + + return self.notify(text) + + def notify_restore_complete( + self, + backup_id: str, + file_count: int, + target_dir: str = "~/.openclaw", + ) -> bool: + """Send restore completion notification. + + Args: + backup_id: The backup that was restored + file_count: Number of files restored + target_dir: Where files were restored to + + Returns: + True if notification sent successfully + """ + text = f"OCBS Restore completed - Backup: {backup_id}, Files: {file_count}, Target: {target_dir}" + return self.notify(text) + + def notify_checkpoint_created( + self, + checkpoint_id: str, + backup_id: str, + reason: str = "", + ) -> bool: + """Send checkpoint creation notification. + + Args: + checkpoint_id: The checkpoint identifier + backup_id: The backup this checkpoint references + reason: Optional reason for checkpoint + + Returns: + True if notification sent successfully + """ + text = f"OCBS Checkpoint created - ID: {checkpoint_id}, Backup: {backup_id}" + if reason: + text = f"{text}, Reason: {reason}" + + return self.notify(text) + + def notify_proceed(self, checkpoint_id: str, token: str) -> bool: + """Send proceed notification when user acknowledges checkpoint. + + This uses the standard /hooks/wake endpoint with a simple status message. + The OpenClaw agent should monitor for proceed notifications via polling + the file-based notification directory or by being woken up. + + Args: + checkpoint_id: The checkpoint that was acknowledged + token: The proceed token + + Returns: + True if notification sent successfully + """ + text = f"OCBS Proceed: User acknowledged checkpoint {checkpoint_id}" + success = self.notify(text) + + # Also write a specific proceed notification file for agent polling + if success: + try: + notify_dir = self.state_dir / "proceed_notifications" + notify_dir.mkdir(parents=True, exist_ok=True) + proceed_file = notify_dir / f"{token}.json" + proceed_data = { + "token": token, + "checkpoint_id": checkpoint_id, + "proceeded_at": datetime.now().isoformat(), + "status": "pending_agent_poll", + } + with open(proceed_file, "w") as f: + json.dump(proceed_data, f, indent=2) + except Exception as e: + logger.warning(f"Failed to write proceed notification file: {e}") + + return success + + def test_notification(self) -> bool: + """Send a test notification to verify Gateway connectivity. + + Returns: + True if test notification was received successfully + """ + return self.notify("OCBS Test notification - Gateway connectivity verified", mode="now") + + +# Module-level convenience functions +_default_manager: Optional[NotificationManager] = None + + +def get_notification_manager() -> NotificationManager: + """Get the default notification manager instance.""" + global _default_manager + if _default_manager is None: + _default_manager = NotificationManager() + return _default_manager + + +def notify(text: str, mode: str = "now") -> bool: + """Send a notification using the default manager.""" + return get_notification_manager().notify(text, mode) + + +def notify_backup_complete(backup_id: str, scope: str, file_count: int, reason: str = "") -> bool: + """Send backup complete notification using the default manager.""" + return get_notification_manager().notify_backup_complete(backup_id, scope, file_count, reason) + + +def notify_restore_complete(backup_id: str, file_count: int, target_dir: str = "~/.openclaw") -> bool: + """Send restore complete notification using the default manager.""" + return get_notification_manager().notify_restore_complete(backup_id, file_count, target_dir) + + +def notify_checkpoint_created(checkpoint_id: str, backup_id: str, reason: str = "") -> bool: + """Send checkpoint created notification using the default manager.""" + return get_notification_manager().notify_checkpoint_created(checkpoint_id, backup_id, reason) + + +def notify_proceed(checkpoint_id: str, token: str) -> bool: + """Send proceed notification using the default manager.""" + return get_notification_manager().notify_proceed(checkpoint_id, token) + + +def test_notification() -> bool: + """Send a test notification using the default manager.""" + return get_notification_manager().test_notification() \ No newline at end of file diff --git a/src/ocbs/serve.py b/src/ocbs/serve.py index 7fec8fe..c0f460d 100644 --- a/src/ocbs/serve.py +++ b/src/ocbs/serve.py @@ -11,6 +11,7 @@ from urllib.parse import urlencode from .core import OCBSCore +from .notifications import NotificationManager def get_tailscale_ip() -> Optional[str]: """Get Tailscale IP address if available.""" @@ -211,37 +212,16 @@ def _write_proceed_notification(self, token: str, checkpoint_id: str = None): json.dump(notification, f) def _send_webhook_notification(self, token: str, checkpoint_id: str = None): - """Send webhook notification to gateway when user clicks proceed.""" - import urllib.request - import urllib.error - - # Get webhook URL from environment or use default - webhook_url = os.environ.get('OCBS_WEBHOOK_URL', 'http://localhost:18789/hooks/ocbs-proceed') - webhook_token = os.environ.get('OCBS_WEBHOOK_TOKEN', 'ocbs-webhook-secret') - - payload = json.dumps({ - "message": f"OCBS Proceed: User acknowledged checkpoint {checkpoint_id}. Token: {token}", - "token": token, - "checkpoint_id": checkpoint_id, - "action": "proceed" - }).encode('utf-8') - - req = urllib.request.Request( - webhook_url, - data=payload, - headers={ - 'Content-Type': 'application/json', - 'Authorization': f'Bearer {webhook_token}' - }, - method='POST' - ) - - try: - with urllib.request.urlopen(req, timeout=5) as response: - print(f"Webhook notification sent: {response.status}") - except urllib.error.URLError as e: - print(f"Webhook notification failed: {e}") - # Fall back to file notification + """Send webhook notification to gateway when user clicks proceed. + + Uses the proper /hooks/wake endpoint with 'text' field format + instead of the incorrect /hooks/ocbs-proceed endpoint. + """ + notifier = NotificationManager(state_dir=self.state_dir) + success = notifier.notify_proceed(checkpoint_id, token) + + if not success: + # Fall back to file notification if webhook fails self._write_proceed_notification(token, checkpoint_id) def get_pending_proceed_notifications(self) -> list[dict]: From 90fa73b9470e6cb321e6edcaefbf10728d030bd0 Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 23:55:01 +0000 Subject: [PATCH 2/2] fix: apply CodeRabbit auto-fixes Fixed 2 file(s) based on 2 unresolved review comments. Co-authored-by: CodeRabbit --- pyproject.toml | 4 ++-- src/ocbs/notifications.py | 44 +++++++++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c64cdb8..d7aa674 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ dependencies = [ ] [project.optional-dependencies] -clawhub = "clawhub>=0.1.0" +clawhub = ["clawhub>=0.1.0"] [project.scripts] ocbs = "ocbs.cli:main" @@ -32,4 +32,4 @@ dev = [ [tool.pytest.ini_options] testpaths = ["tests"] -python_files = ["test_*.py"] +python_files = ["test_*.py"] \ No newline at end of file diff --git a/src/ocbs/notifications.py b/src/ocbs/notifications.py index 21dd3ed..1a573af 100644 --- a/src/ocbs/notifications.py +++ b/src/ocbs/notifications.py @@ -47,6 +47,18 @@ def _get_auth_token(self) -> Optional[str]: if token: return token + # Try to get from OCBS config (where save_notification_config stores it) + ocbs_config_file = self.state_dir / "config.json" + if ocbs_config_file.exists(): + try: + with open(ocbs_config_file) as f: + config = json.load(f) + token = config.get("webhook_token") + if token: + return token + except (json.JSONDecodeError, OSError): + pass + # Try to get from OpenClaw config openclaw_config = Path.home() / ".openclaw" / "openclaw.json" if openclaw_config.exists(): @@ -250,22 +262,22 @@ def notify_proceed(self, checkpoint_id: str, token: str) -> bool: text = f"OCBS Proceed: User acknowledged checkpoint {checkpoint_id}" success = self.notify(text) - # Also write a specific proceed notification file for agent polling - if success: - try: - notify_dir = self.state_dir / "proceed_notifications" - notify_dir.mkdir(parents=True, exist_ok=True) - proceed_file = notify_dir / f"{token}.json" - proceed_data = { - "token": token, - "checkpoint_id": checkpoint_id, - "proceeded_at": datetime.now().isoformat(), - "status": "pending_agent_poll", - } - with open(proceed_file, "w") as f: - json.dump(proceed_data, f, indent=2) - except Exception as e: - logger.warning(f"Failed to write proceed notification file: {e}") + # Always write a specific proceed notification file for agent polling + # This ensures the proceed file is created regardless of webhook success + try: + notify_dir = self.state_dir / "proceed_notifications" + notify_dir.mkdir(parents=True, exist_ok=True) + proceed_file = notify_dir / f"{token}.json" + proceed_data = { + "token": token, + "checkpoint_id": checkpoint_id, + "proceeded_at": datetime.now().isoformat(), + "status": "pending_agent_poll", + } + with open(proceed_file, "w") as f: + json.dump(proceed_data, f, indent=2) + except Exception as e: + logger.warning(f"Failed to write proceed notification file: {e}") return success