diff --git a/pdd/sync_orchestration.py b/pdd/sync_orchestration.py
index 9e25371..5ec19ff 100644
--- a/pdd/sync_orchestration.py
+++ b/pdd/sync_orchestration.py
@@ -11,11 +11,15 @@
 import subprocess
 import re
 import os
+import sys
+import io
+from contextlib import redirect_stdout, redirect_stderr
 from pathlib import Path
 from typing import Dict, Any, Optional, List
 from dataclasses import asdict
 
 import click
+from rich.console import Console
 
 # --- Constants ---
 MAX_CONSECUTIVE_TESTS = 3  # Allow up to 3 consecutive test attempts
@@ -135,6 +139,64 @@ def _save_operation_fingerprint(basename: str, language: str, operation: str,
 
 # SyncLock class now imported from sync_determine_operation module
 
 
+def _display_operation_output(operation: str, stdout_content: str, stderr_content: str,
+                              stop_event: threading.Event, quiet: bool = False):
+    """Display captured stdout/stderr after an operation completes.
+
+    Temporarily pauses the animation to show output, then resumes.
+
+    Args:
+        operation: Name of the operation that was executed
+        stdout_content: Captured stdout from the operation
+        stderr_content: Captured stderr from the operation
+        stop_event: Threading event to pause/resume animation
+        quiet: If True, suppress output display
+    """
+    if quiet:
+        return
+
+    # Signal animation to pause
+    stop_event.set()
+    time.sleep(0.3)  # Give animation time to clean up
+
+    # Create console for output display
+    console = Console()
+
+    # Display operation header
+    console.print(f"\n{'='*80}")
+    console.print(f"[bold cyan]Operation: {operation.upper()}[/bold cyan]")
+    console.print(f"{'='*80}\n")
+
+    # Display STDOUT if present
+    if stdout_content and stdout_content.strip():
+        console.print("[bold green]STDOUT:[/bold green]")
+        # Print raw content without panel to avoid nested formatting issues
+        print(stdout_content.strip())
+        console.print()  # Add spacing
+
+    # Display STDERR if present
+    if stderr_content and stderr_content.strip():
+        console.print("[bold red]STDERR:[/bold red]")
+        # Print raw content without panel to avoid nested formatting issues
+        print(stderr_content.strip())
+        console.print()  # Add spacing
+
+    # If both are empty, show a message
+    if not (stdout_content and stdout_content.strip()) and not (stderr_content and stderr_content.strip()):
+        console.print("[dim]No output captured[/dim]\n")
+
+    # Display footer
+    console.print(f"{'='*80}")
+    console.print(f"[bold cyan]End of {operation.upper()} output[/bold cyan]")
+    console.print(f"{'='*80}\n")
+
+    # Small pause so user can see the output
+    time.sleep(0.5)  # Reduced from 1.0s
+
+    # Resume animation
+    stop_event.clear()
+
+
 def _execute_tests_and_create_run_report(test_file: Path, basename: str, language: str,
                                          target_coverage: float = 90.0) -> RunReport:
     """Execute tests and create a RunReport with actual results."""
     timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
@@ -667,48 +729,90 @@ def sync_orchestration(
         # --- Execute Operation ---
         try:
             if operation == 'auto-deps':
-                # Save the modified prompt to a temporary location
-                temp_output = str(pdd_files['prompt']).replace('.prompt', '_with_deps.prompt')
-
-                # Read original prompt content to compare later
-                original_content = pdd_files['prompt'].read_text(encoding='utf-8')
-
-                result = auto_deps_main(
-                    ctx,
-                    prompt_file=str(pdd_files['prompt']),
-                    directory_path=f"{examples_dir}/*",
-                    auto_deps_csv_path="project_dependencies.csv",
-                    output=temp_output,
-                    force_scan=False  # Don't force scan every time
+                # Capture stdout/stderr
+                stdout_capture = io.StringIO()
+                stderr_capture = io.StringIO()
+
+                with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
+                    # Save the modified prompt to a temporary location
+                    temp_output = str(pdd_files['prompt']).replace('.prompt', '_with_deps.prompt')
+
+                    # Read original prompt content to compare later
+                    original_content = pdd_files['prompt'].read_text(encoding='utf-8')
+
+                    result = auto_deps_main(
+                        ctx,
+                        prompt_file=str(pdd_files['prompt']),
+                        directory_path=f"{examples_dir}/*",
+                        auto_deps_csv_path="project_dependencies.csv",
+                        output=temp_output,
+                        force_scan=False  # Don't force scan every time
+                    )
+
+                    # Only move the temp file back if content actually changed
+                    if Path(temp_output).exists():
+                        import shutil
+                        new_content = Path(temp_output).read_text(encoding='utf-8')
+                        if new_content != original_content:
+                            shutil.move(temp_output, str(pdd_files['prompt']))
+                        else:
+                            # No changes needed, remove temp file
+                            Path(temp_output).unlink()
+                            # Mark as successful with no changes
+                            result = (new_content, 0.0, 'no-changes')
+
+                # Display captured output
+                _display_operation_output(
+                    operation='auto-deps',
+                    stdout_content=stdout_capture.getvalue(),
+                    stderr_content=stderr_capture.getvalue(),
+                    stop_event=stop_event,
+                    quiet=quiet
                 )
-
-                # Only move the temp file back if content actually changed
-                if Path(temp_output).exists():
-                    import shutil
-                    new_content = Path(temp_output).read_text(encoding='utf-8')
-                    if new_content != original_content:
-                        shutil.move(temp_output, str(pdd_files['prompt']))
-                    else:
-                        # No changes needed, remove temp file
-                        Path(temp_output).unlink()
-                        # Mark as successful with no changes
-                        result = (new_content, 0.0, 'no-changes')
             elif operation == 'generate':
-                result = code_generator_main(
-                    ctx,
-                    prompt_file=str(pdd_files['prompt']),
-                    output=str(pdd_files['code']),
-                    original_prompt_file_path=None,
-                    force_incremental_flag=False
+                # Capture stdout/stderr
+                stdout_capture = io.StringIO()
+                stderr_capture = io.StringIO()
+
+                with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
+                    result = code_generator_main(
+                        ctx,
+                        prompt_file=str(pdd_files['prompt']),
+                        output=str(pdd_files['code']),
+                        original_prompt_file_path=None,
+                        force_incremental_flag=False
+                    )
+
+                # Display captured output
+                _display_operation_output(
+                    operation='generate',
+                    stdout_content=stdout_capture.getvalue(),
+                    stderr_content=stderr_capture.getvalue(),
+                    stop_event=stop_event,
+                    quiet=quiet
                 )
             elif operation == 'example':
-                print(f"DEBUG SYNC: pdd_files['example'] = {pdd_files['example']}")
-                print(f"DEBUG SYNC: str(pdd_files['example']) = {str(pdd_files['example'])}")
-                result = context_generator_main(
-                    ctx,
-                    prompt_file=str(pdd_files['prompt']),
-                    code_file=str(pdd_files['code']),
-                    output=str(pdd_files['example'])
+                # Capture stdout/stderr
+                stdout_capture = io.StringIO()
+                stderr_capture = io.StringIO()
+
+                with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
+                    print(f"DEBUG SYNC: pdd_files['example'] = {pdd_files['example']}")
+                    print(f"DEBUG SYNC: str(pdd_files['example']) = {str(pdd_files['example'])}")
+                    result = context_generator_main(
+                        ctx,
+                        prompt_file=str(pdd_files['prompt']),
+                        code_file=str(pdd_files['code']),
+                        output=str(pdd_files['example'])
+                    )
+
+                # Display captured output
+                _display_operation_output(
+                    operation='example',
+                    stdout_content=stdout_capture.getvalue(),
+                    stderr_content=stderr_capture.getvalue(),
+                    stop_event=stop_event,
+                    quiet=quiet
                 )
             elif operation == 'crash':
                 # Validate required files exist before attempting crash operation
@@ -875,17 +979,31 @@ def sync_orchestration(
                 Path("crash.log").write_text(crash_log_content)
 
                 try:
-                    result = crash_main(
-                        ctx,
-                        prompt_file=str(pdd_files['prompt']),
-                        code_file=str(pdd_files['code']),
-                        program_file=str(pdd_files['example']),
-                        error_file="crash.log",
-                        output=str(pdd_files['code']),
-                        output_program=str(pdd_files['example']),
-                        loop=True,
-                        max_attempts=max_attempts,
-                        budget=budget - current_cost_ref[0]
+                    # Capture stdout/stderr
+                    stdout_capture = io.StringIO()
+                    stderr_capture = io.StringIO()
+
+                    with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
+                        result = crash_main(
+                            ctx,
+                            prompt_file=str(pdd_files['prompt']),
+                            code_file=str(pdd_files['code']),
+                            program_file=str(pdd_files['example']),
+                            error_file="crash.log",
+                            output=str(pdd_files['code']),
+                            output_program=str(pdd_files['example']),
+                            loop=True,
+                            max_attempts=max_attempts,
+                            budget=budget - current_cost_ref[0]
+                        )
+
+                    # Display captured output
+                    _display_operation_output(
+                        operation='crash',
+                        stdout_content=stdout_capture.getvalue(),
+                        stderr_content=stderr_capture.getvalue(),
+                        stop_event=stop_event,
+                        quiet=quiet
                     )
                 except (RuntimeError, Exception) as e:
                     error_str = str(e)
@@ -931,124 +1049,181 @@ def sync_orchestration(
                     append_sync_log(basename, language, log_entry)
                     # Intentionally avoid writing run report/fingerprint here
                     continue
-                result = fix_verification_main(
-                    ctx,
-                    prompt_file=str(pdd_files['prompt']),
-                    code_file=str(pdd_files['code']),
-                    program_file=str(pdd_files['example']),
-                    output_results=f"{basename}_verify_results.log",
-                    output_code=str(pdd_files['code']),
-                    output_program=str(pdd_files['example']),
-                    loop=True,
-                    verification_program=str(pdd_files['example']),
-                    max_attempts=max_attempts,
-                    budget=budget - current_cost_ref[0]
+
+                # Capture stdout/stderr
+                stdout_capture = io.StringIO()
+                stderr_capture = io.StringIO()
+
+                with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
+                    result = fix_verification_main(
+                        ctx,
+                        prompt_file=str(pdd_files['prompt']),
+                        code_file=str(pdd_files['code']),
+                        program_file=str(pdd_files['example']),
+                        output_results=f"{basename}_verify_results.log",
+                        output_code=str(pdd_files['code']),
+                        output_program=str(pdd_files['example']),
+                        loop=True,
+                        verification_program=str(pdd_files['example']),
+                        max_attempts=max_attempts,
+                        budget=budget - current_cost_ref[0]
+                    )
+
+                # Display captured output
+                _display_operation_output(
+                    operation='verify',
+                    stdout_content=stdout_capture.getvalue(),
+                    stderr_content=stderr_capture.getvalue(),
+                    stop_event=stop_event,
+                    quiet=quiet
                 )
             elif operation == 'test':
-                # First, generate the test file
-                # Ensure the test directory exists
-                test_path = pdd_files['test']
-                if isinstance(test_path, Path):
-                    # Debug logging
-                    if not quiet:
-                        print(f"Creating test directory: {test_path.parent}")
-                    test_path.parent.mkdir(parents=True, exist_ok=True)
-
-                result = cmd_test_main(
-                    ctx,
-                    prompt_file=str(pdd_files['prompt']),
-                    code_file=str(pdd_files['code']),
-                    output=str(pdd_files['test']),
-                    language=language,
-                    coverage_report=None,
-                    existing_tests=None,
-                    target_coverage=target_coverage,
-                    merge=False
+                # Capture stdout/stderr
+                stdout_capture = io.StringIO()
+                stderr_capture = io.StringIO()
+
+                with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
+                    # First, generate the test file
+                    # Ensure the test directory exists
+                    test_path = pdd_files['test']
+                    if isinstance(test_path, Path):
+                        # Debug logging
+                        if not quiet:
+                            print(f"Creating test directory: {test_path.parent}")
+                        test_path.parent.mkdir(parents=True, exist_ok=True)
+
+                    result = cmd_test_main(
+                        ctx,
+                        prompt_file=str(pdd_files['prompt']),
+                        code_file=str(pdd_files['code']),
+                        output=str(pdd_files['test']),
+                        language=language,
+                        coverage_report=None,
+                        existing_tests=None,
+                        target_coverage=target_coverage,
+                        merge=False
+                    )
+
+                    # After test generation, check if the test file was actually created
+                    test_file = pdd_files['test']
+                    test_generation_successful = False
+
+                    if isinstance(result, dict) and result.get('success', False):
+                        test_generation_successful = True
+                    elif isinstance(result, tuple) and len(result) >= 3:
+                        # For tuple format, check if the test file actually exists rather than assuming success
+                        test_generation_successful = test_file.exists()
+
+                    if test_generation_successful and test_file.exists():
+                        try:
+                            _execute_tests_and_create_run_report(
+                                test_file, basename, language, target_coverage
+                            )
+                        except Exception as e:
+                            # Don't fail the entire operation if test execution fails
+                            # Just log it - the test file generation was successful
+                            print(f"Warning: Test execution failed: {e}")
+                    else:
+                        # Test generation failed or test file was not created
+                        error_msg = f"Test generation failed - test file not created: {test_file}"
+                        print(f"Error: {error_msg}")
+                        update_sync_log_entry(log_entry, {
+                            'success': False,
+                            'cost': 0.0,
+                            'model': 'N/A',
+                            'error': error_msg
+                        }, 0.0)
+                        append_sync_log(basename, language, log_entry)
+                        errors.append(error_msg)
+                        break
+
+                # Display captured output
+                _display_operation_output(
+                    operation='test',
+                    stdout_content=stdout_capture.getvalue(),
+                    stderr_content=stderr_capture.getvalue(),
+                    stop_event=stop_event,
+                    quiet=quiet
                 )
-
-                # After test generation, check if the test file was actually created
-                test_file = pdd_files['test']
-                test_generation_successful = False
-
-                if isinstance(result, dict) and result.get('success', False):
-                    test_generation_successful = True
-                elif isinstance(result, tuple) and len(result) >= 3:
-                    # For tuple format, check if the test file actually exists rather than assuming success
-                    test_generation_successful = test_file.exists()
-
-                if test_generation_successful and test_file.exists():
+            elif operation == 'fix':
+                # Capture stdout/stderr
+                stdout_capture = io.StringIO()
+                stderr_capture = io.StringIO()
+
+                with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
+                    # Create error file with actual test failure information
+                    error_file_path = Path("fix_errors.log")
+
+                    # Try to get actual test failure details from latest run
                     try:
-                        _execute_tests_and_create_run_report(
-                            test_file, basename, language, target_coverage
-                        )
+                        run_report = read_run_report(basename, language)
+                        test_file = pdd_files.get('test')
+                        if run_report and run_report.tests_failed > 0 and test_file and test_file.exists():
+                            # Run the tests again to capture actual error output
+                            # Use environment-aware Python executable for pytest execution
+                            python_executable = detect_host_python_executable()
+                            test_result = subprocess.run([
+                                python_executable, '-m', 'pytest',
+                                str(pdd_files['test']),
+                                '-v', '--tb=short'
+                            ], capture_output=True, text=True, timeout=300)
+
+                            error_content = f"Test failures detected ({run_report.tests_failed} failed tests):\n\n"
+                            error_content += "STDOUT:\n" + test_result.stdout + "\n\n"
+                            error_content += "STDERR:\n" + test_result.stderr
+                        else:
+                            error_content = "Simulated test failures"
                     except Exception as e:
-                        # Don't fail the entire operation if test execution fails
-                        # Just log it - the test file generation was successful
- print(f"Warning: Test execution failed: {e}") - else: - # Test generation failed or test file was not created - error_msg = f"Test generation failed - test file not created: {test_file}" - print(f"Error: {error_msg}") - update_sync_log_entry(log_entry, { - 'success': False, - 'cost': 0.0, - 'model': 'N/A', - 'error': error_msg - }, 0.0) - append_sync_log(basename, language, log_entry) - errors.append(error_msg) - break - elif operation == 'fix': - # Create error file with actual test failure information - error_file_path = Path("fix_errors.log") - - # Try to get actual test failure details from latest run - try: - run_report = read_run_report(basename, language) - test_file = pdd_files.get('test') - if run_report and run_report.tests_failed > 0 and test_file and test_file.exists(): - # Run the tests again to capture actual error output - # Use environment-aware Python executable for pytest execution - python_executable = detect_host_python_executable() - test_result = subprocess.run([ - python_executable, '-m', 'pytest', - str(pdd_files['test']), - '-v', '--tb=short' - ], capture_output=True, text=True, timeout=300) - - error_content = f"Test failures detected ({run_report.tests_failed} failed tests):\n\n" - error_content += "STDOUT:\n" + test_result.stdout + "\n\n" - error_content += "STDERR:\n" + test_result.stderr - else: - error_content = "Simulated test failures" - except Exception as e: - error_content = f"Could not capture test failures: {e}\nUsing simulated test failures" - - error_file_path.write_text(error_content) - - result = fix_main( - ctx, - prompt_file=str(pdd_files['prompt']), - code_file=str(pdd_files['code']), - unit_test_file=str(pdd_files['test']), - error_file=str(error_file_path), - output_test=str(pdd_files['test']), - output_code=str(pdd_files['code']), - output_results=f"{basename}_fix_results.log", - loop=True, - verification_program=str(pdd_files['example']), - max_attempts=max_attempts, - budget=budget - current_cost_ref[0], - auto_submit=True + error_content = f"Could not capture test failures: {e}\nUsing simulated test failures" + + error_file_path.write_text(error_content) + + result = fix_main( + ctx, + prompt_file=str(pdd_files['prompt']), + code_file=str(pdd_files['code']), + unit_test_file=str(pdd_files['test']), + error_file=str(error_file_path), + output_test=str(pdd_files['test']), + output_code=str(pdd_files['code']), + output_results=f"{basename}_fix_results.log", + loop=True, + verification_program=str(pdd_files['example']), + max_attempts=max_attempts, + budget=budget - current_cost_ref[0], + auto_submit=True + ) + + # Display captured output + _display_operation_output( + operation='fix', + stdout_content=stdout_capture.getvalue(), + stderr_content=stderr_capture.getvalue(), + stop_event=stop_event, + quiet=quiet ) elif operation == 'update': - result = update_main( - ctx, - input_prompt_file=str(pdd_files['prompt']), - modified_code_file=str(pdd_files['code']), - input_code_file=None, - output=str(pdd_files['prompt']), - git=True + # Capture stdout/stderr + stdout_capture = io.StringIO() + stderr_capture = io.StringIO() + + with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture): + result = update_main( + ctx, + input_prompt_file=str(pdd_files['prompt']), + modified_code_file=str(pdd_files['code']), + input_code_file=None, + output=str(pdd_files['prompt']), + git=True + ) + + # Display captured output + _display_operation_output( + operation='update', + stdout_content=stdout_capture.getvalue(), + 
+                    stderr_content=stderr_capture.getvalue(),
+                    stop_event=stop_event,
+                    quiet=quiet
                 )
             else:
                 errors.append(f"Unknown operation '{operation}' requested.")
diff --git a/tests/test_sync_orchestration.py b/tests/test_sync_orchestration.py
index a9d1184..c8ca79c 100644
--- a/tests/test_sync_orchestration.py
+++ b/tests/test_sync_orchestration.py
@@ -1039,3 +1039,281 @@ def test_verify_skipped_when_example_missing_after_crash_skip(orchestration_fixt
 
     # And there should be no orchestrator errors if verify was correctly skipped
     assert not result.get('errors'), f"Unexpected errors: {result.get('errors')}"
+
+
+# --- Tests for output capture feature (Issue #65) ---
+
+def test_display_operation_output_basic_functionality():
+    """
+    Test _display_operation_output function displays stdout and stderr correctly.
+    This tests the core functionality added for issue #65.
+    """
+    from pdd.sync_orchestration import _display_operation_output
+    import threading
+    from unittest.mock import patch
+
+    stop_event = threading.Event()
+    stop_event.clear()  # Start with animation running
+
+    stdout_content = "This is stdout output\nLine 2 of stdout"
+    stderr_content = "This is stderr output\nWarning message"
+
+    # Mock the console output to verify it's called
+    with patch('pdd.sync_orchestration.Console') as mock_console_class:
+        mock_console = MagicMock()
+        mock_console_class.return_value = mock_console
+
+        # Also patch time.sleep to avoid delays in tests
+        with patch('pdd.sync_orchestration.time.sleep'):
+            _display_operation_output(
+                operation='generate',
+                stdout_content=stdout_content,
+                stderr_content=stderr_content,
+                stop_event=stop_event,
+                quiet=False
+            )
+
+    # After function completes, stop event should be clear (animation resumed)
+    assert not stop_event.is_set()
+
+    # Verify console was created
+    assert mock_console_class.called
+
+    # Verify print methods were called for the output
+    assert mock_console.print.call_count > 0
+
+
+def test_display_operation_output_respects_quiet_flag():
+    """
+    Test that _display_operation_output respects the quiet flag.
+    When quiet=True, no output should be displayed.
+    """
+    from pdd.sync_orchestration import _display_operation_output
+    import threading
+    from unittest.mock import patch
+
+    stop_event = threading.Event()
+    stop_event.clear()
+
+    with patch('pdd.sync_orchestration.Console') as mock_console_class:
+        _display_operation_output(
+            operation='test',
+            stdout_content="Some output",
+            stderr_content="Some error",
+            stop_event=stop_event,
+            quiet=True  # Quiet mode enabled
+        )
+
+        # Verify Console was not created when quiet=True
+        mock_console_class.assert_not_called()
+
+    # Verify stop event was NOT set (no pause needed)
+    assert not stop_event.is_set()
+
+
+def test_display_operation_output_handles_empty_output():
+    """
+    Test that _display_operation_output handles empty stdout/stderr gracefully.
+ """ + from pdd.sync_orchestration import _display_operation_output + import threading + from unittest.mock import patch + + stop_event = threading.Event() + + with patch('pdd.sync_orchestration.Console') as mock_console_class: + mock_console = MagicMock() + mock_console_class.return_value = mock_console + + with patch('pdd.sync_orchestration.time.sleep'): + _display_operation_output( + operation='example', + stdout_content="", # Empty stdout + stderr_content="", # Empty stderr + stop_event=stop_event, + quiet=False + ) + + # Should still create console and show "No output captured" message + assert mock_console_class.called + # After function completes, stop event should be clear (animation resumed) + assert not stop_event.is_set() + + +def test_output_capture_in_generate_operation(orchestration_fixture): + """ + Integration test: verify that generate operation captures and displays output. + This tests that the output capture wrapper is properly integrated. + """ + mocks = orchestration_fixture + mock_determine = mocks['sync_determine_operation'] + mock_code_gen = mocks['code_generator_main'] + + # Set up simple workflow: generate -> all_synced + mock_determine.side_effect = [ + SyncDecision(operation='generate', reason='New unit'), + SyncDecision(operation='all_synced', reason='Done'), + ] + + # Mock code_generator_main to produce some output when called + def mock_gen_with_output(*args, **kwargs): + # Simulate printing during operation + print("Generating code...") + print("Code generation complete") + return {'success': True, 'cost': 0.05, 'model': 'mock-model'} + + mock_code_gen.side_effect = mock_gen_with_output + + # Patch _display_operation_output to verify it's called + with patch('pdd.sync_orchestration._display_operation_output') as mock_display: + result = sync_orchestration(basename="calculator", language="python", quiet=False) + + # Verify sync completed successfully + assert result['success'] is True + assert 'generate' in result['operations_completed'] + + # Verify _display_operation_output was called for the generate operation + assert mock_display.call_count >= 1 + + # Verify it was called with the correct operation name + display_calls = mock_display.call_args_list + operation_names = [call[1]['operation'] for call in display_calls if 'operation' in call[1]] + assert 'generate' in operation_names + + +def test_output_capture_preserves_operation_success(): + """ + Test that output capture doesn't interfere with operation success detection. + This is a regression test to ensure the wrapper doesn't break existing functionality. 
+ """ + from pdd.sync_orchestration import sync_orchestration + import threading + from unittest.mock import patch, MagicMock + from pathlib import Path + + # Create minimal test environment + tmp_path = Path.cwd() + (tmp_path / "prompts").mkdir(exist_ok=True) + (tmp_path / "prompts" / "test_python.prompt").write_text("test prompt") + + with patch('pdd.sync_orchestration.sync_determine_operation') as mock_determine, \ + patch('pdd.sync_orchestration.SyncLock') as mock_lock, \ + patch('pdd.sync_orchestration.sync_animation'), \ + patch('pdd.sync_orchestration.code_generator_main') as mock_code_gen, \ + patch('pdd.sync_orchestration.get_pdd_file_paths') as mock_get_paths, \ + patch('pdd.sync_orchestration._display_operation_output'), \ + patch('pdd.sync_orchestration._save_operation_fingerprint'): + + # Configure mocks + mock_lock.return_value.__enter__.return_value = mock_lock + mock_lock.return_value.__exit__.return_value = None + + mock_get_paths.return_value = { + 'prompt': tmp_path / 'prompts' / 'test_python.prompt', + 'code': tmp_path / 'src' / 'test.py', + 'example': tmp_path / 'examples' / 'test_example.py', + 'test': tmp_path / 'tests' / 'test_test.py' + } + + # Set up workflow + mock_determine.side_effect = [ + SyncDecision(operation='generate', reason='New unit'), + SyncDecision(operation='all_synced', reason='Done'), + ] + + # Mock successful code generation with output + def gen_with_output(*args, **kwargs): + print("Generating code with output capture") + return {'success': True, 'cost': 0.05, 'model': 'mock'} + + mock_code_gen.side_effect = gen_with_output + + # Run sync + result = sync_orchestration(basename="test", language="python") + + # Verify operation was marked as successful despite output capture + assert result['success'] is True + assert 'generate' in result['operations_completed'] + assert result['total_cost'] > 0 # Cost was tracked correctly + + +def test_output_capture_quiet_mode_integration(orchestration_fixture): + """ + Integration test: verify quiet mode suppresses output display throughout workflow. + """ + mocks = orchestration_fixture + mock_determine = mocks['sync_determine_operation'] + + # Set up multi-operation workflow + mock_determine.side_effect = [ + SyncDecision(operation='generate', reason='New unit'), + SyncDecision(operation='example', reason='Generate example'), + SyncDecision(operation='all_synced', reason='Done'), + ] + + # Patch _display_operation_output to verify it respects quiet mode + with patch('pdd.sync_orchestration._display_operation_output') as mock_display: + result = sync_orchestration( + basename="calculator", + language="python", + quiet=True # Quiet mode enabled + ) + + # Verify sync completed + assert result['success'] is True + + # Verify _display_operation_output was called with quiet=True + for call in mock_display.call_args_list: + assert call[1]['quiet'] is True + + +def test_output_capture_with_operation_failure(): + """ + Test that output capture works correctly when an operation fails. + The output should still be displayed before the failure is handled. 
+ """ + from pdd.sync_orchestration import sync_orchestration + from unittest.mock import patch, MagicMock + from pathlib import Path + + tmp_path = Path.cwd() + (tmp_path / "prompts").mkdir(exist_ok=True) + (tmp_path / "prompts" / "fail_python.prompt").write_text("test") + + with patch('pdd.sync_orchestration.sync_determine_operation') as mock_determine, \ + patch('pdd.sync_orchestration.SyncLock') as mock_lock, \ + patch('pdd.sync_orchestration.sync_animation'), \ + patch('pdd.sync_orchestration.code_generator_main') as mock_code_gen, \ + patch('pdd.sync_orchestration.get_pdd_file_paths') as mock_get_paths, \ + patch('pdd.sync_orchestration._display_operation_output') as mock_display, \ + patch('pdd.sync_orchestration._save_operation_fingerprint'): + + mock_lock.return_value.__enter__.return_value = mock_lock + mock_lock.return_value.__exit__.return_value = None + + mock_get_paths.return_value = { + 'prompt': tmp_path / 'prompts' / 'fail_python.prompt', + 'code': tmp_path / 'src' / 'fail.py', + 'example': tmp_path / 'examples' / 'fail_example.py', + 'test': tmp_path / 'tests' / 'test_fail.py' + } + + mock_determine.side_effect = [ + SyncDecision(operation='generate', reason='New unit'), + ] + + # Mock failed operation with error output + def gen_with_error(*args, **kwargs): + print("Starting code generation...") + print("ERROR: Generation failed!", file=__import__('sys').stderr) + return {'success': False, 'cost': 0.01, 'model': 'mock', 'error': 'Generation failed'} + + mock_code_gen.side_effect = gen_with_error + + result = sync_orchestration(basename="fail", language="python") + + # Verify operation failed + assert result['success'] is False + + # Verify _display_operation_output was still called (output shown before failure) + assert mock_display.call_count >= 1