From 7b2d5692c5ff2810403430d7c9f4b36857843c64 Mon Sep 17 00:00:00 2001 From: Rajveer* Date: Mon, 16 Jun 2025 09:53:36 +0530 Subject: [PATCH 1/2] feat: add two-step confirmation for dangerous shell commands Added manual command re-entry as a second-layer safety check for destructive commands like 'rm', 'delete', and 'shutdown'. Now, after initial confirmation, users must re-type the command exactly before execution. --- promptshell/ai_terminal_assistant.py | 129 ++++++++++++++++----------- promptshell/main.py | 7 ++ 2 files changed, 84 insertions(+), 52 deletions(-) diff --git a/promptshell/ai_terminal_assistant.py b/promptshell/ai_terminal_assistant.py index ee381be..8d27d72 100644 --- a/promptshell/ai_terminal_assistant.py +++ b/promptshell/ai_terminal_assistant.py @@ -8,9 +8,12 @@ from typing import Tuple from .node import Node from .data_gatherer import DataGatherer -from .format_utils import format_text, reset_format, get_current_os, get_os_specific_examples +from .format_utils import format_text, reset_format from .system_info import get_system_info +def get_current_os(): + return platform.system() + class AITerminalAssistant: def __init__(self, model_name: str, max_tokens: int = 8000, config: dict = None): self.username = getpass.getuser() @@ -202,9 +205,13 @@ def execute_interactive_command(self, command: str) -> Tuple[str, str, int]: def execute_command(self, user_input: str) -> str: try: + command = None # Initialize command variable self.current_directory = os.getcwd() + + # Basic input validation if user_input.strip() == "": return "Please provide a valid input." + if user_input.startswith('?') or user_input.endswith('?'): return self.answer_question(user_input) if user_input.lower().strip() == 'clear' or user_input.lower().strip() == 'cls': @@ -218,46 +225,68 @@ def execute_command(self, user_input: str) -> str: additional_data = self.gather_additional_data(user_input) command = self.command_executor(f""" User Input: {user_input} - Current OS: {get_current_os()} - Current OS specific examples: {get_os_specific_examples()} + Current OS: {platform.system()} Current Directory: {self.current_directory} - Translate the user input into a SINGLE shell command according to the operating system. + Translate the user input into a SINGLE shell command. Return ONLY the command, nothing else. - If the input is already a valid shell command, return it as is. - Do not provide any explanations or comments. - Use the actual filenames and content provided in the additional data. """, additional_data=additional_data).strip() - choice = questionary.confirm(f"Do you want to run the command '{command}'?").ask() - if choice: - if command.startswith("CONFIRM:"): - confirmation = questionary.confirm(f"Warning: This command may be destructive. Are you sure you want to run '{command[8:]}'?").ask() - if not confirmation: - return format_text('red') + "Command execution aborted." + reset_format() - command = command[8:] - formatted_command = format_text('cyan') + f"Command: {command}" + reset_format() - print(formatted_command) - self.command_history.append(command) - if len(self.command_history) > 10: - self.command_history.pop(0) - if command.startswith("cd "): - path = command.split(" ", 1)[1] - os.chdir(os.path.expanduser(path)) - result = f"Changed directory to {os.getcwd()}" - exit_code = 0 - else: - stdout, stderr, exit_code = self.execute_command_with_live_output(command) - result = "" - if exit_code != 0: - debug_suggestion = self.debug_error(command, stderr, exit_code) - result += format_text('yellow') + f"\n\nDebugging Suggestion:\n{debug_suggestion}" + reset_format() - return result.strip() - else: - print(format_text('red') + "Command cancelled!" + reset_format()) - return "" + # Handle dangerous commands with two-step verification + if self.is_dangerous_command(command): + # First confirmation + proceed = input(format_text('yellow', bold=True) + + f"\n⚠️ Warning: This is a destructive command.\nDo you want to continue? (yes/no): " + + reset_format()) + if proceed.strip().lower() != "yes": + return format_text('red', bold=True) + "\nCommand aborted by user." + reset_format() + + # Second confirmation - manual reentry + confirm_input = input(format_text('yellow', bold=True) + + f"For safety, please re-type the exact command to proceed:\n> " + + reset_format()) + if confirm_input.strip() != command.strip(): + return format_text('red', bold=True) + "\n❌ Command mismatch. Operation aborted!" + reset_format() + + # Execute the validated command + return self._execute_validated_command(command) + except Exception as e: - print(format_text('red') + "Error in execute command" + reset_format()) - return self.handle_error(str(e), user_input, command) + print(format_text('red') + f"Error: {str(e)}" + reset_format()) + return format_text('red') + "Command execution failed." + reset_format() + + def handle_error(self, error: str, user_input: str, command: str) -> str: + try: + error_analysis = self.error_handler(f""" + Error: {error} + User Input: {user_input} + Interpreted Command: {command} + Current Directory: {self.current_directory} + Provide ONLY a single, simple corrected command. No explanations. + """) + + print(format_text('red') + f"Error occurred: {error}" + reset_format()) + print(format_text('yellow') + f"Suggested command: {error_analysis}" + reset_format()) + + if questionary.confirm("Would you like to execute the suggested command?").ask(): + return self._execute_validated_command(error_analysis) + return format_text('red') + "Command execution aborted." + reset_format() + + except Exception as e: + return format_text('red') + f"Error handler failed: {str(e)}" + reset_format() + + def _execute_validated_command(self, command: str) -> str: + """Helper method to execute validated commands""" + if command.startswith("cd "): + path = command.split(" ", 1)[1] + os.chdir(os.path.expanduser(path)) + return f"Changed directory to {os.getcwd()}" + + stdout, stderr, exit_code = self.execute_command_with_live_output(command) + if exit_code != 0 and stderr: + debug_suggestion = self.debug_error(command, stderr, exit_code) + return format_text('yellow') + f"\nDebugging Suggestion:\n{debug_suggestion}" + reset_format() + + return stdout.strip() if stdout else "" def run_direct_command(self, command: str) -> str: try: @@ -334,19 +363,15 @@ def debug_error(self, command: str, error_output: str, exit_code: int) -> str: """ return self.debugger(debug_input) - def handle_error(self, error: str, user_input: str, command: str) -> str: - error_analysis = self.error_handler(f""" - Error: {error} - User Input: {user_input} - Interpreted Command: {command} - Current Directory: {self.current_directory} - Provide ONLY a single, simple corrected command. No explanations. - """) - error_msg = format_text('red') + f"Error occurred: {error}" + reset_format() - suggestion_msg = format_text('yellow') + f"Suggested command: {error_analysis}" + reset_format() - print(error_msg) - print(suggestion_msg) - confirmation = questionary.confirm("Would you like to execute the suggested command?").ask() - if confirmation: - return self.execute_command(error_analysis) - return format_text('red') + "Command execution aborted." + reset_format() \ No newline at end of file + def is_dangerous_command(self, command: str) -> bool: + dangerous_keywords = [ + "rm ", "rmdir ", "del ", "delete ", # File deletion + "chmod ", "chown ", "attrib ", # Permission changes + "mkfs.", "format ", # Formatting + "dd ", # Direct disk operations + "mv ", "move ", # Moving files + "> ", # Output redirection that might overwrite + "reboot", "shutdown", # System commands + "init 0", "init 6" # System state changes + ] + return any(command.strip().lower().startswith(kw.lower()) for kw in dangerous_keywords) \ No newline at end of file diff --git a/promptshell/main.py b/promptshell/main.py index a8dfb90..d85dbe7 100644 --- a/promptshell/main.py +++ b/promptshell/main.py @@ -53,6 +53,13 @@ def main(): - Type 'clear' to clear the terminal.{reset_format()}""") continue + # ✅ Handle alias command + if user_input.lower().startswith("alias "): + result = handle_alias_command(user_input, assistant.alias_manager) + print(result) + continue + + # All other commands (including dangerous ones) are handled by the assistant result = assistant.execute_command(user_input) print(result) From c2fd38c03f233241c814ebc8d98f2d21de55d293 Mon Sep 17 00:00:00 2001 From: Rajveer* Date: Mon, 16 Jun 2025 10:19:26 +0530 Subject: [PATCH 2/2] Add two-step verification for dangerous commands - Implement manual reentry safeguard for dangerous commands - Remove CONFIRM prefix handling - Improve dangerous command detection patterns - Add better error handling and user feedback - Ensure proper validation before executing destructive commands --- promptshell/ai_terminal_assistant.py | 62 +++++++++++++++------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/promptshell/ai_terminal_assistant.py b/promptshell/ai_terminal_assistant.py index 8d27d72..810a983 100644 --- a/promptshell/ai_terminal_assistant.py +++ b/promptshell/ai_terminal_assistant.py @@ -5,6 +5,7 @@ import sys import platform import questionary +import re from typing import Tuple from .node import Node from .data_gatherer import DataGatherer @@ -205,49 +206,45 @@ def execute_interactive_command(self, command: str) -> Tuple[str, str, int]: def execute_command(self, user_input: str) -> str: try: - command = None # Initialize command variable + command = None self.current_directory = os.getcwd() - # Basic input validation if user_input.strip() == "": return "Please provide a valid input." - + + # Handle special commands (?, clear, !) if user_input.startswith('?') or user_input.endswith('?'): return self.answer_question(user_input) - if user_input.lower().strip() == 'clear' or user_input.lower().strip() == 'cls': - if get_current_os() == 'windows': - os.system('cls') - else: - os.system('clear') + if user_input.lower().strip() in ['clear', 'cls']: + os.system('cls' if get_current_os() == 'windows' else 'clear') return "" if user_input.startswith('!'): return self.run_direct_command(user_input[1:]) - additional_data = self.gather_additional_data(user_input) + + # Get command from executor command = self.command_executor(f""" User Input: {user_input} Current OS: {platform.system()} Current Directory: {self.current_directory} Translate the user input into a SINGLE shell command. Return ONLY the command, nothing else. - """, additional_data=additional_data).strip() + """, additional_data=self.gather_additional_data(user_input)).strip() - # Handle dangerous commands with two-step verification + # Two-step verification for dangerous commands if self.is_dangerous_command(command): - # First confirmation - proceed = input(format_text('yellow', bold=True) + - f"\n⚠️ Warning: This is a destructive command.\nDo you want to continue? (yes/no): " + + print(format_text('yellow', bold=True) + + f"\n⚠️ Warning: The command '{command}' may be destructive." + reset_format()) - if proceed.strip().lower() != "yes": + + # First confirmation + if not questionary.confirm("Do you want to proceed?").ask(): return format_text('red', bold=True) + "\nCommand aborted by user." + reset_format() # Second confirmation - manual reentry - confirm_input = input(format_text('yellow', bold=True) + - f"For safety, please re-type the exact command to proceed:\n> " + - reset_format()) + confirm_input = questionary.text("For safety, please re-type the exact command:").ask() if confirm_input.strip() != command.strip(): return format_text('red', bold=True) + "\n❌ Command mismatch. Operation aborted!" + reset_format() - # Execute the validated command return self._execute_validated_command(command) except Exception as e: @@ -364,14 +361,21 @@ def debug_error(self, command: str, error_output: str, exit_code: int) -> str: return self.debugger(debug_input) def is_dangerous_command(self, command: str) -> bool: - dangerous_keywords = [ - "rm ", "rmdir ", "del ", "delete ", # File deletion - "chmod ", "chown ", "attrib ", # Permission changes - "mkfs.", "format ", # Formatting - "dd ", # Direct disk operations - "mv ", "move ", # Moving files - "> ", # Output redirection that might overwrite - "reboot", "shutdown", # System commands - "init 0", "init 6" # System state changes + dangerous_patterns = [ + (r"(?:^|\s)(rm|del|delete)\s+-[rf]*\s+", "Recursive/forced deletion"), + (r"(?:^|\s)(rm|del|delete)\s+.*[\*\?]", "Wildcard deletion"), + (r"(?:^|\s)(chmod|chown)\s+-R\s+", "Recursive permission change"), + (r"(?:^|\s)(dd|format|mkfs)\s+", "Disk operations"), + (r"(?:^|\s)(shutdown|reboot|init\s+[06])\s*", "System control"), + (r"(?:^|\s)>(>?)\s+", "Output redirection"), ] - return any(command.strip().lower().startswith(kw.lower()) for kw in dangerous_keywords) \ No newline at end of file + + cmd_lower = command.lower().strip() + return any( + cmd_lower.startswith(kw) for kw in [ + "rm ", "del ", "delete ", "chmod ", "chown ", + "mkfs.", "dd ", "reboot", "shutdown", "init 0", "init 6" + ] + ) or any( + bool(re.search(pattern, cmd_lower)) for pattern, _ in dangerous_patterns + ) \ No newline at end of file