From 00ad3a2c2e4a8531f458d97cb36217d21d675d43 Mon Sep 17 00:00:00 2001 From: shrutu0929 Date: Thu, 9 Apr 2026 19:34:31 +0530 Subject: [PATCH 1/2] fix/perf: Thread-safe collection of semantic_skip_warnings in parallel analyze --- refactron/core/parallel.py | 43 ++++++++++++++++++++++--------------- refactron/core/refactron.py | 17 +++++++++------ 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/refactron/core/parallel.py b/refactron/core/parallel.py index 2fea0c0..49dcf3b 100644 --- a/refactron/core/parallel.py +++ b/refactron/core/parallel.py @@ -7,7 +7,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple from refactron.core.analysis_result import FileAnalysisError -from refactron.core.models import FileMetrics +from refactron.core.models import FileMetrics, AnalysisSkipWarning logger = logging.getLogger(__name__) @@ -58,20 +58,20 @@ def __init__( def process_files( self, files: List[Path], - process_func: Callable[[Path], Tuple[Optional[FileMetrics], Optional[FileAnalysisError]]], + process_func: Callable[[Path], Tuple[Optional[FileMetrics], Optional[FileAnalysisError], Optional[AnalysisSkipWarning]]], progress_callback: Optional[Callable[[int, int], None]] = None, - ) -> Tuple[List[FileMetrics], List[FileAnalysisError]]: + ) -> Tuple[List[FileMetrics], List[FileAnalysisError], List[AnalysisSkipWarning]]: """ Process multiple files in parallel. Args: files: List of file paths to process. process_func: Function to process a single file. Should return - (FileMetrics, None) on success or (None, FileAnalysisError) on error. + (FileMetrics, None, skip_warn) on success or (None, FileAnalysisError, None) on error. progress_callback: Optional callback for progress updates (completed, total). Returns: - Tuple of (successful results, failed files). + Tuple of (successful results, failed files, skip warnings). """ if not self.enabled or len(files) <= 1: # Process sequentially if disabled or only one file @@ -86,20 +86,23 @@ def process_files( def _process_sequential( self, files: List[Path], - process_func: Callable[[Path], Tuple[Optional[FileMetrics], Optional[FileAnalysisError]]], + process_func: Callable[[Path], Tuple[Optional[FileMetrics], Optional[FileAnalysisError], Optional[AnalysisSkipWarning]]], progress_callback: Optional[Callable[[int, int], None]] = None, - ) -> Tuple[List[FileMetrics], List[FileAnalysisError]]: + ) -> Tuple[List[FileMetrics], List[FileAnalysisError], List[AnalysisSkipWarning]]: """Process files sequentially.""" results: List[FileMetrics] = [] errors: List[FileAnalysisError] = [] + skips: List[AnalysisSkipWarning] = [] for i, file_path in enumerate(files): try: - result, error = process_func(file_path) + result, error, skip = process_func(file_path) if result is not None: results.append(result) if error is not None: errors.append(error) + if skip is not None: + skips.append(skip) except Exception as e: logger.error(f"Unexpected error processing {file_path}: {e}", exc_info=True) errors.append( @@ -114,17 +117,18 @@ def _process_sequential( if progress_callback: progress_callback(i + 1, len(files)) - return results, errors + return results, errors, skips def _process_parallel_threads( self, files: List[Path], - process_func: Callable[[Path], Tuple[Optional[FileMetrics], Optional[FileAnalysisError]]], + process_func: Callable[[Path], Tuple[Optional[FileMetrics], Optional[FileAnalysisError], Optional[AnalysisSkipWarning]]], progress_callback: Optional[Callable[[int, int], None]] = None, - ) -> Tuple[List[FileMetrics], List[FileAnalysisError]]: + ) -> Tuple[List[FileMetrics], List[FileAnalysisError], List[AnalysisSkipWarning]]: """Process files in parallel using threads.""" results: List[FileMetrics] = [] errors: List[FileAnalysisError] = [] + skips: List[AnalysisSkipWarning] = [] completed = 0 with ThreadPoolExecutor(max_workers=self.max_workers) as executor: @@ -139,11 +143,13 @@ def _process_parallel_threads( completed += 1 try: - result, error = future.result() + result, error, skip = future.result() if result is not None: results.append(result) if error is not None: errors.append(error) + if skip is not None: + skips.append(skip) except Exception as e: logger.error(f"Unexpected error processing {file_path}: {e}", exc_info=True) recovery_msg = "Check the file for syntax errors or encoding issues" @@ -159,14 +165,14 @@ def _process_parallel_threads( if progress_callback: progress_callback(completed, len(files)) - return results, errors + return results, errors, skips def _process_parallel_processes( self, files: List[Path], - process_func: Callable[[Path], Tuple[Optional[FileMetrics], Optional[FileAnalysisError]]], + process_func: Callable[[Path], Tuple[Optional[FileMetrics], Optional[FileAnalysisError], Optional[AnalysisSkipWarning]]], progress_callback: Optional[Callable[[int, int], None]] = None, - ) -> Tuple[List[FileMetrics], List[FileAnalysisError]]: + ) -> Tuple[List[FileMetrics], List[FileAnalysisError], List[AnalysisSkipWarning]]: """ Process files in parallel using processes. @@ -176,6 +182,7 @@ def _process_parallel_processes( """ results: List[FileMetrics] = [] errors: List[FileAnalysisError] = [] + skips: List[AnalysisSkipWarning] = [] completed = 0 try: @@ -191,11 +198,13 @@ def _process_parallel_processes( completed += 1 try: - result, error = future.result() + result, error, skip = future.result() if result is not None: results.append(result) if error is not None: errors.append(error) + if skip is not None: + skips.append(skip) except Exception as e: logger.error(f"Unexpected error processing {file_path}: {e}", exc_info=True) recovery_msg = "Check the file for syntax errors or encoding issues" @@ -216,7 +225,7 @@ def _process_parallel_processes( logger.info("Falling back to sequential processing") return self._process_sequential(files, process_func, progress_callback) - return results, errors + return results, errors, skips def get_config(self) -> Dict[str, Any]: """ diff --git a/refactron/core/refactron.py b/refactron/core/refactron.py index 48580b2..63aa0eb 100644 --- a/refactron/core/refactron.py +++ b/refactron/core/refactron.py @@ -265,15 +265,15 @@ def analyze(self, target: Union[str, Path]) -> AnalysisResult: # Create a wrapper function for parallel processing def process_file_wrapper( file_path: Path, - ) -> Tuple[Optional[FileMetrics], Optional[FileAnalysisError]]: + ) -> Tuple[Optional[FileMetrics], Optional[FileAnalysisError], Optional[AnalysisSkipWarning]]: try: - file_metrics = self._analyze_file(file_path) + file_metrics, skip_warn = self._analyze_file(file_path) # Update incremental tracker if self.incremental_tracker.enabled: self.incremental_tracker.update_file_state(file_path) - return file_metrics, None + return file_metrics, None, skip_warn except AnalysisError as e: logger.debug(f"Failed to analyze {file_path}: {e}") error = FileAnalysisError( @@ -282,7 +282,7 @@ def process_file_wrapper( error_type=e.__class__.__name__, recovery_suggestion=e.recovery_suggestion, ) - return None, error + return None, error, None except Exception as e: logger.error(f"Unexpected error analyzing {file_path}: {e}", exc_info=True) error = FileAnalysisError( @@ -291,24 +291,27 @@ def process_file_wrapper( error_type=e.__class__.__name__, recovery_suggestion="Check the file for syntax errors or encoding issues", ) - return None, error + return None, error, None # Process files in parallel - file_metrics_list, error_list = self.parallel_processor.process_files( + file_metrics_list, error_list, skip_warnings = self.parallel_processor.process_files( files, process_file_wrapper, ) result.file_metrics.extend(file_metrics_list) result.failed_files.extend(error_list) + result.semantic_skip_warnings.extend(skip_warnings) result.total_issues = sum(fm.issue_count for fm in file_metrics_list) else: # Sequential processing for file_path in files: try: - file_metrics = self._analyze_file(file_path) + file_metrics, skip_warn = self._analyze_file(file_path) result.file_metrics.append(file_metrics) result.total_issues += file_metrics.issue_count + if skip_warn is not None: + result.semantic_skip_warnings.append(skip_warn) # Update incremental tracker if self.incremental_tracker.enabled: From dea2f934304174dd0ed3bee29b51464d130fe8c7 Mon Sep 17 00:00:00 2001 From: shrutu0929 Date: Thu, 9 Apr 2026 20:03:55 +0530 Subject: [PATCH 2/2] solve this error --- refactron/core/refactron.py | 3 +-- tests/test_config_management.py | 35 +++++++++++++------------- tests/test_performance_optimization.py | 5 ++-- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/refactron/core/refactron.py b/refactron/core/refactron.py index bbe10fc..e83bd7b 100644 --- a/refactron/core/refactron.py +++ b/refactron/core/refactron.py @@ -270,8 +270,7 @@ def process_file_wrapper( ) -> Tuple[Optional[FileMetrics], Optional[FileAnalysisError], Optional[AnalysisSkipWarning]]: try: file_metrics, skip_warn = self._analyze_file(file_path) - if skip_warn is not None: - result.semantic_skip_warnings.append(skip_warn) + # Warnings are collected by ParallelProcessor.process_files return # Update incremental tracker if self.incremental_tracker.enabled: diff --git a/tests/test_config_management.py b/tests/test_config_management.py index c71b71a..50afda8 100644 --- a/tests/test_config_management.py +++ b/tests/test_config_management.py @@ -903,16 +903,16 @@ def test_parallel_processor_sequential_and_thread_modes(tmp_path: Path) -> None: def process_func(p: Path): if p.name == "bad.py": raise ValueError("boom") - return None, None + return None, None, None p_seq = ParallelProcessor(max_workers=1, use_processes=False, enabled=True) - _, errors = p_seq.process_files(files, process_func) + _, errors, skips = p_seq.process_files(files, process_func) assert p_seq.enabled is False assert len(errors) == 1 assert isinstance(errors[0], FileAnalysisError) p_thr = ParallelProcessor(max_workers=2, use_processes=False, enabled=True) - results, errors = p_thr.process_files(files, lambda p: (None, None)) + results, errors, skips = p_thr.process_files(files, lambda p: (None, None, None)) assert results == [] assert errors == [] assert p_thr.get_config()["max_workers"] == 2 @@ -1171,11 +1171,11 @@ def make_error(path): def success_func(p): - return make_metrics(p), None + return make_metrics(p), None, None def error_func(p): - return None, make_error(p) + return None, make_error(p), None def raises_func(p): @@ -1210,23 +1210,23 @@ def test_get_config(self): class TestSequentialProcessing: def test_empty_files(self): pp = ParallelProcessor(enabled=False) - results, errors = pp.process_files([], success_func) - assert results == [] and errors == [] + results, errors, skips = pp.process_files([], success_func) + assert results == [] and errors == [] and skips == [] def test_single_file_success(self): pp = ParallelProcessor(enabled=False) files = [Path("a.py")] - results, errors = pp.process_files(files, success_func) - assert len(results) == 1 and len(errors) == 0 + results, errors, skips = pp.process_files(files, success_func) + assert len(results) == 1 and len(errors) == 0 and len(skips) == 0 def test_single_file_error(self): pp = ParallelProcessor(enabled=False) - results, errors = pp.process_files([Path("a.py")], error_func) - assert len(results) == 0 and len(errors) == 1 + results, errors, skips = pp.process_files([Path("a.py")], error_func) + assert len(results) == 0 and len(errors) == 1 and len(skips) == 0 def test_single_file_exception(self): pp = ParallelProcessor(enabled=False) - results, errors = pp.process_files([Path("a.py")], raises_func) + results, errors, skips = pp.process_files([Path("a.py")], raises_func) assert len(errors) == 1 def test_progress_callback(self): @@ -1244,12 +1244,13 @@ class TestThreadedProcessing: def test_two_files_threads(self): pp = ParallelProcessor(max_workers=2, use_processes=False, enabled=True) files = [Path("a.py"), Path("b.py")] - results, errors = pp.process_files(files, success_func) + results, errors, skips = pp.process_files(files, success_func) assert len(results) == 2 + assert len(skips) == 0 def test_thread_error_handling(self): pp = ParallelProcessor(max_workers=2, use_processes=False, enabled=True) - results, errors = pp.process_files([Path("a.py"), Path("b.py")], raises_func) + results, errors, skips = pp.process_files([Path("a.py"), Path("b.py")], raises_func) assert len(errors) == 2 def test_thread_progress_callback(self): @@ -1264,7 +1265,7 @@ def test_thread_progress_callback(self): def test_single_file_goes_sequential(self): pp = ParallelProcessor(max_workers=4, use_processes=False, enabled=True) - results, errors = pp.process_files([Path("a.py")], success_func) + results, errors, skips = pp.process_files([Path("a.py")], success_func) assert len(results) == 1 @@ -1274,13 +1275,13 @@ def test_process_pool_falls_back_on_exception(self): with patch( "refactron.core.parallel.ProcessPoolExecutor", side_effect=Exception("spawn fail") ): - results, errors = pp.process_files([Path("a.py")], success_func) + results, errors, skips = pp.process_files([Path("a.py")], success_func) assert len(results) == 1 def test_process_pool_success(self): pp = ParallelProcessor(max_workers=2, use_processes=True, enabled=True) mock_future = MagicMock() - mock_future.result.return_value = (make_metrics(Path("a.py")), None) + mock_future.result.return_value = (make_metrics(Path("a.py")), None, None) mock_exec = MagicMock() mock_exec.__enter__ = lambda s: s mock_exec.__exit__ = MagicMock(return_value=False) diff --git a/tests/test_performance_optimization.py b/tests/test_performance_optimization.py index 0b66afc..ca60500 100644 --- a/tests/test_performance_optimization.py +++ b/tests/test_performance_optimization.py @@ -242,11 +242,12 @@ def test_sequential_processing(self): def process_func(file_path): # Simulate processing - return None, None + return None, None, None - results, errors = processor.process_files(files, process_func) + results, errors, skips = processor.process_files(files, process_func) assert len(results) == 0 # All return None assert len(errors) == 0 + assert len(skips) == 0 class TestMemoryProfiler: