diff --git a/cecli/__init__.py b/cecli/__init__.py index 770e502e99e..421a7a40714 100644 --- a/cecli/__init__.py +++ b/cecli/__init__.py @@ -1,6 +1,6 @@ from packaging import version -__version__ = "0.99.0.dev" +__version__ = "0.99.1.dev" safe_version = __version__ try: diff --git a/cecli/coders/agent_coder.py b/cecli/coders/agent_coder.py index 86916dd8ac3..d6edbb84341 100644 --- a/cecli/coders/agent_coder.py +++ b/cecli/coders/agent_coder.py @@ -1009,37 +1009,44 @@ def _generate_tool_context(self, repetitive_tools): repetition_warning = None if repetitive_tools: + default_temp = ( + float(self.get_active_model().use_temperature) + if isinstance(self.get_active_model().use_temperature, (int, float, str)) + else 1 + ) + default_fp = 0 + if not self.model_kwargs: self.model_kwargs = { - "temperature": ( - 1 - if isinstance(self.get_active_model().use_temperature, bool) - else float(self.get_active_model().use_temperature) - ) + 0.1, - "frequency_penalty": 0.2, + "temperature": default_temp + 0.1, + "frequency_penalty": default_fp + 0.2, "presence_penalty": 0.1, } else: - temperature = nested.getter(self.model_kwargs, "temperature") - freq_penalty = nested.getter(self.model_kwargs, "frequency_penalty") - if temperature and freq_penalty: - self.model_kwargs["temperature"] = min(temperature + 0.1, 2) - self.model_kwargs["frequency_penalty"] = min(freq_penalty + 0.1, 1) + temperature = nested.getter(self.model_kwargs, "temperature", default_temp) + freq_penalty = nested.getter(self.model_kwargs, "frequency_penalty", default_fp) + + self.model_kwargs["temperature"] = temperature + 0.1 + self.model_kwargs["frequency_penalty"] = freq_penalty + 0.1 if random.random() < 0.2: - self.model_kwargs["temperature"] = min( - ( - 1 - if isinstance(self.get_active_model().use_temperature, bool) - else float(self.get_active_model().use_temperature) - ), - max(temperature - 0.15, 1), + self.model_kwargs["temperature"] = max( + default_temp, + temperature - 0.15, + ) + self.model_kwargs["frequency_penalty"] = max( + default_fp, + freq_penalty - 0.15, ) - self.model_kwargs["frequency_penalty"] = min(0, max(freq_penalty - 0.15, 0)) self.model_kwargs["temperature"] = max( - 0, min(nested.getter(self.model_kwargs, "temperature", 1), 1) + 0, min(nested.getter(self.model_kwargs, "temperature", default_temp), 1) ) + + self.model_kwargs["frequency_penalty"] = max( + 0, min(nested.getter(self.model_kwargs, "frequency_penalty", default_fp), 1) + ) + # One twentieth of the time, just straight reset the randomness if random.random() < 0.05: self.model_kwargs = {} diff --git a/cecli/coders/base_coder.py b/cecli/coders/base_coder.py index c8462207b9a..4139412156a 100755 --- a/cecli/coders/base_coder.py +++ b/cecli/coders/base_coder.py @@ -3065,7 +3065,7 @@ async def send(self, messages, model=None, functions=None, tools=None): self.temperature, # This could include any tools, but for now it is just MCP tools tools=tools, - override_kwargs=self.model_kwargs, + override_kwargs=self.model_kwargs.copy(), ) self.chat_completion_call_hashes.append(hash_object.hexdigest()) diff --git a/cecli/helpers/background_commands.py b/cecli/helpers/background_commands.py index 39f3d8f991b..b2f731377a9 100644 --- a/cecli/helpers/background_commands.py +++ b/cecli/helpers/background_commands.py @@ -5,11 +5,22 @@ in the background and capturing their output for injection into chat streams. """ +import codecs +import os +import platform import subprocess import threading from collections import deque from typing import Dict, Optional, Tuple +try: + import pty + import termios + + HAS_PTY = True +except ImportError: + HAS_PTY = False + class CircularBuffer: """ @@ -89,14 +100,48 @@ def size(self) -> int: return sum(len(chunk) for chunk in self.buffer) +class InputBuffer: + """ + Thread-safe buffer for queuing input to be sent to a process. + """ + + def __init__(self): + self.queue = deque() + self.lock = threading.Lock() + + def append(self, text: str) -> None: + """Add text to the input queue.""" + with self.lock: + self.queue.append(text) + + def pop_all(self) -> str: + """Get and clear all queued input.""" + with self.lock: + result = "".join(self.queue) + self.queue.clear() + return result + + def has_input(self) -> bool: + """Check if there is queued input.""" + with self.lock: + return len(self.queue) > 0 + + class BackgroundProcess: """ Represents a background process with output capture. """ def __init__( - self, command: str, process: subprocess.Popen, buffer: CircularBuffer, persist: bool = False + self, + command: str, + process: subprocess.Popen, + buffer: CircularBuffer, + persist: bool = False, + input_buffer: Optional[InputBuffer] = None, + master_fd: Optional[int] = None, ): + self.master_fd = master_fd """ Initialize background process wrapper. @@ -116,7 +161,11 @@ def __init__( self.start_time = time.time() self.end_time = None self.persist = persist + self.input_buffer = input_buffer or InputBuffer() + self.writer_thread = None + self._stop_event = threading.Event() self._start_output_reader() + self._start_input_writer() def _start_output_reader(self) -> None: """Start thread to read process output.""" @@ -128,15 +177,25 @@ def reader(): # we're in a separate thread and the buffer will capture # output as soon as it's available - # Read stdout - for line in iter(self.process.stdout.readline, ""): - if line: - self.buffer.append(line) + if self.master_fd is not None: + while not self._stop_event.is_set(): + try: + data = os.read(self.master_fd, 4096).decode(errors="replace") + if not data: + break + self.buffer.append(data) + except (OSError, EOFError): + break + else: + # Read stdout + for line in iter(self.process.stdout.readline, ""): + if line: + self.buffer.append(line) - # Read stderr - for line in iter(self.process.stderr.readline, ""): - if line: - self.buffer.append(line) + # Read stderr + for line in iter(self.process.stderr.readline, ""): + if line: + self.buffer.append(line) except Exception as e: self.buffer.append(f"\n[Error reading process output: {str(e)}]\n") @@ -144,6 +203,45 @@ def reader(): self.reader_thread = threading.Thread(target=reader, daemon=True) self.reader_thread.start() + def _start_input_writer(self) -> None: + """Start thread to write input to process stdin.""" + + def writer(): + try: + while not self._stop_event.is_set() and self.is_alive(): + if self.input_buffer.has_input(): + text = self.input_buffer.pop_all() + if text: + if self.master_fd is not None: + os.write(self.master_fd, text.encode("latin-1")) + else: + try: + # Try to write to the binary buffer for lossless propagation + self.process.stdin.buffer.write(text.encode("latin-1")) + self.process.stdin.buffer.flush() + except (AttributeError, ValueError): + # Fallback to text mode if buffer is not available + self.process.stdin.write(text) + self.process.stdin.flush() + import time + + time.sleep(0.1) + except (BrokenPipeError, OSError): + pass + except Exception as e: + self.buffer.append(f"\n[Error writing to process input: {str(e)}]\n") + finally: + try: + if self.master_fd is not None: + os.close(self.master_fd) + else: + self.process.stdin.close() + except Exception: + pass + + self.writer_thread = threading.Thread(target=writer, daemon=True) + self.writer_thread.start() + def get_output(self, clear: bool = False) -> str: """ Get current output buffer. @@ -171,6 +269,11 @@ def is_alive(self) -> bool: """Check if process is running.""" return self.process.poll() is None + def send_input(self, text: str) -> None: + """Queue input to be sent to the process.""" + if self.input_buffer: + self.input_buffer.append(text) + def stop(self, timeout: float = 5.0) -> Tuple[bool, str, Optional[int]]: """ Stop the process gracefully. @@ -184,6 +287,9 @@ def stop(self, timeout: float = 5.0) -> Tuple[bool, str, Optional[int]]: import time try: + # Signal threads to stop + self._stop_event.set() + # Try SIGTERM first self.process.terminate() self.process.wait(timeout=timeout) @@ -192,7 +298,6 @@ def stop(self, timeout: float = 5.0) -> Tuple[bool, str, Optional[int]]: output = self.get_output(clear=True) exit_code = self.process.returncode self.end_time = time.time() - return True, output, exit_code except subprocess.TimeoutExpired: @@ -262,6 +367,8 @@ def start_background_command( existing_process: Optional[subprocess.Popen] = None, existing_buffer: Optional[CircularBuffer] = None, persist: bool = False, + existing_input_buffer: Optional[InputBuffer] = None, + use_pty: bool = False, ) -> str: """ Start a command in background. @@ -283,7 +390,29 @@ def start_background_command( buffer = existing_buffer or CircularBuffer(max_size=max_buffer_size) # Use existing process or start new one - if existing_process: + master_fd = None + if use_pty and HAS_PTY and platform.system() != "Windows": + master_fd, slave_fd = pty.openpty() + + # Disable echo on the slave PTY + attr = termios.tcgetattr(slave_fd) + attr[3] = attr[3] & ~termios.ECHO + termios.tcsetattr(slave_fd, termios.TCSANOW, attr) + + process = subprocess.Popen( + command, + shell=True, + stdout=slave_fd, + stderr=slave_fd, + stdin=slave_fd, + cwd=cwd, + close_fds=True, + text=True, + bufsize=1, + universal_newlines=True, + ) + os.close(slave_fd) + elif existing_process: process = existing_process else: process = subprocess.Popen( @@ -291,15 +420,22 @@ def start_background_command( shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - stdin=subprocess.DEVNULL, # No stdin for background commands + stdin=subprocess.PIPE, cwd=cwd, - text=True, # Use text mode for easier handling - bufsize=1, # Line buffered + text=True, + bufsize=1, universal_newlines=True, ) # Create background process wrapper - bg_process = BackgroundProcess(command, process, buffer, persist=persist) + bg_process = BackgroundProcess( + command, + process, + buffer, + persist=persist, + input_buffer=existing_input_buffer, + master_fd=master_fd, + ) # Generate unique key and store command_key = cls._generate_command_key(command) @@ -367,6 +503,30 @@ def get_new_command_output(cls, command_key: str) -> str: return f"[Error] No background command found with key: {command_key}" return bg_process.get_new_output() + @classmethod + def send_command_input(cls, command_key: str, text: str) -> bool: + """ + Send input to a background command. + + Args: + command_key: Command key returned by start_background_command + text: Text to send to the command's stdin + + Returns: + True if input was queued, False if command not found + """ + with cls._lock: + bg_process = cls._background_commands.get(command_key) + if not bg_process: + return False + # Decode escape sequences (like \x1b) if present in the string + try: + text = codecs.decode(text, "unicode_escape") + except Exception: + pass + bg_process.send_input(text) + return True + @classmethod def get_all_command_outputs(cls, clear: bool = False) -> Dict[str, str]: """ diff --git a/cecli/repo.py b/cecli/repo.py index 0c508287a98..efe39efa2f9 100644 --- a/cecli/repo.py +++ b/cecli/repo.py @@ -128,9 +128,8 @@ def __init__( self.io.tool_error("Files are in different git repos.") raise FileNotFoundError - # https://github.com/gitpython-developers/GitPython/issues/427 - self.repo = git.Repo(repo_paths.pop(), odbt=git.GitCmdObjectDB) - self.root = utils.safe_abs_path(self.repo.working_tree_dir) + self._init_repo_path = repo_paths.pop() + self.init_repo() if cecli_ignore_file: self.cecli_ignore_file = Path(cecli_ignore_file) @@ -140,6 +139,18 @@ def __init__( if self.workspace_path: self.io.tool_output(f"Working in workspace: {self.workspace_path.name}") + def init_repo(self): + if not self.repo: + self.repo = git.Repo(self._init_repo_path, odbt=git.GitCmdObjectDB) + self.root = utils.safe_abs_path(self.repo.working_tree_dir) + + try: + commit = self.repo.head.commit + return commit + except ANY_GIT_ERROR: + self.repo = git.Repo(self._init_repo_path, odbt=git.GitCmdObjectDB) + self.root = utils.safe_abs_path(self.repo.working_tree_dir) + def _detect_workspace_path(self, start_path: str): """Check if current directory is within a workspace""" current = Path(start_path).resolve() @@ -478,6 +489,8 @@ def get_tracked_files(self): if not self.repo: return [] + self.init_repo() + try: commit = self.repo.head.commit except ValueError: diff --git a/cecli/tools/command.py b/cecli/tools/command.py index 8545c17230b..e6cd80bb8e9 100644 --- a/cecli/tools/command.py +++ b/cecli/tools/command.py @@ -17,42 +17,99 @@ class Tool(BaseTool): "parameters": { "type": "object", "properties": { - "command_string": { + "command": { "type": "string", - "description": "The shell command to execute.", + "description": ( + "The shell command to execute. To send stdin to an existing background" + " command, use the format 'command_key::{key}'." + ), }, "background": { "type": "boolean", "description": "Run command in background (non-blocking).", "default": False, }, - "stop_background": { + "stop": { + "type": "boolean", + "description": ( + "If true, stop the background command specified in the 'command'" + " parameter (format 'command_key::{key}')." + ), + "default": False, + }, + "pty": { + "type": "boolean", + "description": ( + "Run the command in a pseudo-terminal (PTY). Useful for interactive" + " programs like 'vi' or 'top'." + ), + "default": False, + }, + "stdin": { "type": "string", - "description": "Command string to stop if running in background.", + "description": ( + "Input to send to the command's stdin. Supports escape sequences like" + " \\n, \\r, \\t, and hex escapes like \\x1b." + ), }, }, - "required": ["command_string"], + "required": ["command"], }, }, } + @staticmethod + def _parse_command_key(command): + """Extract command key from command string if it follows the pattern.""" + if command and command.startswith("command_key::"): + return command.split("::", 1)[1].strip() + return None + @classmethod - async def execute(cls, coder, command_string, background=False, stop_background=None, **kwargs): + async def execute( + cls, coder, command, background=False, stop=None, stdin=None, pty=False, **kwargs + ): """ Execute a shell command, optionally in background. Commands run with timeout based on agent_config['command_timeout'] (default: 30 seconds). """ + command_key = cls._parse_command_key(command) + # Handle stopping background commands - if stop_background: - return await cls._stop_background_command(coder, stop_background) + if stop: + if not command_key: + return ( + "Error: 'command' in format 'command_key::{key}' is required when 'stop' is" + " true." + ) + return await cls._stop_background_command(coder, command_key) + + # Handle sending stdin to an existing background command + if stdin: + if not command_key: + return ( + "Error: 'command' in format 'command_key::{key}' is required when using" + " 'stdin'." + ) + + cls.clear_invocation_cache() + + success = BackgroundCommandManager.send_command_input(command_key, stdin) + if success: + return f"Sent input to background command {command_key}: {stdin}" + else: + return f"Error: Background command {command_key} not found or not running." + + if not command: + return "Error: 'command' must be provided." # Check for implicit background (trailing & on Linux) - if not background and command_string.strip().endswith("&"): + if not background and command.strip().endswith("&"): background = True - command_string = command_string.strip()[:-1].strip() + command = command.strip()[:-1].strip() # Get user confirmation - confirmed = await cls._get_confirmation(coder, command_string, background) + confirmed = await cls._get_confirmation(coder, command, background) if not confirmed: return "Command execution skipped by user." @@ -62,11 +119,11 @@ async def execute(cls, coder, command_string, background=False, stop_background= timeout = coder.agent_config.get("command_timeout", 30) if background: - return await cls._execute_background(coder, command_string) + return await cls._execute_background(coder, command, use_pty=pty) elif timeout > 0: - return await cls._execute_with_timeout(coder, command_string, timeout) + return await cls._execute_with_timeout(coder, command, timeout, use_pty=pty) else: - return await cls._execute_foreground(coder, command_string) + return await cls._execute_foreground(coder, command) @classmethod async def _get_confirmation(cls, coder, command_string, background): @@ -90,7 +147,7 @@ async def _get_confirmation(cls, coder, command_string, background): ) @classmethod - async def _execute_background(cls, coder, command_string): + async def _execute_background(cls, coder, command_string, use_pty=False): """ Execute command in background. """ @@ -98,7 +155,11 @@ async def _execute_background(cls, coder, command_string): # Use static manager to start background command command_key = BackgroundCommandManager.start_background_command( - command_string, verbose=coder.verbose, cwd=coder.root, max_buffer_size=4096 + command_string, + verbose=coder.verbose, + cwd=coder.root, + max_buffer_size=4096, + use_pty=use_pty, ) return ( @@ -108,7 +169,7 @@ async def _execute_background(cls, coder, command_string): ) @classmethod - async def _execute_with_timeout(cls, coder, command_string, timeout): + async def _execute_with_timeout(cls, coder, command_string, timeout, use_pty=False): """ Execute command with timeout. If timeout elapses, move to background. @@ -136,7 +197,7 @@ async def _execute_with_timeout(cls, coder, command_string, timeout): executable=shell if platform.system() != "Windows" else None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - stdin=subprocess.DEVNULL, + stdin=subprocess.PIPE, cwd=coder.root, text=True, bufsize=1, diff --git a/cecli/tools/command_interactive.py b/cecli/tools/command_interactive.py index 591be7b7379..45d3251bdcb 100644 --- a/cecli/tools/command_interactive.py +++ b/cecli/tools/command_interactive.py @@ -7,6 +7,7 @@ class Tool(BaseTool): NORM_NAME = "commandinteractive" + TRACK_INVOCATIONS = False SCHEMA = { "type": "function", "function": { @@ -92,6 +93,8 @@ def _run_interactive(): " large_file_token_threshold)" ) + cls.clear_invocation_cache() + if exit_status == 0: return ( "Interactive command finished successfully (exit code 0)." diff --git a/cecli/tools/delete_text.py b/cecli/tools/delete_text.py index f5d0b2206f3..3f5aea9b99b 100644 --- a/cecli/tools/delete_text.py +++ b/cecli/tools/delete_text.py @@ -64,8 +64,6 @@ def execute( raise ToolError( "Please call `ShowContext` first to make sure edits are appropriately scoped" ) - else: - coder.edit_allowed = False tool_name = "DeleteText" try: @@ -120,6 +118,8 @@ def execute( ) coder.files_edited_by_tools.add(rel_path) + cls.clear_invocation_cache() + # 5. Format and return result success_message = f"Deleted lines {start_line} to {end_line} in {file_path}" return format_tool_result( @@ -131,7 +131,9 @@ def execute( except ToolError as e: # Handle errors raised by utility functions (expected errors) + coder.edit_allowed = False return handle_tool_error(coder, tool_name, e, add_traceback=False) except Exception as e: # Handle unexpected errors + coder.edit_allowed = False return handle_tool_error(coder, tool_name, e) diff --git a/cecli/tools/finished.py b/cecli/tools/finished.py index ff1a8ef7518..6f188a3d874 100644 --- a/cecli/tools/finished.py +++ b/cecli/tools/finished.py @@ -26,6 +26,7 @@ def execute(cls, coder, **kwargs): This gives the LLM explicit control over when it can stop looping """ + cls.clear_invocation_cache() if coder: coder.agent_finished = True diff --git a/cecli/tools/insert_text.py b/cecli/tools/insert_text.py index e73c33155ff..615eb156ea8 100644 --- a/cecli/tools/insert_text.py +++ b/cecli/tools/insert_text.py @@ -1,5 +1,3 @@ -import traceback - from cecli.helpers.hashline import apply_hashline_operation from cecli.tools.utils.base_tool import BaseTool from cecli.tools.utils.helpers import ( @@ -73,8 +71,6 @@ def execute( raise ToolError( "Please call `ShowContext` first to make sure edits are appropriately scoped" ) - else: - coder.edit_allowed = False tool_name = "InsertText" try: @@ -127,6 +123,7 @@ def execute( ) coder.files_edited_by_tools.add(rel_path) + cls.clear_invocation_cache() # 5. Format and return result success_message = f"Inserted content at {start_line} in {file_path}" @@ -139,13 +136,12 @@ def execute( except ToolError as e: # Handle errors raised by utility functions (expected errors) + coder.edit_allowed = False return handle_tool_error(coder, tool_name, e, add_traceback=False) except Exception as e: - coder.io.tool_error( - f"Error in InsertText: {str(e)}\n{traceback.format_exc()}" - ) # Add traceback - return f"Error: {str(e)}" + coder.edit_allowed = False + return handle_tool_error(coder, tool_name, e) @classmethod def format_output(cls, coder, mcp_server, tool_response): diff --git a/cecli/tools/replace_text.py b/cecli/tools/replace_text.py index 3d9ba450f55..30566281b21 100644 --- a/cecli/tools/replace_text.py +++ b/cecli/tools/replace_text.py @@ -87,8 +87,6 @@ def execute( raise ToolError( "Please call `ShowContext` first to make sure edits are appropriately scoped" ) - else: - coder.edit_allowed = False tool_name = "ReplaceText" try: @@ -264,6 +262,8 @@ def execute( success_message += "\nFailed edits:\n" + "\n".join(all_failed_edits) change_id_to_return = None # Multiple change IDs, can't return single one + cls.clear_invocation_cache() + return format_tool_result( coder, tool_name, @@ -273,9 +273,11 @@ def execute( except ToolError as e: # Handle errors raised by utility functions or explicitly raised here + coder.edit_allowed = False return handle_tool_error(coder, tool_name, e, add_traceback=False) except Exception as e: # Handle unexpected errors + coder.edit_allowed = False return handle_tool_error(coder, tool_name, e) @classmethod diff --git a/cecli/tools/show_context.py b/cecli/tools/show_context.py index 65eee3b115b..f1f55e1af54 100644 --- a/cecli/tools/show_context.py +++ b/cecli/tools/show_context.py @@ -23,7 +23,7 @@ class Tool(BaseTool): " used for start_text and end_text to represent the first and last lines of" " the file respectively. Never use hashlines as the start_text and end_text" " values. These values must be lines from the content of the file." - " They should not contain newlines." + " They can contain up to 3 lines but newlines should generally be avoided." " Avoid using generic keywords." " Do not use the same pattern for the start_text and end_text." " It is usually best to use function names and other block identifiers as " @@ -114,11 +114,8 @@ def execute(cls, coder, show, **kwargs): " 'end_text'." ) - if "\n" in start_text or "\n" in end_text: - raise ToolError( - "Patterns must not contain newlines characters. They must match a single" - " line." - ) + if start_text.count("\n") > 4 or end_text.count("\n") > 4: + raise ToolError("Patterns must not contain more than 5 lines.") start_text = strip_hashline(start_text).strip() end_text = strip_hashline(end_text).strip() @@ -151,12 +148,26 @@ def execute(cls, coder, show, **kwargs): if start_text == "@000": start_indices = [0] else: - start_indices = [i for i, line in enumerate(lines) if start_text in line] + start_pattern_lines = start_text.split("\n") + start_indices = [] + for i in range(len(lines) - len(start_pattern_lines) + 1): + if all( + p_line in lines[i + j] + for j, p_line in enumerate(start_pattern_lines) + ): + start_indices.append(i) if end_text == "000@": end_indices = [num_lines - 1] else: - end_indices = [i for i, line in enumerate(lines) if end_text in line] + end_pattern_lines = end_text.split("\n") + end_indices = [] + for i in range(len(lines) - len(end_pattern_lines) + 1): + if all( + p_line in lines[i + j] for j, p_line in enumerate(end_pattern_lines) + ): + # For multiline end patterns, we want the index of the LAST line of the match + end_indices.append(i + len(end_pattern_lines) - 1) if len(start_indices) > 5: raise ToolError( diff --git a/cecli/tools/utils/base_tool.py b/cecli/tools/utils/base_tool.py index 6fc59f888d0..927b65e8cf3 100644 --- a/cecli/tools/utils/base_tool.py +++ b/cecli/tools/utils/base_tool.py @@ -88,10 +88,7 @@ def process_response(cls, coder, params): cls._invocations[tool_name].append((current_params_tuple, params)) if len(cls._invocations[tool_name]) > 3: cls._invocations[tool_name] = cls._invocations[tool_name][-3:] - else: - # When TRACK_INVOCATIONS is False, clear all invocation history - # This indicates the job is making progress, so reset tracking for all tools - cls._invocations.clear() + try: return cls.execute(coder, **params) except Exception as e: @@ -104,3 +101,7 @@ def format_output(cls, coder, mcp_server, tool_response): @classmethod def on_duplicate_request(cls, coder, **kwargs): pass + + @classmethod + def clear_invocation_cache(cls): + cls._invocations.clear() diff --git a/tests/basic/test_io.py b/tests/basic/test_io.py index 577adab5987..cd838cbfbb7 100644 --- a/tests/basic/test_io.py +++ b/tests/basic/test_io.py @@ -537,6 +537,8 @@ def test_format_files_for_input_pretty_false(self, mock_is_dumb_terminal): def test_format_files_for_input_pretty_true_no_files( self, mock_join, mock_abspath, mock_columns, mock_is_dumb_terminal ): + mock_join.side_effect = lambda *args: "/".join(args) + mock_abspath.side_effect = lambda p: "/ABS_PREFIX_VERY_LONG/" + os.path.normpath(p) io = InputOutput(pretty=True, root="test_root") io.format_files_for_input([], [], []) mock_columns.assert_not_called() @@ -547,6 +549,8 @@ def test_format_files_for_input_pretty_true_no_files( def test_format_files_for_input_pretty_true_editable_only( self, mock_join, mock_abspath, mock_columns, mock_is_dumb_terminal ): + mock_join.side_effect = lambda *args: "/".join(args) + mock_abspath.side_effect = lambda p: "/ABS_PREFIX_VERY_LONG/" + os.path.normpath(p) io = InputOutput(pretty=True, root="test_root") rel_fnames = ["edit1.txt", "edit[markup].txt"] diff --git a/tests/tools/test_show_context.py b/tests/tools/test_show_context.py index dfedb021adb..20a1387f4ef 100644 --- a/tests/tools/test_show_context.py +++ b/tests/tools/test_show_context.py @@ -120,3 +120,23 @@ def test_target_symbol_empty_string_treated_as_missing(): end_pattern=None, line_count=1, ) + + +def test_multiline_pattern_search(coder_with_file): + coder, file_path = coder_with_file + # file_path contains "alpha\nbeta\ngamma\n" + + result = show_context.Tool.execute( + coder, + show=[ + { + "file_path": "example.txt", + "start_text": "alpha\nbeta", + "end_text": "beta\ngamma", + "padding": 0, + } + ], + ) + + assert "Successfully retrieved most recent contents for 1 file(s)" in result + coder.io.tool_error.assert_not_called()