Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 60 additions & 6 deletions mini_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,43 @@ def __init__(
# Initialize logger
self.logger = AgentLogger()

# Pause/stop control flags
self._stop_requested = False
self._stop_notified = False
self._paused = False
self._resume_step = 0

def add_user_message(self, content: str):
"""Add a user message to history."""
self.messages.append(Message(role="user", content=content))

def request_stop(self):
"""Signal the agent loop to stop at the next safe checkpoint."""
self._stop_requested = True
self._stop_notified = False

def cancel_pause(self):
"""Reset pause state when user abandons a resume."""
self._stop_requested = False
self._stop_notified = False
self._paused = False
self._resume_step = 0

def _check_stop_requested(self, current_step: int) -> bool:
"""Return True if a stop was requested and emit a single notification."""
if not self._stop_requested:
return False
if not self._stop_notified:
print(f"\n{Colors.BRIGHT_YELLOW}⏸️ Agent paused by user (press Enter to continue interacting).{Colors.RESET}\n")
self._stop_notified = True
self._paused = True
self._resume_step = current_step
return True

def is_paused(self) -> bool:
"""Whether the agent halted due to a stop request."""
return self._paused

def _estimate_tokens(self) -> int:
"""Accurately calculate token count for message history using tiktoken

Expand Down Expand Up @@ -258,13 +291,24 @@ async def _create_summary(self, messages: list[Message], round_num: int) -> str:

async def run(self) -> str:
"""Execute agent loop until task is complete or max steps reached."""
# Start new run, initialize log file
self.logger.start_new_run()
print(f"{Colors.DIM}📝 Log file: {self.logger.get_log_file_path()}{Colors.RESET}")

step = 0
resuming = self._paused
if not resuming:
self.logger.start_new_run()
print(f"{Colors.DIM}📝 Log file: {self.logger.get_log_file_path()}{Colors.RESET}")
else:
print(f"{Colors.DIM}📝 Resuming run (log: {self.logger.get_log_file_path()}){Colors.RESET}")

step = self._resume_step if resuming else 0
self._stop_requested = False
self._stop_notified = False
self._paused = False
if not resuming:
self._resume_step = 0

while step < self.max_steps:
if self._check_stop_requested(step):
return "Agent run interrupted by user."

# Check and summarize message history to prevent context overflow
await self._summarize_messages()

Expand Down Expand Up @@ -296,8 +340,12 @@ async def run(self) -> str:
else:
error_msg = f"LLM call failed: {str(e)}"
print(f"\n{Colors.BRIGHT_RED}❌ Error:{Colors.RESET} {error_msg}")
self.cancel_pause()
return error_msg

if self._check_stop_requested(step):
return "Agent run interrupted by user."

# Log LLM response
self.logger.log_response(
content=response.content,
Expand Down Expand Up @@ -327,9 +375,10 @@ async def run(self) -> str:

# Check if task is complete (no tool calls)
if not response.tool_calls:
self.cancel_pause()
return response.content

# Execute tool calls
# Execute all tool calls before checking for stop requests again
for tool_call in response.tool_calls:
tool_call_id = tool_call.id
function_name = tool_call.function.name
Expand Down Expand Up @@ -403,10 +452,15 @@ async def run(self) -> str:
self.messages.append(tool_msg)

step += 1
self._resume_step = step

if self._check_stop_requested(step):
return "Agent run interrupted by user."

# Max steps reached
error_msg = f"Task couldn't be completed after {self.max_steps} steps."
print(f"\n{Colors.BRIGHT_YELLOW}⚠️ {error_msg}{Colors.RESET}")
self.cancel_pause()
return error_msg

def get_history(self) -> list[Message]:
Expand Down
117 changes: 112 additions & 5 deletions mini_agent/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@

import argparse
import asyncio
import sys
from datetime import datetime
from pathlib import Path
from typing import List

try:
import termios
import tty
except ImportError: # pragma: no cover - Windows fallback
termios = None # type: ignore[assignment]
tty = None # type: ignore[assignment]

from prompt_toolkit import PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import WordCompleter
Expand Down Expand Up @@ -70,6 +78,75 @@ class Colors:
BG_BLUE = "\033[44m"


class EscapeKeyListener:
"""Listen for ESC key presses during agent execution to request a stop."""

def __init__(self, agent: Agent):
self.agent = agent
self._loop: asyncio.AbstractEventLoop | None = None
self._fd: int | None = None
self._old_settings = None
self._cbreak_enabled = False
self._reader_registered = False

async def __aenter__(self):
self._loop = asyncio.get_running_loop()
await self._loop.run_in_executor(None, self._enable_cbreak_mode)
if (
self._cbreak_enabled
and self._loop
and self._fd is not None
and hasattr(self._loop, "add_reader")
):
self._loop.add_reader(self._fd, self._handle_keypress)
self._reader_registered = True
return self

async def __aexit__(self, exc_type, exc, tb):
if self._reader_registered and self._loop and self._fd is not None:
self._loop.remove_reader(self._fd)
self._reader_registered = False
if self._loop:
await self._loop.run_in_executor(None, self._restore_terminal)

def _enable_cbreak_mode(self):
if termios is None or tty is None:
return
if not sys.stdin.isatty():
return
try:
self._fd = sys.stdin.fileno()
self._old_settings = termios.tcgetattr(self._fd)
tty.setcbreak(self._fd)
self._cbreak_enabled = True
except Exception:
self._cbreak_enabled = False

def _restore_terminal(self):
if (
self._cbreak_enabled
and self._fd is not None
and self._old_settings is not None
and termios is not None
):
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._old_settings)
self._cbreak_enabled = False

def _handle_keypress(self):
if not self._cbreak_enabled:
return
try:
ch = sys.stdin.read(1)
except Exception:
return
if ch == "\x1b": # ESC key
print(f"\n{Colors.BRIGHT_YELLOW}⏹️ Escape detected, requesting agent pause...{Colors.RESET}")
self.agent.request_stop()
if self._loop and self._reader_registered and self._fd is not None:
self._loop.remove_reader(self._fd)
self._reader_registered = False


def print_banner():
"""Print welcome banner with proper alignment"""
BOX_WIDTH = 58
Expand Down Expand Up @@ -105,13 +182,15 @@ def print_help():
{Colors.BRIGHT_CYAN}Tab{Colors.RESET} - Auto-complete commands
{Colors.BRIGHT_CYAN}↑/↓{Colors.RESET} - Browse command history
{Colors.BRIGHT_CYAN}→{Colors.RESET} - Accept auto-suggestion
{Colors.BRIGHT_CYAN}Esc{Colors.RESET} - Pause the current agent run (press Enter to resume)

{Colors.BOLD}{Colors.BRIGHT_YELLOW}Usage:{Colors.RESET}
- Enter your task directly, Agent will help you complete it
- Agent remembers all conversation content in this session
- Use {Colors.BRIGHT_GREEN}/clear{Colors.RESET} to start a new session
- Press {Colors.BRIGHT_CYAN}Enter{Colors.RESET} to submit your message
- Use {Colors.BRIGHT_CYAN}Ctrl+J{Colors.RESET} to insert line breaks within your message
- Press {Colors.BRIGHT_CYAN}Esc{Colors.RESET} anytime during execution to stop the agent, then press Enter (empty line) to resume
"""
print(help_text)

Expand Down Expand Up @@ -491,6 +570,13 @@ def _(event):
key_bindings=kb,
)

async def invoke_agent_run():
print(f"\n{Colors.BRIGHT_BLUE}Agent{Colors.RESET} {Colors.DIM}›{Colors.RESET} {Colors.DIM}Thinking...{Colors.RESET}\n")
async with EscapeKeyListener(agent):
return await agent.run()

resume_pending = False

# 9. Interactive loop
while True:
try:
Expand All @@ -507,7 +593,14 @@ def _(event):
user_input = user_input.strip()

if not user_input:
continue
if resume_pending:
result = await invoke_agent_run()
resume_pending = agent.is_paused()
if not resume_pending:
print(f"\n{Colors.DIM}{'─' * 60}{Colors.RESET}\n")
continue
else:
continue

# Handle commands
if user_input.startswith("/"):
Expand All @@ -520,26 +613,36 @@ def _(event):

elif command == "/help":
print_help()
resume_pending = False
agent.cancel_pause()
continue

elif command == "/clear":
# Clear message history but keep system prompt
old_count = len(agent.messages)
agent.messages = [agent.messages[0]] # Keep only system message
print(f"{Colors.GREEN}✅ Cleared {old_count - 1} messages, starting new session{Colors.RESET}\n")
resume_pending = False
agent.cancel_pause()
continue

elif command == "/history":
print(f"\n{Colors.BRIGHT_CYAN}Current session message count: {len(agent.messages)}{Colors.RESET}\n")
resume_pending = False
agent.cancel_pause()
continue

elif command == "/stats":
print_stats(agent, session_start)
resume_pending = False
agent.cancel_pause()
continue

else:
print(f"{Colors.RED}❌ Unknown command: {user_input}{Colors.RESET}")
print(f"{Colors.DIM}Type /help to see available commands{Colors.RESET}\n")
resume_pending = False
agent.cancel_pause()
continue

# Normal conversation - exit check
Expand All @@ -549,12 +652,16 @@ def _(event):
break

# Run Agent
print(f"\n{Colors.BRIGHT_BLUE}Agent{Colors.RESET} {Colors.DIM}›{Colors.RESET} {Colors.DIM}Thinking...{Colors.RESET}\n")
if resume_pending:
agent.cancel_pause()
resume_pending = False
agent.add_user_message(user_input)
_ = await agent.run()
result = await invoke_agent_run()
resume_pending = agent.is_paused()

# Visual separation - keep it simple like the reference code
print(f"\n{Colors.DIM}{'─' * 60}{Colors.RESET}\n")
if not resume_pending:
# Visual separation - keep it simple like the reference code
print(f"\n{Colors.DIM}{'─' * 60}{Colors.RESET}\n")

except KeyboardInterrupt:
print(f"\n\n{Colors.BRIGHT_YELLOW}👋 Interrupt signal detected, exiting...{Colors.RESET}\n")
Expand Down
Loading