Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,5 @@ config.json
/src/octopal/cli/wizard/__pycache__
/src/octopal/infrastructure/connectors/__pycache__
/src/octopal/tools/connectors/__pycache__
/src/octopal/tools/communication/__pycache__
/src/octopal/mcp_servers/__pycache__
25 changes: 25 additions & 0 deletions scripts/whatsapp_bridge/bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,31 @@ const server = http.createServer(async (req, res) => {
rememberOutboundMessageId(result?.key?.id);
return await jsonResponse(res, 200, { ok: true, to, length: text.length });
}
// POST /send-file: send a local file to a WhatsApp recipient as a document message.
// Request JSON: { to, path, caption? }. Responds 400 when the socket is not ready,
// when `to` or `path` is empty, or when `path` does not point at a regular file.
// NOTE(review): the path is not confined to any directory here, so any file the
// bridge process can read may be sent — confirm only trusted local callers reach this route.
if (req.method === "POST" && url.pathname === "/send-file") {
const payload = await readJson(req);
const to = normalizeDirectJid(payload.to || "");
const filePath = String(payload.path || "").trim();
const caption = String(payload.caption || "").trim();
if (!sock || !to || !filePath) {
return await jsonResponse(res, 400, { ok: false, error: "missing_to_or_path" });
}
// Resolve to an absolute path so the same value is stat'ed, sent, and echoed back.
const absolutePath = path.resolve(filePath);
try {
const stat = await fs.stat(absolutePath);
if (!stat.isFile()) {
return await jsonResponse(res, 400, { ok: false, error: "path_is_not_file" });
}
} catch {
// Any stat failure is reported to the caller as "file_not_found".
return await jsonResponse(res, 400, { ok: false, error: "file_not_found" });
}
const result = await sock.sendMessage(to, {
document: { url: absolutePath },
fileName: path.basename(absolutePath),
caption,
});
// Record the id of the message we just sent (same bookkeeping as the text /send route above).
rememberOutboundMessageId(result?.key?.id);
return await jsonResponse(res, 200, { ok: true, to, path: absolutePath });
}
if (req.method === "POST" && url.pathname === "/react") {
const payload = await readJson(req);
const to = normalizeDirectJid(payload.to || payload.remoteJid || "");
Expand Down
16 changes: 15 additions & 1 deletion src/octopal/channels/telegram/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from aiogram import Bot, Dispatcher
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters import Command, CommandObject
from aiogram.types import CallbackQuery, Message, ReactionTypeEmoji
from aiogram.types import CallbackQuery, FSInputFile, Message, ReactionTypeEmoji

from octopal.channels.telegram.access import is_allowed_chat, parse_allowed_chat_ids
from octopal.channels.telegram.approvals import ApprovalManager
Expand Down Expand Up @@ -156,6 +156,9 @@ async def _internal_send(chat_id: int, text: str) -> None:
return
await _enqueue_send(bot, chat_id, decision.text)

# Telegram file-delivery hook: forwards the chat id, local file path and optional
# caption to _send_file_safe, using the `bot` from the enclosing scope.
async def _internal_send_file(chat_id: int, file_path: str, caption: str | None = None) -> None:
await _send_file_safe(bot, chat_id, file_path, caption=caption)

async def _internal_progress_send(
chat_id: int,
state: str,
Expand Down Expand Up @@ -188,11 +191,13 @@ async def _internal_typing_control(chat_id: int, active: bool) -> None:
_publish_runtime_metrics()

octo.internal_send = _internal_send
octo.internal_send_file = _internal_send_file
octo.internal_progress_send = _internal_progress_send
octo.internal_typing_control = _internal_typing_control

# Re-initialize the Octo's default (Telegram) output hooks if needed
octo._tg_send = _internal_send
octo._tg_send_file = _internal_send_file
octo._tg_progress = _internal_progress_send
octo._tg_typing = _internal_typing_control

Expand Down Expand Up @@ -460,6 +465,15 @@ async def _send_message_safe(bot: Bot, chat_id: int, text: str, reply_to_message
await bot.send_message(chat_id, sanitized, reply_to_message_id=reply_to_message_id)


async def _send_file_safe(bot: Bot, chat_id: int, file_path: str, caption: str | None = None) -> None:
    """Send a local file to a Telegram chat as a document.

    Args:
        bot: Bot instance used to perform the upload.
        chat_id: Target Telegram chat.
        file_path: Path of the local file to upload.
        caption: Optional caption; reaction tags are stripped and the text is
            sanitized before sending. An empty result is sent as no caption.
    """
    # Telegram rejects document captions longer than 1024 characters, which
    # would fail the whole delivery; clamp so the file still arrives.
    max_caption_len = 1024
    clean_caption = sanitize_user_facing_text(strip_reaction_tags(caption or "")) or None
    if clean_caption is not None and len(clean_caption) > max_caption_len:
        clean_caption = clean_caption[: max_caption_len - 1].rstrip() + "…"
    await bot.send_document(
        chat_id,
        document=FSInputFile(file_path),
        caption=clean_caption,
    )


async def _enqueue_send(bot: Bot, chat_id: int, text: str, reply_to_message_id: int | None = None) -> None:
decision = resolve_user_delivery(text)
if not decision.user_visible:
Expand Down
7 changes: 7 additions & 0 deletions src/octopal/channels/whatsapp/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ def qr_terminal(self) -> dict[str, Any]:
# Send a plain-text message through the bridge: POSTs {"to", "text"} to /send
# and returns whatever self._request returns.
def send_message(self, to: str, text: str) -> dict[str, Any]:
return self._request("POST", "/send", json={"to": to, "text": text})

def send_file(self, to: str, file_path: str, *, caption: str | None = None) -> dict[str, Any]:
    """Ask the bridge to deliver a local file via its ``/send-file`` endpoint.

    Args:
        to: Recipient identifier passed through as ``to``.
        file_path: File path passed through as ``path``.
        caption: Optional caption; a missing or empty caption is sent as "".

    Returns:
        The result of ``self._request`` for the POST.
    """
    body = {
        "to": to,
        "path": file_path,
        "caption": caption if caption else "",
    }
    return self._request("POST", "/send-file", json=body)

def send_reaction(
self,
to: str,
Expand Down
10 changes: 10 additions & 0 deletions src/octopal/channels/whatsapp/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ async def _internal_send(chat_id: int, text: str) -> None:
for chunk in _chunk_text(clean_text, limit=4000):
self.bridge.send_message(to, chunk)

async def _internal_send_file(chat_id: int, file_path: str, caption: str | None = None) -> None:
    """WhatsApp file-delivery hook: look up the recipient and hand the file to the bridge.

    Logs a warning and does nothing when no recipient is mapped to *chat_id*.
    """
    recipient = self._number_by_chat_id.get(chat_id)
    if recipient:
        # An empty sanitized caption is passed on as None.
        cleaned = sanitize_user_facing_text(caption or "")
        self.bridge.send_file(recipient, file_path, caption=cleaned or None)
        return
    logger.warning("Missing WhatsApp recipient mapping for file delivery", chat_id=chat_id)

async def _internal_progress_send(
chat_id: int,
state: str,
Expand All @@ -88,9 +96,11 @@ async def _internal_typing_control(chat_id: int, active: bool) -> None:
logger.debug("WhatsApp typing indicator not implemented", chat_id=chat_id, active=active)

self.octo.internal_send = _internal_send
self.octo.internal_send_file = _internal_send_file
self.octo.internal_progress_send = _internal_progress_send
self.octo.internal_typing_control = _internal_typing_control
self.octo._tg_send = _internal_send
self.octo._tg_send_file = _internal_send_file
self.octo._tg_progress = _internal_progress_send
self.octo._tg_typing = _internal_typing_control

Expand Down
6 changes: 6 additions & 0 deletions src/octopal/runtime/octo/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ class Octo:
mcp_manager: MCPManager | None = None
connector_manager: ConnectorManager | None = None
internal_send: callable | None = None
internal_send_file: callable | None = None
internal_progress_send: callable | None = None
internal_typing_control: callable | None = None
_cleanup_task: asyncio.Task | None = None
Expand All @@ -723,6 +724,7 @@ class Octo:
_ws_active: bool = False
_ws_owner: str | None = None
_tg_send: callable | None = None
_tg_send_file: callable | None = None
_tg_progress: callable | None = None
_tg_typing: callable | None = None
_spawn_limits: dict[str, int] | None = None
Expand Down Expand Up @@ -826,6 +828,7 @@ def __post_init__(self):
self._restore_worker_registry_state()
self._thinking_count = 0
self._tg_send = self.internal_send
self._tg_send_file = self.internal_send_file
self._tg_progress = self.internal_progress_send
self._tg_typing = self.internal_typing_control

Expand Down Expand Up @@ -869,6 +872,7 @@ def set_output_channel(
self,
is_ws: bool,
send: callable | None = None,
send_file: callable | None = None,
progress: callable | None = None,
typing: callable | None = None,
owner_id: str | None = None,
Expand All @@ -894,12 +898,14 @@ def set_output_channel(
self._ws_active = is_ws
if is_ws:
self.internal_send = send
self.internal_send_file = send_file
self.internal_progress_send = progress
self.internal_typing_control = typing
self._ws_owner = owner_id or "ws-default"
logger.info("Octo switched to WebSocket output channel")
else:
self.internal_send = self._tg_send
self.internal_send_file = self._tg_send_file
self.internal_progress_send = self._tg_progress
self.internal_typing_control = self._tg_typing
self._ws_owner = None
Expand Down
1 change: 1 addition & 0 deletions src/octopal/runtime/octo/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
"get_worker_result",
"get_worker_output_path",
"stop_worker",
"send_file_to_user",
# Octo must always be able to inspect and mutate its workspace.
"fs_list",
"fs_read",
Expand Down
4 changes: 4 additions & 0 deletions src/octopal/runtime/workers/agent_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ async def execute_agent_task(worker: Worker, workspace_root: Path, worker_dir: P
filtered_tools = apply_tool_policy_pipeline(
available_tools,
[
ToolPolicyPipelineStep(
label="worker.user_communication_denylist",
policy=ToolPolicy(deny=["send_file_to_user"]),
),
ToolPolicyPipelineStep(
label="worker.available_tools",
policy=ToolPolicy(allow=list(spec.available_tools or [])),
Expand Down
7 changes: 6 additions & 1 deletion src/octopal/runtime/workers/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from octopal.utils import utc_now

logger = structlog.get_logger(__name__)
_WORKER_BLOCKED_TOOL_NAMES = {"send_file_to_user"}

# Constants
_MAX_RECOVERY_ATTEMPTS = 1
Expand Down Expand Up @@ -98,7 +99,11 @@ async def run_task(
if not granted:
return WorkerResult(summary="Permission denied for worker task")

requested_tool_names = list(task_request.tools or template.available_tools)
requested_tool_names = [
str(tool_name)
for tool_name in (task_request.tools or template.available_tools)
if str(tool_name) not in _WORKER_BLOCKED_TOOL_NAMES
]
has_requested_mcp_tools = any(str(tool_name).startswith("mcp_") for tool_name in requested_tool_names)
if self.mcp_manager:
try:
Expand Down
30 changes: 30 additions & 0 deletions src/octopal/tools/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from octopal.channels import normalize_user_channel, user_channel_label
from octopal.infrastructure.config.settings import load_settings
from octopal.tools.communication.send_file import send_file_to_user
from octopal.runtime.memory.memchain import (
memchain_init,
memchain_record,
Expand Down Expand Up @@ -54,6 +55,35 @@

def get_tools(mcp_manager=None) -> list[ToolSpec]:
tools = [
ToolSpec(
name="send_file_to_user",
description="Send a local workspace file or a downloaded URL attachment to the active user channel. Only the Octo can use this.",
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Workspace-relative or workspace-absolute file path to send.",
},
"url": {
"type": "string",
"description": "HTTP(S) URL to download into workspace/tmp before sending.",
},
"filename": {
"type": "string",
"description": "Optional filename override when downloading from URL.",
},
"caption": {
"type": "string",
"description": "Optional message caption to attach to the file.",
},
},
"additionalProperties": False,
},
permission="self_control",
handler=send_file_to_user,
is_async=True,
),
ToolSpec(
name="manage_canon",
description="Manage canonical memory files (facts.md, decisions.md, failures.md). Only the Octo can use this.",
Expand Down
5 changes: 5 additions & 0 deletions src/octopal/tools/communication/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from octopal.tools.communication.send_file import send_file_to_user

__all__ = ["send_file_to_user"]
122 changes: 122 additions & 0 deletions src/octopal/tools/communication/send_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from __future__ import annotations

import json
import mimetypes
import os
import uuid
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse

import httpx

from octopal.tools.filesystem.path_safety import WorkspacePathError, resolve_workspace_path


def _error(message: str) -> str:
return json.dumps({"status": "error", "message": message}, ensure_ascii=False)


def _infer_filename_from_url(url: str, content_type: str | None = None) -> str:
parsed = urlparse(url)
candidate = Path(unquote(parsed.path or "")).name.strip()
if candidate:
return candidate
extension = mimetypes.guess_extension((content_type or "").split(";", 1)[0].strip()) or ".bin"
return f"download{extension}"


def _sanitize_filename(filename: str) -> str:
cleaned = Path(str(filename or "").strip()).name.strip()
if not cleaned:
raise ValueError("filename is empty")
if cleaned in {".", ".."}:
raise ValueError("filename is invalid")
return cleaned


def _resolve_existing_workspace_file(base_dir: Path, raw_path: str) -> Path:
    """Resolve *raw_path* inside the workspace and require it to be a file.

    Raises:
        WorkspacePathError: If the resolved path is not a regular file (the
            resolver itself is called with ``must_exist=True``).
    """
    candidate = resolve_workspace_path(base_dir, raw_path, must_exist=True)
    if candidate.is_file():
        return candidate
    raise WorkspacePathError("path is not a file")


async def _download_to_tmp(
    *,
    base_dir: Path,
    url: str,
    filename: str | None = None,
) -> Path:
    """Stream *url* into ``tmp/outbound_files`` under the workspace.

    Args:
        base_dir: Workspace root used to resolve the target directory.
        url: Address to fetch with a GET request (redirects are followed).
        filename: Optional name override; otherwise the name is inferred from
            the URL path or the response ``content-type`` header.

    Returns:
        Path of the saved file, named ``<uuid4>_<sanitized name>``.

    Raises:
        httpx.HTTPStatusError: If the response status is an error.
        ValueError: If the resulting filename is empty or invalid.
    """
    tmp_dir = resolve_workspace_path(base_dir, "tmp/outbound_files")
    tmp_dir.mkdir(parents=True, exist_ok=True)

    async with httpx.AsyncClient(follow_redirects=True, timeout=60.0) as client:
        async with client.stream("GET", url) as response:
            response.raise_for_status()
            inferred_name = _sanitize_filename(
                filename or _infer_filename_from_url(url, response.headers.get("content-type"))
            )
            # The uuid prefix keeps repeated downloads of the same name apart.
            final_name = f"{uuid.uuid4()}_{inferred_name}"
            save_path = resolve_workspace_path(base_dir, f"tmp/outbound_files/{final_name}")
            try:
                with open(save_path, "wb") as handle:
                    async for chunk in response.aiter_bytes():
                        handle.write(chunk)
            except BaseException:
                # A failed or cancelled transfer must not leave a truncated
                # file behind in the outbound directory.
                Path(save_path).unlink(missing_ok=True)
                raise
    return save_path


async def send_file_to_user(args: dict[str, Any], ctx: dict[str, Any]) -> str:
    """Send a workspace file, or a file downloaded from a URL, to the active user channel.

    Args:
        args: Tool arguments. Exactly one of ``path`` (workspace file) or
            ``url`` (http/https download) must be given; ``caption`` and
            ``filename`` (download name override) are optional.
        ctx: Tool context. Reads ``octo`` (must expose a callable
            ``internal_send_file``), ``chat_id`` and ``base_dir`` (a ``Path``).

    Returns:
        A JSON string: ``{"status": "success", ...}`` with the source, the
        workspace-relative path, filename and caption, or
        ``{"status": "error", "message": ...}`` on any failure.
    """
    octo = ctx.get("octo")
    if octo is None:
        return _error("send_file_to_user requires octo context")

    sender = getattr(octo, "internal_send_file", None)
    if not callable(sender):
        return _error("active user channel does not support file delivery")

    # A malformed chat_id (e.g. a non-numeric string) must yield the same JSON
    # error as a missing one rather than raising out of the tool.
    try:
        chat_id = int(ctx.get("chat_id", 0) or 0)
    except (TypeError, ValueError):
        chat_id = 0
    # NOTE(review): this also rejects negative ids; if group chats with
    # negative ids should receive files, relax the guard — confirm intent.
    if chat_id <= 0:
        return _error("send_file_to_user requires a valid chat_id")

    base_dir = ctx.get("base_dir")
    if not isinstance(base_dir, Path):
        return _error("send_file_to_user requires base_dir context")

    params = args or {}
    raw_path = str(params.get("path", "") or "").strip()
    raw_url = str(params.get("url", "") or "").strip()
    caption = str(params.get("caption", "") or "").strip() or None
    requested_filename = str(params.get("filename", "") or "").strip() or None

    # Both given or both missing is an error: exactly one source is required.
    if bool(raw_path) == bool(raw_url):
        return _error("provide exactly one of 'path' or 'url'")

    try:
        if raw_path:
            file_path = _resolve_existing_workspace_file(base_dir, raw_path)
            source = "path"
        else:
            if urlparse(raw_url).scheme not in {"http", "https"}:
                return _error("url must use http or https")
            file_path = await _download_to_tmp(base_dir=base_dir, url=raw_url, filename=requested_filename)
            source = "url"
        await sender(chat_id, str(file_path), caption=caption)
    except WorkspacePathError as exc:
        return _error(f"unsafe file path: {exc}")
    except httpx.HTTPStatusError as exc:
        return _error(f"download failed with HTTP {exc.response.status_code}")
    except ValueError as exc:
        return _error(str(exc))
    except Exception as exc:
        # Tool boundary: every failure is reported to the caller as JSON.
        return _error(f"failed to send file: {exc}")

    relative_path = os.path.relpath(file_path, base_dir)
    return json.dumps(
        {
            "status": "success",
            "source": source,
            # Report the path with forward slashes regardless of platform.
            "path": relative_path.replace("\\", "/"),
            "filename": file_path.name,
            "caption": caption,
        },
        ensure_ascii=False,
    )
Loading