diff --git a/pyproject.toml b/pyproject.toml index bdfecd610..8f9451256 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -180,9 +180,8 @@ audit = ["scitex-audit>=0.1.2"] # Benchmark Module - Performance monitoring # Use: pip install scitex[benchmark] -benchmark = [ - "psutil", -] +# Real implementation lives in the standalone scitex-benchmark package. +benchmark = ["scitex-benchmark>=0.1.0"] # Bridge Module - External system integration # Use: pip install scitex[bridge] diff --git a/src/scitex/benchmark/__init__.py b/src/scitex/benchmark/__init__.py index e2eb02f55..f9bc42063 100755 --- a/src/scitex/benchmark/__init__.py +++ b/src/scitex/benchmark/__init__.py @@ -1,40 +1,20 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Time-stamp: "2025-07-25 05:25:00" -# File: __init__.py +"""SciTeX benchmark — thin compatibility shim for scitex-benchmark. -""" -SciTeX Performance Benchmarking Suite +Aliases ``scitex.benchmark`` to the standalone ``scitex_benchmark`` package +via ``sys.modules``. ``scitex.benchmark is scitex_benchmark``. -This module provides tools for benchmarking and monitoring the performance -of SciTeX functions. +Install: ``pip install scitex[benchmark]`` (or ``pip install scitex-benchmark``). +See: https://github.com/ywatanabe1989/scitex-benchmark """ -from .benchmark import ( - BenchmarkResult, - BenchmarkSuite, - benchmark_function, - benchmark_module, - compare_implementations, - run_all_benchmarks, -) -from .monitor import PerformanceMonitor, get_performance_stats, track_performance -from .profiler import get_profile_report, profile_function, profile_module +import sys as _sys + +try: + import scitex_benchmark as _real +except ImportError as _e: # pragma: no cover + raise ImportError( + "scitex.benchmark requires the 'scitex-benchmark' package. " + "Install with: pip install scitex[benchmark] (or: pip install scitex-benchmark)" + ) from _e -__all__ = [ - # Benchmarking - "benchmark_function", - "benchmark_module", - "BenchmarkResult", - "BenchmarkSuite", - "run_all_benchmarks", - "compare_implementations", - # Profiling - "profile_function", - "profile_module", - "get_profile_report", - # Monitoring - "PerformanceMonitor", - "track_performance", - "get_performance_stats", -] +_sys.modules[__name__] = _real diff --git a/src/scitex/benchmark/_skills/SKILL.md b/src/scitex/benchmark/_skills/SKILL.md deleted file mode 100644 index 5714168f9..000000000 --- a/src/scitex/benchmark/_skills/SKILL.md +++ /dev/null @@ -1,47 +0,0 @@ ---- -name: stx.benchmark -description: Performance benchmarking, profiling, and monitoring tools for SciTeX functions. ---- - -# stx.benchmark - -The `stx.benchmark` module provides tools for measuring, profiling, and monitoring the performance of SciTeX functions. It supports function-level benchmarking, module-level profiling, and continuous performance monitoring. 
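The new `src/scitex/benchmark/__init__.py` above keeps the old import path working by swapping itself for the standalone package in `sys.modules`. Below is a minimal sketch of that pattern, using the standard-library `statistics` module as a stand-in for `scitex_benchmark` so it runs anywhere; the `compatshim` name is illustrative and not a SciTeX module.

```python
# compatshim.py: a sketch of the aliasing pattern used by the new
# scitex/benchmark/__init__.py. The standard-library `statistics` module
# stands in for the real backing package (scitex_benchmark) so the sketch
# runs anywhere; these names are illustrative, not SciTeX APIs.
import sys as _sys

try:
    import statistics as _real
except ImportError as _e:
    # The real shim raises with an actionable hint here:
    # "Install with: pip install scitex[benchmark]"
    raise ImportError("compatshim requires its backing package") from _e

# Replace this module's entry in sys.modules with the backing module, so
# `import compatshim` hands callers the backing module object itself.
_sys.modules[__name__] = _real
```

After `import compatshim`, `compatshim is statistics` holds and every attribute resolves on the backing module, which is the relationship the new docstring describes for `scitex.benchmark is scitex_benchmark`; in SciTeX the backing package is installed with `pip install scitex[benchmark]`.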
- -## Sub-skills - -- [benchmarking.md](benchmarking.md) — `benchmark_function`, `compare_implementations`, `BenchmarkSuite`, `run_all_benchmarks` -- [profiling.md](profiling.md) — `profile_function`, `profile_block`, `profile_module`, `track_memory` -- [monitoring.md](monitoring.md) — `track_performance`, `PerformanceMonitor`, alert thresholds - -## Quick Reference - -```python -import scitex as stx - -# Benchmark a single function -result = stx.benchmark.benchmark_function(my_func, args=(data,), iterations=50) -print(result.mean_time, result.std_time) - -# Profile function calls -@stx.benchmark.profile_function -def my_func(x): - return process(x) - -report = stx.benchmark.get_profile_report() - -# Monitor continuously -from scitex.benchmark.monitor import start_monitoring -start_monitoring() - -@stx.benchmark.track_performance -def my_func(x): - return process(x) - -stats = stx.benchmark.get_performance_stats() - -# Compare implementations -df = stx.benchmark.compare_implementations( - {"impl_a": func_a, "impl_b": func_b}, - test_data_generator=lambda: ((data,), {}) -) -``` diff --git a/src/scitex/benchmark/_skills/benchmarking.md b/src/scitex/benchmark/_skills/benchmarking.md deleted file mode 100644 index 907667dc8..000000000 --- a/src/scitex/benchmark/_skills/benchmarking.md +++ /dev/null @@ -1,116 +0,0 @@ -# Benchmarking Functions with stx.benchmark - -The benchmarking sub-system measures how long functions take to run, with warmup runs and statistical summaries. - -## benchmark_function - -```python -from scitex.benchmark import benchmark_function, BenchmarkResult - -import numpy as np - -def my_fft(x): - return np.fft.fft(x) - -data = np.random.randn(8192) - -result = benchmark_function( - func=my_fft, - args=(data,), - iterations=50, # number of timed runs (default: 10) - warmup=2, # warmup runs before timing (default: 2) - input_size="8192 samples", - measure_memory=True, # requires psutil -) - -print(result) -# my_fft: 0.000123s +- 0.000005s (n=50) - -print(result.mean_time) # float (seconds) -print(result.std_time) # float -print(result.min_time) # float -print(result.max_time) # float -print(result.memory_usage) # MB, or None if psutil not installed -``` - -## BenchmarkResult fields - -| Field | Type | Description | -|-------|------|-------------| -| `function_name` | str | Name of the benchmarked function | -| `module` | str | Module where function is defined | -| `mean_time` | float | Mean elapsed time in seconds | -| `std_time` | float | Standard deviation of elapsed time | -| `min_time` | float | Minimum elapsed time | -| `max_time` | float | Maximum elapsed time | -| `iterations` | int | Number of timed runs | -| `input_size` | str or None | User-supplied size description | -| `memory_usage` | float or None | RSS memory in MB (requires psutil) | - -## compare_implementations - -Compare multiple implementations side-by-side: - -```python -from scitex.benchmark import compare_implementations - -def baseline_sort(arr): - return sorted(arr) - -def numpy_sort(arr): - return np.sort(arr) - -def data_gen(): - arr = list(np.random.randn(10000)) - return (arr,), {} - -df = compare_implementations( - implementations={"builtin": baseline_sort, "numpy": numpy_sort}, - test_data_generator=data_gen, - iterations=20, -) -print(df) -# Columns: implementation, mean_time, std_time, speedup -# speedup is relative to the first implementation -``` - -## BenchmarkSuite - -Group multiple benchmarks together and run them as a suite: - -```python -from scitex.benchmark import BenchmarkSuite - 
-suite = BenchmarkSuite("Signal Processing") - -suite.add_benchmark( - func=np.fft.fft, - test_data_generator=lambda: ((np.random.randn(8192),), {}), - name="FFT 8192", - sizes=["8192"], -) - -suite.add_benchmark( - func=np.fft.fft, - test_data_generator=lambda: ((np.random.randn(65536),), {}), - name="FFT 65536", - sizes=["65536"], -) - -df = suite.run(iterations=20, verbose=True) -suite.save_results("fft_benchmarks.csv") - -# Compare against a saved baseline -comparison = suite.compare_with_baseline("fft_baseline.csv") -print(comparison[["function", "size", "speedup"]]) -``` - -## Pre-defined Suite Runners - -```python -from scitex.benchmark import run_all_benchmarks - -# Run pre-defined suites for IO and stats modules -results = run_all_benchmarks(output_dir="./benchmark_results") -# Saves: io_benchmark.csv, stats_benchmark.csv, benchmark_summary.csv -``` diff --git a/src/scitex/benchmark/_skills/monitoring.md b/src/scitex/benchmark/_skills/monitoring.md deleted file mode 100644 index 37ff17baa..000000000 --- a/src/scitex/benchmark/_skills/monitoring.md +++ /dev/null @@ -1,107 +0,0 @@ -# Performance Monitoring with stx.benchmark - -The monitoring sub-system provides continuous runtime tracking of functions using a global `PerformanceMonitor` instance. - -## track_performance (decorator) - -The `track_performance` decorator records each call's duration, memory delta, argument size, and result size into the global monitor: - -```python -from scitex.benchmark import track_performance, get_performance_stats -from scitex.benchmark.monitor import start_monitoring - -# Start the global monitor first -start_monitoring() - -@track_performance -def load_data(path): - return np.load(path) - -# Call the function normally -for path in file_list: - load_data(path) - -# Retrieve aggregated stats -stats = get_performance_stats("load_data") -# { -# "function": "load_data", -# "count": N, -# "total_time": float, -# "avg_time": float, -# "min_time": float, -# "max_time": float, -# "error_rate": float, -# } - -# All functions at once -all_stats = get_performance_stats() -``` - -## PerformanceMonitor (direct usage) - -```python -from scitex.benchmark import PerformanceMonitor - -monitor = PerformanceMonitor(max_history=500) -monitor.start() - -# Record metrics -from scitex.benchmark.monitor import PerformanceMetric -import time - -metric = PerformanceMetric( - timestamp=time.time(), - function="my_operation", - duration=0.5, -) -monitor.record_metric(metric) - -# Retrieve stats -stats = monitor.get_stats("my_operation") - -# Recent metrics list -recent = monitor.get_recent_metrics(n=50) - -# Persist to disk and reload -monitor.save_metrics("metrics.json") -monitor.load_metrics("metrics.json") - -monitor.stop() -monitor.clear() -``` - -## Alert Thresholds - -The global monitor emits `warnings.warn()` when thresholds are exceeded. 
Defaults are: - -| Alert type | Default threshold | -|------------|------------------| -| `slow_function` | 1.0 s | -| `memory_spike` | 100 MB | -| `error_rate` | 10% (after 10+ calls) | - -```python -from scitex.benchmark.monitor import set_performance_alerts, add_performance_alert_handler - -# Tighten the slow-function threshold -set_performance_alerts(slow_function=0.1) - -# Custom alert callback -def my_handler(alert): - if alert["type"] == "slow_function": - print(f"SLOW: {alert['function']} took {alert['duration']:.2f}s") - -add_performance_alert_handler(my_handler) -``` - -## PerformanceMetric fields - -| Field | Type | Description | -|-------|------|-------------| -| `timestamp` | float | Unix time when call started | -| `function` | str | Function name | -| `duration` | float | Wall time in seconds | -| `memory_delta` | float or None | RSS change in MB (requires psutil) | -| `args_size` | int or None | `sys.getsizeof(args + kwargs)` | -| `result_size` | int or None | `sys.getsizeof(result)` | -| `exception` | str or None | Exception message if call failed | diff --git a/src/scitex/benchmark/_skills/profiling.md b/src/scitex/benchmark/_skills/profiling.md deleted file mode 100644 index d3a732716..000000000 --- a/src/scitex/benchmark/_skills/profiling.md +++ /dev/null @@ -1,93 +0,0 @@ -# Profiling with stx.benchmark - -The profiling sub-system uses Python's `cProfile` to identify where time is spent inside a function's call stack. - -## profile_function (decorator) - -```python -from scitex.benchmark import profile_function, get_profile_report - -@profile_function -def process_data(x): - # some expensive computation - return np.fft.fft(x) ** 2 - -# Call normally — profile is accumulated in the global profiler -for _ in range(10): - process_data(np.random.randn(8192)) - -# Retrieve structured report -report = get_profile_report() -# report["process_data"]["call_count"] -> 10 -# report["process_data"]["total_time"] -> float (seconds) -# report["process_data"]["avg_time"] -> float -# report["process_data"]["profile"] -> cProfile text output (top 10 callers) -``` - -## profile_block (context manager) - -Profile an arbitrary code block — prints directly to stdout: - -```python -from scitex.benchmark.profiler import profile_block - -with profile_block("data_loading"): - data = np.load("large_array.npy") - processed = np.fft.rfft(data) -# Prints: total time + top 10 cumulative call stats -``` - -## profile_module - -Wrap all public functions in a module with profiling: - -```python -from scitex.benchmark import profile_module - -profiler = profile_module("scitex.dsp", pattern="*") -# Output: "Profiling N functions in scitex.dsp" -# Now run code that calls those functions... 
-import scitex.dsp as dsp -dsp.some_function(data) - -# Get report from the returned FunctionProfiler -report = profiler.get_report() -``` - -## FunctionProfiler class (direct usage) - -```python -from scitex.benchmark.profiler import FunctionProfiler - -profiler = FunctionProfiler() - -@profiler.profile -def my_func(x): - return np.sort(x) - -for _ in range(5): - my_func(np.random.randn(10000)) - -profiler.print_stats("my_func", top_n=10) -# Prints: call count, total time, avg time, and cProfile top-10 -``` - -## track_memory (context manager) - -Track memory usage for a code block (requires `psutil`): - -```python -from scitex.benchmark.profiler import track_memory - -with track_memory("array allocation"): - big = np.zeros((10000, 10000)) -# Prints: start/end/delta RSS in MB -``` - -## get_memory_usage - -```python -from scitex.benchmark.profiler import get_memory_usage - -mb = get_memory_usage() # current process RSS in MB, or None if psutil missing -``` diff --git a/src/scitex/benchmark/benchmark.py b/src/scitex/benchmark/benchmark.py deleted file mode 100755 index 809ac543c..000000000 --- a/src/scitex/benchmark/benchmark.py +++ /dev/null @@ -1,409 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Time-stamp: "2025-07-25 05:30:00" -# File: benchmark.py - -""" -Core benchmarking functionality for SciTeX. -""" - -import gc -import inspect -import os -import time -from dataclasses import dataclass -from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple - -import numpy as np -import pandas as pd - - -@dataclass -class BenchmarkResult: - """Results from a benchmark run.""" - - function_name: str - module: str - mean_time: float - std_time: float - min_time: float - max_time: float - iterations: int - input_size: Optional[str] = None - memory_usage: Optional[float] = None - notes: Optional[str] = None - - def __str__(self): - return ( - f"{self.function_name}: {self.mean_time:.3f}s ± {self.std_time:.3f}s " - f"(n={self.iterations})" - ) - - def to_dict(self): - """Convert to dictionary for easy serialization.""" - return { - "function": self.function_name, - "module": self.module, - "mean_time": self.mean_time, - "std_time": self.std_time, - "min_time": self.min_time, - "max_time": self.max_time, - "iterations": self.iterations, - "input_size": self.input_size, - "memory_usage": self.memory_usage, - "notes": self.notes, - } - - -def benchmark_function( - func: Callable, - args: tuple = (), - kwargs: dict = None, - iterations: int = 10, - warmup: int = 2, - input_size: Optional[str] = None, - measure_memory: bool = False, -) -> BenchmarkResult: - """ - Benchmark a single function. 
- - Parameters - ---------- - func : Callable - Function to benchmark - args : tuple - Arguments to pass to function - kwargs : dict - Keyword arguments to pass to function - iterations : int - Number of benchmark iterations - warmup : int - Number of warmup iterations - input_size : str, optional - Description of input size - measure_memory : bool - Whether to measure memory usage - - Returns - ------- - BenchmarkResult - Benchmark results - """ - if kwargs is None: - kwargs = {} - - # Warmup runs - for _ in range(warmup): - _ = func(*args, **kwargs) - - # Garbage collection before timing - gc.collect() - - # Timing runs - times = [] - for _ in range(iterations): - start = time.perf_counter() - _ = func(*args, **kwargs) - end = time.perf_counter() - times.append(end - start) - - times = np.array(times) - - # Get function info - module = inspect.getmodule(func).__name__ if inspect.getmodule(func) else "unknown" - - # Memory measurement (simplified) - memory_usage = None - if measure_memory: - try: - import psutil - - process = psutil.Process(os.getpid()) - memory_usage = process.memory_info().rss / 1024 / 1024 # MB - except: - pass - - return BenchmarkResult( - function_name=func.__name__, - module=module, - mean_time=np.mean(times), - std_time=np.std(times), - min_time=np.min(times), - max_time=np.max(times), - iterations=iterations, - input_size=input_size, - memory_usage=memory_usage, - ) - - -def compare_implementations( - implementations: Dict[str, Callable], - test_data_generator: Callable[[], Tuple[tuple, dict]], - iterations: int = 10, - sizes: Optional[List[str]] = None, -) -> pd.DataFrame: - """ - Compare multiple implementations of the same functionality. - - Parameters - ---------- - implementations : dict - Dictionary mapping implementation names to functions - test_data_generator : callable - Function that returns (args, kwargs) for testing - iterations : int - Number of iterations per implementation - sizes : list, optional - List of input sizes to test - - Returns - ------- - pd.DataFrame - Comparison results - """ - results = [] - - for name, func in implementations.items(): - # Generate test data - args, kwargs = test_data_generator() - - # Benchmark - result = benchmark_function( - func, args=args, kwargs=kwargs, iterations=iterations - ) - - results.append( - { - "implementation": name, - "mean_time": result.mean_time, - "std_time": result.std_time, - "speedup": 1.0, # Will calculate relative to baseline - } - ) - - df = pd.DataFrame(results) - - # Calculate speedup relative to first implementation - baseline_time = df.iloc[0]["mean_time"] - df["speedup"] = baseline_time / df["mean_time"] - - return df - - -class BenchmarkSuite: - """Collection of benchmarks for a module or set of functions.""" - - def __init__(self, name: str): - self.name = name - self.benchmarks = [] - self.results = [] - - def add_benchmark( - self, - func: Callable, - test_data_generator: Callable[[], Tuple[tuple, dict]], - name: Optional[str] = None, - sizes: Optional[List[str]] = None, - ): - """Add a benchmark to the suite.""" - self.benchmarks.append( - { - "func": func, - "data_gen": test_data_generator, - "name": name or func.__name__, - "sizes": sizes or ["default"], - } - ) - - def run(self, iterations: int = 10, verbose: bool = True) -> pd.DataFrame: - """Run all benchmarks in the suite.""" - results = [] - - for benchmark in self.benchmarks: - if verbose: - print(f"Running benchmark: {benchmark['name']}") - - for size in benchmark["sizes"]: - # Generate test data - args, kwargs = 
benchmark["data_gen"]() - - # Run benchmark - result = benchmark_function( - benchmark["func"], - args=args, - kwargs=kwargs, - iterations=iterations, - input_size=size, - ) - - result_dict = result.to_dict() - result_dict["size"] = size - results.append(result_dict) - - if verbose: - print(f" {size}: {result}") - - self.results = pd.DataFrame(results) - return self.results - - def save_results(self, path: str): - """Save benchmark results to CSV.""" - if self.results is not None: - self.results.to_csv(path, index=False) - - def compare_with_baseline(self, baseline_path: str) -> pd.DataFrame: - """Compare current results with baseline.""" - baseline = pd.read_csv(baseline_path) - - # Merge on function name and size - comparison = pd.merge( - self.results, - baseline, - on=["function", "size"], - suffixes=("_current", "_baseline"), - ) - - # Calculate speedup - comparison["speedup"] = ( - comparison["mean_time_baseline"] / comparison["mean_time_current"] - ) - - return comparison - - -def benchmark_module(module_name: str, pattern: str = "test_*") -> BenchmarkSuite: - """ - Create a benchmark suite for all matching functions in a module. - - Parameters - ---------- - module_name : str - Name of module to benchmark - pattern : str - Pattern to match function names - - Returns - ------- - BenchmarkSuite - Suite containing all matching benchmarks - """ - import fnmatch - import importlib - - module = importlib.import_module(module_name) - suite = BenchmarkSuite(module_name) - - # Find all matching functions - for name in dir(module): - if fnmatch.fnmatch(name, pattern): - func = getattr(module, name) - if callable(func): - # Create simple test data generator - def data_gen(): - return (), {} - - suite.add_benchmark(func, data_gen, name) - - return suite - - -# Pre-defined benchmark suites for common SciTeX modules -def create_io_benchmark_suite() -> BenchmarkSuite: - """Create benchmark suite for I/O operations.""" - import tempfile - - import numpy as np - - suite = BenchmarkSuite("IO Operations") - - # Benchmark numpy file loading - def numpy_data_gen(): - data = np.random.randn(1000, 1000) - with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: - np.save(f.name, data) - return (f.name,), {} - - import scitex.io - - suite.add_benchmark( - scitex.io.load, numpy_data_gen, "load_numpy", sizes=["1MB", "10MB", "100MB"] - ) - - return suite - - -def create_stats_benchmark_suite() -> BenchmarkSuite: - """Create benchmark suite for statistics operations.""" - import numpy as np - - suite = BenchmarkSuite("Statistics Operations") - - # Benchmark correlation - def corr_data_gen(): - x = np.random.randn(1000) - y = x + np.random.randn(1000) * 0.5 - return (x, y), {"n_perm": 1000} - - import scitex.stats - - suite.add_benchmark( - scitex.stats.corr_test, - corr_data_gen, - "correlation_test", - sizes=["1000_samples", "10000_samples"], - ) - - return suite - - -def run_all_benchmarks( - output_dir: str = "./benchmark_results", -) -> Dict[str, pd.DataFrame]: - """ - Run all pre-defined benchmark suites. 
- - Parameters - ---------- - output_dir : str - Directory to save results - - Returns - ------- - dict - Dictionary mapping suite names to results - """ - output_path = Path(output_dir) - output_path.mkdir(exist_ok=True) - - suites = { - "io": create_io_benchmark_suite(), - "stats": create_stats_benchmark_suite(), - } - - results = {} - for name, suite in suites.items(): - print(f"\nRunning {name} benchmarks...") - df = suite.run() - - # Save results - suite.save_results(output_path / f"{name}_benchmark.csv") - results[name] = df - - # Create summary - summary = [] - for name, df in results.items(): - summary.append( - { - "suite": name, - "functions": len(df["function"].unique()), - "mean_time": df["mean_time"].mean(), - "total_time": df["mean_time"].sum(), - } - ) - - summary_df = pd.DataFrame(summary) - summary_df.to_csv(output_path / "benchmark_summary.csv", index=False) - - print(f"\nBenchmark results saved to {output_path}") - return results diff --git a/src/scitex/benchmark/monitor.py b/src/scitex/benchmark/monitor.py deleted file mode 100755 index 6f064715d..000000000 --- a/src/scitex/benchmark/monitor.py +++ /dev/null @@ -1,380 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Time-stamp: "2025-07-25 05:40:00" -# File: monitor.py - -""" -Real-time performance monitoring for SciTeX. -""" - -import json -import threading -import time -import warnings -from collections import defaultdict, deque -from dataclasses import dataclass -from datetime import datetime -from pathlib import Path -from typing import Any, Callable, Dict, List, Optional - - -@dataclass -class PerformanceMetric: - """Single performance measurement.""" - - timestamp: float - function: str - duration: float - memory_delta: Optional[float] = None - args_size: Optional[int] = None - result_size: Optional[int] = None - exception: Optional[str] = None - - -class PerformanceMonitor: - """ - Monitor performance metrics for SciTeX functions. 
- - Example - ------- - >>> monitor = PerformanceMonitor() - >>> monitor.start() - >>> # Your code here - >>> stats = monitor.get_stats() - """ - - def __init__(self, max_history: int = 1000): - self.max_history = max_history - self.metrics = deque(maxlen=max_history) - self.function_stats = defaultdict( - lambda: { - "count": 0, - "total_time": 0.0, - "min_time": float("inf"), - "max_time": 0.0, - "errors": 0, - } - ) - self.is_monitoring = False - self._lock = threading.Lock() - - # Alerts configuration - self.alerts = { - "slow_function": 1.0, # Alert if function takes > 1s - "memory_spike": 100, # Alert if memory increases > 100MB - "error_rate": 0.1, # Alert if error rate > 10% - } - self.alert_callbacks = [] - - def start(self): - """Start monitoring.""" - self.is_monitoring = True - - def stop(self): - """Stop monitoring.""" - self.is_monitoring = False - - def record_metric(self, metric: PerformanceMetric): - """Record a performance metric.""" - if not self.is_monitoring: - return - - with self._lock: - self.metrics.append(metric) - - # Update function statistics - stats = self.function_stats[metric.function] - stats["count"] += 1 - stats["total_time"] += metric.duration - stats["min_time"] = min(stats["min_time"], metric.duration) - stats["max_time"] = max(stats["max_time"], metric.duration) - - if metric.exception: - stats["errors"] += 1 - - # Check alerts - self._check_alerts(metric) - - def _check_alerts(self, metric: PerformanceMetric): - """Check if metric triggers any alerts.""" - alerts_triggered = [] - - # Slow function alert - if metric.duration > self.alerts["slow_function"]: - alerts_triggered.append( - { - "type": "slow_function", - "function": metric.function, - "duration": metric.duration, - "threshold": self.alerts["slow_function"], - } - ) - - # Memory spike alert - if metric.memory_delta and metric.memory_delta > self.alerts["memory_spike"]: - alerts_triggered.append( - { - "type": "memory_spike", - "function": metric.function, - "delta": metric.memory_delta, - "threshold": self.alerts["memory_spike"], - } - ) - - # Error rate alert - stats = self.function_stats[metric.function] - if stats["count"] > 10: # Only check after sufficient calls - error_rate = stats["errors"] / stats["count"] - if error_rate > self.alerts["error_rate"]: - alerts_triggered.append( - { - "type": "high_error_rate", - "function": metric.function, - "rate": error_rate, - "threshold": self.alerts["error_rate"], - } - ) - - # Trigger callbacks - for alert in alerts_triggered: - for callback in self.alert_callbacks: - callback(alert) - - def add_alert_callback(self, callback: Callable[[Dict[str, Any]], None]): - """Add a callback for performance alerts.""" - self.alert_callbacks.append(callback) - - def get_stats(self, function: Optional[str] = None) -> Dict[str, Any]: - """ - Get performance statistics. 
- - Parameters - ---------- - function : str, optional - Specific function to get stats for - - Returns - ------- - dict - Performance statistics - """ - with self._lock: - if function: - stats = self.function_stats.get(function, {}) - if stats and stats["count"] > 0: - return { - "function": function, - "count": stats["count"], - "total_time": stats["total_time"], - "avg_time": stats["total_time"] / stats["count"], - "min_time": stats["min_time"], - "max_time": stats["max_time"], - "error_rate": stats["errors"] / stats["count"], - } - return {} - else: - # Return all stats - all_stats = {} - for func, stats in self.function_stats.items(): - if stats["count"] > 0: - all_stats[func] = { - "count": stats["count"], - "avg_time": stats["total_time"] / stats["count"], - "min_time": stats["min_time"], - "max_time": stats["max_time"], - "error_rate": stats["errors"] / stats["count"], - } - return all_stats - - def get_recent_metrics(self, n: int = 100) -> List[PerformanceMetric]: - """Get n most recent metrics.""" - with self._lock: - return list(self.metrics)[-n:] - - def save_metrics(self, path: str): - """Save metrics to file.""" - with self._lock: - data = { - "metrics": [ - { - "timestamp": m.timestamp, - "function": m.function, - "duration": m.duration, - "memory_delta": m.memory_delta, - "args_size": m.args_size, - "result_size": m.result_size, - "exception": m.exception, - } - for m in self.metrics - ], - "stats": dict(self.function_stats), - } - - Path(path).write_text(json.dumps(data, indent=2)) - - def load_metrics(self, path: str): - """Load metrics from file.""" - data = json.loads(Path(path).read_text()) - - with self._lock: - self.metrics.clear() - for m in data["metrics"]: - self.metrics.append(PerformanceMetric(**m)) - - self.function_stats.clear() - self.function_stats.update(data["stats"]) - - def clear(self): - """Clear all metrics.""" - with self._lock: - self.metrics.clear() - self.function_stats.clear() - - -# Global monitor instance -_global_monitor = PerformanceMonitor() - - -def track_performance(func: Callable) -> Callable: - """ - Decorator to track function performance. - - Example - ------- - >>> @track_performance - ... def my_function(x): - ... 
return x ** 2 - """ - import sys - from functools import wraps - - @wraps(func) - def wrapper(*args, **kwargs): - if not _global_monitor.is_monitoring: - return func(*args, **kwargs) - - # Get memory before (if available) - try: - import psutil - - process = psutil.Process() - mem_before = process.memory_info().rss / 1024 / 1024 - except: - mem_before = None - - # Time the function - start_time = time.time() - exception = None - result = None - - try: - result = func(*args, **kwargs) - except Exception as e: - exception = str(e) - raise - finally: - duration = time.time() - start_time - - # Get memory after - mem_delta = None - if mem_before is not None: - try: - mem_after = process.memory_info().rss / 1024 / 1024 - mem_delta = mem_after - mem_before - except: - pass - - # Estimate sizes - args_size = None - result_size = None - try: - args_size = sys.getsizeof(args) + sys.getsizeof(kwargs) - if result is not None: - result_size = sys.getsizeof(result) - except: - pass - - # Record metric - metric = PerformanceMetric( - timestamp=start_time, - function=func.__name__, - duration=duration, - memory_delta=mem_delta, - args_size=args_size, - result_size=result_size, - exception=exception, - ) - - _global_monitor.record_metric(metric) - - return result - - return wrapper - - -def start_monitoring(): - """Start global performance monitoring.""" - _global_monitor.start() - - -def stop_monitoring(): - """Stop global performance monitoring.""" - _global_monitor.stop() - - -def get_performance_stats(function: Optional[str] = None) -> Dict[str, Any]: - """Get performance statistics from global monitor.""" - return _global_monitor.get_stats(function) - - -def set_performance_alerts(**thresholds): - """ - Set performance alert thresholds. - - Parameters - ---------- - slow_function : float - Alert if function takes longer than this (seconds) - memory_spike : float - Alert if memory increases by more than this (MB) - error_rate : float - Alert if error rate exceeds this (0-1) - """ - _global_monitor.alerts.update(thresholds) - - -def add_performance_alert_handler(handler: Callable[[Dict[str, Any]], None]): - """ - Add a handler for performance alerts. - - Example - ------- - >>> def alert_handler(alert): - ... print(f"ALERT: {alert['type']} in {alert['function']}") - >>> add_performance_alert_handler(alert_handler) - """ - _global_monitor.add_alert_callback(handler) - - -# Default alert handler -def _default_alert_handler(alert: Dict[str, Any]): - """Default handler that prints warnings.""" - if alert["type"] == "slow_function": - warnings.warn( - f"Slow function: {alert['function']} took {alert['duration']:.2f}s " - f"(threshold: {alert['threshold']}s)" - ) - elif alert["type"] == "memory_spike": - warnings.warn( - f"Memory spike: {alert['function']} increased memory by {alert['delta']:.1f}MB " - f"(threshold: {alert['threshold']}MB)" - ) - elif alert["type"] == "high_error_rate": - warnings.warn( - f"High error rate: {alert['function']} has {alert['rate']:.1%} error rate " - f"(threshold: {alert['threshold']:.1%})" - ) - - -# Register default handler -add_performance_alert_handler(_default_alert_handler) diff --git a/src/scitex/benchmark/profiler.py b/src/scitex/benchmark/profiler.py deleted file mode 100755 index 39fff72ae..000000000 --- a/src/scitex/benchmark/profiler.py +++ /dev/null @@ -1,300 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Time-stamp: "2025-07-25 05:35:00" -# File: profiler.py - -""" -Profiling tools for SciTeX performance analysis. 
-""" - -import cProfile -import io -import pstats -import time -from contextlib import contextmanager -from functools import wraps -from typing import Any, Callable, Dict, Optional - - -class FunctionProfiler: - """Profile individual function calls.""" - - def __init__(self): - self.profiles = {} - self.call_counts = {} - self.total_times = {} - - def profile(self, func: Callable) -> Callable: - """ - Decorator to profile a function. - - Example - ------- - >>> profiler = FunctionProfiler() - >>> @profiler.profile - ... def my_function(x): - ... return x ** 2 - """ - - @wraps(func) - def wrapper(*args, **kwargs): - # Create profiler for this call - pr = cProfile.Profile() - pr.enable() - - # Call function - start_time = time.time() - result = func(*args, **kwargs) - end_time = time.time() - - pr.disable() - - # Store results - func_name = func.__name__ - if func_name not in self.profiles: - self.profiles[func_name] = [] - self.call_counts[func_name] = 0 - self.total_times[func_name] = 0.0 - - self.profiles[func_name].append(pr) - self.call_counts[func_name] += 1 - self.total_times[func_name] += end_time - start_time - - return result - - return wrapper - - def get_stats(self, func_name: str) -> Optional[pstats.Stats]: - """Get profiling statistics for a function.""" - if func_name not in self.profiles: - return None - - # Combine all profiles for this function - combined = pstats.Stats(self.profiles[func_name][0]) - for pr in self.profiles[func_name][1:]: - combined.add(pr) - - return combined - - def print_stats(self, func_name: Optional[str] = None, top_n: int = 10): - """Print profiling statistics.""" - if func_name: - stats = self.get_stats(func_name) - if stats: - print(f"\nProfile for {func_name}:") - print(f"Total calls: {self.call_counts[func_name]}") - print(f"Total time: {self.total_times[func_name]:.3f}s") - print( - f"Avg time per call: {self.total_times[func_name] / self.call_counts[func_name]:.3f}s" - ) - print("\nDetailed stats:") - stats.sort_stats("cumulative").print_stats(top_n) - else: - # Print all functions - for name in self.profiles: - self.print_stats(name, top_n) - - def get_report(self) -> Dict[str, Any]: - """Get a summary report of all profiled functions.""" - report = {} - for func_name in self.profiles: - stats = self.get_stats(func_name) - - # Get top time consumers - s = io.StringIO() - stats.sort_stats("cumulative").print_stats(10, s) - - report[func_name] = { - "call_count": self.call_counts[func_name], - "total_time": self.total_times[func_name], - "avg_time": self.total_times[func_name] / self.call_counts[func_name], - "profile": s.getvalue(), - } - - return report - - -# Global profiler instance -_global_profiler = FunctionProfiler() - - -def profile_function(func: Callable) -> Callable: - """ - Decorator to profile a function using the global profiler. - - Example - ------- - >>> @profile_function - ... def my_function(x): - ... return sum(range(x)) - """ - return _global_profiler.profile(func) - - -def get_profile_report() -> Dict[str, Any]: - """Get profiling report from global profiler.""" - return _global_profiler.get_report() - - -def print_profile_stats(func_name: Optional[str] = None): - """Print profiling statistics from global profiler.""" - _global_profiler.print_stats(func_name) - - -@contextmanager -def profile_block(name: str): - """ - Context manager for profiling a code block. - - Example - ------- - >>> with profile_block("data_processing"): - ... # Some expensive operations - ... 
data = process_data() - """ - pr = cProfile.Profile() - pr.enable() - start_time = time.time() - - try: - yield - finally: - pr.disable() - end_time = time.time() - - print(f"\nProfile for block '{name}':") - print(f"Total time: {end_time - start_time:.3f}s") - - s = io.StringIO() - ps = pstats.Stats(pr, stream=s).sort_stats("cumulative") - ps.print_stats(10) - print(s.getvalue()) - - -def profile_module(module_name: str, pattern: str = "*") -> Dict[str, Any]: - """ - Profile all matching functions in a module. - - Parameters - ---------- - module_name : str - Name of module to profile - pattern : str - Pattern to match function names - - Returns - ------- - dict - Profiling results - """ - import fnmatch - import importlib - - module = importlib.import_module(module_name) - profiler = FunctionProfiler() - - # Wrap all matching functions - wrapped_functions = [] - for name in dir(module): - if fnmatch.fnmatch(name, pattern): - obj = getattr(module, name) - if callable(obj) and not name.startswith("_"): - # Replace with profiled version - profiled = profiler.profile(obj) - setattr(module, name, profiled) - wrapped_functions.append(name) - - print(f"Profiling {len(wrapped_functions)} functions in {module_name}") - print(f"Wrapped: {', '.join(wrapped_functions)}") - print("\nRun your code now. Call get_profile_report() when done.") - - return profiler - - -class LineProfiler: - """ - Line-by-line profiler for detailed analysis. - - Note: This is a simplified version. For production use, - consider using the line_profiler package. - """ - - def __init__(self): - self.timings = {} - - def profile_lines(self, func: Callable) -> Callable: - """Profile a function line by line.""" - import inspect - - @wraps(func) - def wrapper(*args, **kwargs): - # Get source lines - source_lines = inspect.getsourcelines(func)[0] - line_times = {} - - # This is a simplified implementation - # Real line profiling requires bytecode instrumentation - start_time = time.time() - result = func(*args, **kwargs) - end_time = time.time() - - # Store timing - func_name = func.__name__ - if func_name not in self.timings: - self.timings[func_name] = [] - - self.timings[func_name].append( - {"total_time": end_time - start_time, "source": source_lines} - ) - - return result - - return wrapper - - def print_timings(self, func_name: str): - """Print line timings for a function.""" - if func_name not in self.timings: - print(f"No timings for {func_name}") - return - - timing = self.timings[func_name][-1] # Most recent - print(f"\nLine timings for {func_name}:") - print(f"Total time: {timing['total_time']:.3f}s") - print("\nSource code:") - for i, line in enumerate(timing["source"]): - print(f"{i + 1:4d}: {line.rstrip()}") - - -# Memory profiling utilities -def get_memory_usage(): - """Get current memory usage in MB.""" - try: - import psutil - - process = psutil.Process() - return process.memory_info().rss / 1024 / 1024 - except ImportError: - return None - - -@contextmanager -def track_memory(name: str): - """ - Track memory usage for a code block. - - Example - ------- - >>> with track_memory("data_loading"): - ... 
data = load_large_dataset() - """ - start_mem = get_memory_usage() - - try: - yield - finally: - end_mem = get_memory_usage() - if start_mem and end_mem: - print(f"\nMemory usage for '{name}':") - print(f"Start: {start_mem:.1f} MB") - print(f"End: {end_mem:.1f} MB") - print(f"Delta: {end_mem - start_mem:+.1f} MB") diff --git a/tests/scitex/benchmark/test_benchmark.py b/tests/scitex/benchmark/test_benchmark.py deleted file mode 100644 index 90f8a09d2..000000000 --- a/tests/scitex/benchmark/test_benchmark.py +++ /dev/null @@ -1,943 +0,0 @@ -#!/usr/bin/env python3 -# Time-stamp: "2025-01-05" -# File: test_benchmark.py - -"""Tests for scitex.benchmark.benchmark module.""" - -import os -import tempfile -from pathlib import Path - -import numpy as np -import pandas as pd -import pytest - -from scitex.benchmark.benchmark import ( - BenchmarkResult, - BenchmarkSuite, - benchmark_function, - benchmark_module, - compare_implementations, -) - -# ============================================================================ -# Test Fixtures -# ============================================================================ - - -@pytest.fixture -def sample_function(): - """A simple function for benchmarking.""" - - def add_numbers(a, b): - return a + b - - return add_numbers - - -@pytest.fixture -def slow_function(): - """A function that takes measurable time.""" - import time - - def slow_add(a, b): - time.sleep(0.01) # 10ms - return a + b - - return slow_add - - -@pytest.fixture -def benchmark_result(): - """Create a sample BenchmarkResult.""" - return BenchmarkResult( - function_name="test_func", - module="test_module", - mean_time=0.1, - std_time=0.01, - min_time=0.08, - max_time=0.12, - iterations=10, - input_size="100x100", - memory_usage=50.0, - notes="Test benchmark", - ) - - -@pytest.fixture -def temp_dir(): - """Create a temporary directory for test files.""" - with tempfile.TemporaryDirectory() as tmpdir: - yield tmpdir - - -# ============================================================================ -# Test BenchmarkResult -# ============================================================================ - - -class TestBenchmarkResult: - """Tests for BenchmarkResult dataclass.""" - - def test_creation_with_required_fields(self): - """Test BenchmarkResult with only required fields.""" - result = BenchmarkResult( - function_name="my_func", - module="my_module", - mean_time=0.5, - std_time=0.05, - min_time=0.4, - max_time=0.6, - iterations=10, - ) - - assert result.function_name == "my_func" - assert result.module == "my_module" - assert result.mean_time == 0.5 - assert result.std_time == 0.05 - assert result.min_time == 0.4 - assert result.max_time == 0.6 - assert result.iterations == 10 - assert result.input_size is None - assert result.memory_usage is None - assert result.notes is None - - def test_creation_with_all_fields(self, benchmark_result): - """Test BenchmarkResult with all fields.""" - assert benchmark_result.function_name == "test_func" - assert benchmark_result.input_size == "100x100" - assert benchmark_result.memory_usage == 50.0 - assert benchmark_result.notes == "Test benchmark" - - def test_str_representation(self, benchmark_result): - """Test __str__ returns expected format.""" - result_str = str(benchmark_result) - - assert "test_func" in result_str - assert "0.100s" in result_str - assert "0.010s" in result_str - assert "n=10" in result_str - - def test_to_dict(self, benchmark_result): - """Test to_dict serialization.""" - result_dict = benchmark_result.to_dict() - - assert 
isinstance(result_dict, dict) - assert result_dict["function"] == "test_func" - assert result_dict["module"] == "test_module" - assert result_dict["mean_time"] == 0.1 - assert result_dict["std_time"] == 0.01 - assert result_dict["min_time"] == 0.08 - assert result_dict["max_time"] == 0.12 - assert result_dict["iterations"] == 10 - assert result_dict["input_size"] == "100x100" - assert result_dict["memory_usage"] == 50.0 - assert result_dict["notes"] == "Test benchmark" - - def test_to_dict_has_all_expected_keys(self, benchmark_result): - """Test that to_dict has all expected keys.""" - result_dict = benchmark_result.to_dict() - expected_keys = { - "function", - "module", - "mean_time", - "std_time", - "min_time", - "max_time", - "iterations", - "input_size", - "memory_usage", - "notes", - } - assert set(result_dict.keys()) == expected_keys - - -# ============================================================================ -# Test benchmark_function -# ============================================================================ - - -class TestBenchmarkFunction: - """Tests for benchmark_function.""" - - def test_basic_benchmark(self, sample_function): - """Test basic function benchmarking.""" - result = benchmark_function( - sample_function, args=(1, 2), iterations=5, warmup=1 - ) - - assert isinstance(result, BenchmarkResult) - assert result.function_name == "add_numbers" - assert result.iterations == 5 - assert result.mean_time >= 0 - assert result.std_time >= 0 - assert result.min_time <= result.mean_time <= result.max_time - - def test_benchmark_with_kwargs(self, sample_function): - """Test benchmarking with keyword arguments.""" - result = benchmark_function( - sample_function, args=(1,), kwargs={"b": 2}, iterations=5 - ) - - assert isinstance(result, BenchmarkResult) - assert result.function_name == "add_numbers" - - def test_timing_consistency(self, slow_function): - """Test that timing is consistent and measurable.""" - result = benchmark_function(slow_function, args=(1, 2), iterations=5, warmup=1) - - # 10ms sleep should result in measurable time - assert result.mean_time >= 0.009 # Allow some slack - assert result.mean_time < 0.1 # But not too slow - - def test_input_size_parameter(self, sample_function): - """Test that input_size is recorded.""" - result = benchmark_function( - sample_function, args=(1, 2), input_size="small", iterations=3 - ) - - assert result.input_size == "small" - - def test_warmup_iterations(self, sample_function): - """Test warmup iterations are executed.""" - call_count = [0] - original_func = sample_function - - def counting_func(a, b): - call_count[0] += 1 - return a + b - - result = benchmark_function(counting_func, args=(1, 2), iterations=3, warmup=2) - - # Should have 2 warmup + 3 benchmark = 5 total calls - assert call_count[0] == 5 - - def test_default_kwargs_none(self, sample_function): - """Test that None kwargs default to empty dict.""" - # Should not raise - kwargs internally becomes {} - result = benchmark_function(sample_function, args=(1, 2), kwargs=None) - assert isinstance(result, BenchmarkResult) - - def test_module_detection(self, sample_function): - """Test that module name is detected.""" - result = benchmark_function(sample_function, args=(1, 2)) - # Module should be detected (though may be __main__ or test module) - assert isinstance(result.module, str) - assert len(result.module) > 0 - - def test_measure_memory_flag(self, sample_function): - """Test memory measurement flag.""" - result_no_mem = benchmark_function( - sample_function, 
args=(1, 2), measure_memory=False - ) - # Memory might be None if psutil not available - # Just verify it doesn't crash - assert isinstance(result_no_mem, BenchmarkResult) - - result_with_mem = benchmark_function( - sample_function, args=(1, 2), measure_memory=True - ) - # Memory might still be None if psutil not installed - assert isinstance(result_with_mem, BenchmarkResult) - - -# ============================================================================ -# Test compare_implementations -# ============================================================================ - - -class TestCompareImplementations: - """Tests for compare_implementations.""" - - def test_compare_two_implementations(self): - """Test comparing two implementations.""" - - def impl1(x): - return sum(range(x)) - - def impl2(x): - return x * (x - 1) // 2 - - implementations = {"loop": impl1, "formula": impl2} - - def data_gen(): - return (1000,), {} - - df = compare_implementations(implementations, data_gen, iterations=3) - - assert isinstance(df, pd.DataFrame) - assert len(df) == 2 - assert "implementation" in df.columns - assert "mean_time" in df.columns - assert "std_time" in df.columns - assert "speedup" in df.columns - - def test_speedup_calculation(self): - """Test that speedup is calculated correctly.""" - import time - - def slow_impl(x): - time.sleep(0.01) - return x - - def fast_impl(x): - return x - - implementations = {"slow": slow_impl, "fast": fast_impl} - - def data_gen(): - return (10,), {} - - df = compare_implementations(implementations, data_gen, iterations=3) - - # First implementation has speedup 1.0 (baseline) - assert df.iloc[0]["speedup"] == 1.0 - # Fast implementation should have speedup > 1 - assert df.iloc[1]["speedup"] > 1.0 - - def test_empty_implementations(self): - """Test with no implementations raises IndexError.""" - implementations = {} - - def data_gen(): - return (), {} - - # Empty implementations causes IndexError when accessing baseline_time - with pytest.raises(IndexError): - compare_implementations(implementations, data_gen, iterations=3) - - def test_single_implementation(self): - """Test with single implementation.""" - implementations = {"only": lambda x: x} - - def data_gen(): - return (1,), {} - - df = compare_implementations(implementations, data_gen, iterations=3) - assert len(df) == 1 - assert df.iloc[0]["speedup"] == 1.0 - - -# ============================================================================ -# Test BenchmarkSuite -# ============================================================================ - - -class TestBenchmarkSuite: - """Tests for BenchmarkSuite class.""" - - def test_suite_creation(self): - """Test suite creation.""" - suite = BenchmarkSuite("test_suite") - - assert suite.name == "test_suite" - assert suite.benchmarks == [] - assert suite.results == [] - - def test_add_benchmark(self): - """Test adding benchmarks to suite.""" - suite = BenchmarkSuite("test_suite") - - def my_func(): - return 42 - - def data_gen(): - return (), {} - - suite.add_benchmark(my_func, data_gen, name="custom_name", sizes=["small"]) - - assert len(suite.benchmarks) == 1 - assert suite.benchmarks[0]["func"] == my_func - assert suite.benchmarks[0]["name"] == "custom_name" - assert suite.benchmarks[0]["sizes"] == ["small"] - - def test_add_benchmark_default_name(self): - """Test adding benchmark uses function name by default.""" - suite = BenchmarkSuite("test_suite") - - def my_named_function(): - return 42 - - def data_gen(): - return (), {} - - 
suite.add_benchmark(my_named_function, data_gen) - - assert suite.benchmarks[0]["name"] == "my_named_function" - - def test_add_benchmark_default_sizes(self): - """Test adding benchmark uses default size.""" - suite = BenchmarkSuite("test_suite") - - def my_func(): - return 42 - - def data_gen(): - return (), {} - - suite.add_benchmark(my_func, data_gen) - - assert suite.benchmarks[0]["sizes"] == ["default"] - - def test_run_suite(self, capsys): - """Test running benchmark suite.""" - suite = BenchmarkSuite("test_suite") - - def my_func(): - return 42 - - def data_gen(): - return (), {} - - suite.add_benchmark(my_func, data_gen, sizes=["small", "large"]) - - results = suite.run(iterations=3, verbose=True) - - assert isinstance(results, pd.DataFrame) - assert len(results) == 2 # Two sizes - assert "function" in results.columns - assert "mean_time" in results.columns - assert "size" in results.columns - - # Check verbose output - captured = capsys.readouterr() - assert "Running benchmark" in captured.out - - def test_run_suite_quiet(self, capsys): - """Test running suite without verbose output.""" - suite = BenchmarkSuite("test_suite") - - def my_func(): - return 42 - - def data_gen(): - return (), {} - - suite.add_benchmark(my_func, data_gen) - suite.run(iterations=2, verbose=False) - - captured = capsys.readouterr() - assert "Running benchmark" not in captured.out - - def test_save_results(self, temp_dir): - """Test saving results to CSV.""" - suite = BenchmarkSuite("test_suite") - - def my_func(): - return 42 - - def data_gen(): - return (), {} - - suite.add_benchmark(my_func, data_gen) - suite.run(iterations=2, verbose=False) - - output_path = os.path.join(temp_dir, "results.csv") - suite.save_results(output_path) - - assert os.path.exists(output_path) - - # Verify CSV content - loaded_df = pd.read_csv(output_path) - assert "function" in loaded_df.columns - assert len(loaded_df) > 0 - - def test_save_results_no_results(self, temp_dir): - """Test saving when no results exist yet raises AttributeError.""" - suite = BenchmarkSuite("test_suite") - output_path = os.path.join(temp_dir, "results.csv") - - # Empty results list causes AttributeError (list has no to_csv) - with pytest.raises(AttributeError): - suite.save_results(output_path) - - def test_compare_with_baseline(self, temp_dir): - """Test comparing with baseline results.""" - suite = BenchmarkSuite("test_suite") - - def my_func(): - return 42 - - def data_gen(): - return (), {} - - suite.add_benchmark(my_func, data_gen) - suite.run(iterations=2, verbose=False) - - # Create baseline file - baseline_path = os.path.join(temp_dir, "baseline.csv") - baseline_data = pd.DataFrame( - { - "function": ["my_func"], - "size": ["default"], - "mean_time": [0.001], # Baseline time - } - ) - baseline_data.to_csv(baseline_path, index=False) - - comparison = suite.compare_with_baseline(baseline_path) - - assert isinstance(comparison, pd.DataFrame) - assert "speedup" in comparison.columns - assert "mean_time_current" in comparison.columns - assert "mean_time_baseline" in comparison.columns - - -# ============================================================================ -# Test benchmark_module -# ============================================================================ - - -class TestBenchmarkModule: - """Tests for benchmark_module function.""" - - def test_benchmark_builtin_module(self): - """Test benchmarking a standard library module.""" - suite = benchmark_module("math", pattern="sqrt*") - - assert isinstance(suite, BenchmarkSuite) - assert 
suite.name == "math" - # sqrt should be matched - assert len(suite.benchmarks) >= 0 # May find sqrt - - def test_benchmark_module_with_pattern(self): - """Test pattern matching in module.""" - suite = benchmark_module("os.path", pattern="is*") - - assert isinstance(suite, BenchmarkSuite) - # Should find isfile, isdir, etc. - func_names = [b["name"] for b in suite.benchmarks] - # At least one of these should be matched - assert any(name.startswith("is") for name in func_names) or len(func_names) == 0 - - def test_benchmark_nonexistent_module(self): - """Test with non-existent module raises ImportError.""" - with pytest.raises(ImportError): - benchmark_module("nonexistent_module_12345") - - -# ============================================================================ -# Main -# ============================================================================ - -if __name__ == "__main__": - import os - - import pytest - - pytest.main([os.path.abspath(__file__)]) - -# -------------------------------------------------------------------------------- -# Start of Source Code from: /home/ywatanabe/proj/scitex-code/src/scitex/benchmark/benchmark.py -# -------------------------------------------------------------------------------- -# #!/usr/bin/env python3 -# # -*- coding: utf-8 -*- -# # Time-stamp: "2025-07-25 05:30:00" -# # File: benchmark.py -# -# """ -# Core benchmarking functionality for SciTeX. -# """ -# -# import time -# import numpy as np -# import pandas as pd -# from typing import Callable, Dict, List, Any, Optional, Tuple -# from dataclasses import dataclass -# import inspect -# import gc -# import os -# from pathlib import Path -# -# -# @dataclass -# class BenchmarkResult: -# """Results from a benchmark run.""" -# -# function_name: str -# module: str -# mean_time: float -# std_time: float -# min_time: float -# max_time: float -# iterations: int -# input_size: Optional[str] = None -# memory_usage: Optional[float] = None -# notes: Optional[str] = None -# -# def __str__(self): -# return ( -# f"{self.function_name}: {self.mean_time:.3f}s ± {self.std_time:.3f}s " -# f"(n={self.iterations})" -# ) -# -# def to_dict(self): -# """Convert to dictionary for easy serialization.""" -# return { -# "function": self.function_name, -# "module": self.module, -# "mean_time": self.mean_time, -# "std_time": self.std_time, -# "min_time": self.min_time, -# "max_time": self.max_time, -# "iterations": self.iterations, -# "input_size": self.input_size, -# "memory_usage": self.memory_usage, -# "notes": self.notes, -# } -# -# -# def benchmark_function( -# func: Callable, -# args: tuple = (), -# kwargs: dict = None, -# iterations: int = 10, -# warmup: int = 2, -# input_size: Optional[str] = None, -# measure_memory: bool = False, -# ) -> BenchmarkResult: -# """ -# Benchmark a single function. 
-# -# Parameters -# ---------- -# func : Callable -# Function to benchmark -# args : tuple -# Arguments to pass to function -# kwargs : dict -# Keyword arguments to pass to function -# iterations : int -# Number of benchmark iterations -# warmup : int -# Number of warmup iterations -# input_size : str, optional -# Description of input size -# measure_memory : bool -# Whether to measure memory usage -# -# Returns -# ------- -# BenchmarkResult -# Benchmark results -# """ -# if kwargs is None: -# kwargs = {} -# -# # Warmup runs -# for _ in range(warmup): -# _ = func(*args, **kwargs) -# -# # Garbage collection before timing -# gc.collect() -# -# # Timing runs -# times = [] -# for _ in range(iterations): -# start = time.perf_counter() -# _ = func(*args, **kwargs) -# end = time.perf_counter() -# times.append(end - start) -# -# times = np.array(times) -# -# # Get function info -# module = inspect.getmodule(func).__name__ if inspect.getmodule(func) else "unknown" -# -# # Memory measurement (simplified) -# memory_usage = None -# if measure_memory: -# try: -# import psutil -# -# process = psutil.Process(os.getpid()) -# memory_usage = process.memory_info().rss / 1024 / 1024 # MB -# except: -# pass -# -# return BenchmarkResult( -# function_name=func.__name__, -# module=module, -# mean_time=np.mean(times), -# std_time=np.std(times), -# min_time=np.min(times), -# max_time=np.max(times), -# iterations=iterations, -# input_size=input_size, -# memory_usage=memory_usage, -# ) -# -# -# def compare_implementations( -# implementations: Dict[str, Callable], -# test_data_generator: Callable[[], Tuple[tuple, dict]], -# iterations: int = 10, -# sizes: Optional[List[str]] = None, -# ) -> pd.DataFrame: -# """ -# Compare multiple implementations of the same functionality. -# -# Parameters -# ---------- -# implementations : dict -# Dictionary mapping implementation names to functions -# test_data_generator : callable -# Function that returns (args, kwargs) for testing -# iterations : int -# Number of iterations per implementation -# sizes : list, optional -# List of input sizes to test -# -# Returns -# ------- -# pd.DataFrame -# Comparison results -# """ -# results = [] -# -# for name, func in implementations.items(): -# # Generate test data -# args, kwargs = test_data_generator() -# -# # Benchmark -# result = benchmark_function( -# func, args=args, kwargs=kwargs, iterations=iterations -# ) -# -# results.append( -# { -# "implementation": name, -# "mean_time": result.mean_time, -# "std_time": result.std_time, -# "speedup": 1.0, # Will calculate relative to baseline -# } -# ) -# -# df = pd.DataFrame(results) -# -# # Calculate speedup relative to first implementation -# baseline_time = df.iloc[0]["mean_time"] -# df["speedup"] = baseline_time / df["mean_time"] -# -# return df -# -# -# class BenchmarkSuite: -# """Collection of benchmarks for a module or set of functions.""" -# -# def __init__(self, name: str): -# self.name = name -# self.benchmarks = [] -# self.results = [] -# -# def add_benchmark( -# self, -# func: Callable, -# test_data_generator: Callable[[], Tuple[tuple, dict]], -# name: Optional[str] = None, -# sizes: Optional[List[str]] = None, -# ): -# """Add a benchmark to the suite.""" -# self.benchmarks.append( -# { -# "func": func, -# "data_gen": test_data_generator, -# "name": name or func.__name__, -# "sizes": sizes or ["default"], -# } -# ) -# -# def run(self, iterations: int = 10, verbose: bool = True) -> pd.DataFrame: -# """Run all benchmarks in the suite.""" -# results = [] -# -# for benchmark in 
self.benchmarks: -# if verbose: -# print(f"Running benchmark: {benchmark['name']}") -# -# for size in benchmark["sizes"]: -# # Generate test data -# args, kwargs = benchmark["data_gen"]() -# -# # Run benchmark -# result = benchmark_function( -# benchmark["func"], -# args=args, -# kwargs=kwargs, -# iterations=iterations, -# input_size=size, -# ) -# -# result_dict = result.to_dict() -# result_dict["size"] = size -# results.append(result_dict) -# -# if verbose: -# print(f" {size}: {result}") -# -# self.results = pd.DataFrame(results) -# return self.results -# -# def save_results(self, path: str): -# """Save benchmark results to CSV.""" -# if self.results is not None: -# self.results.to_csv(path, index=False) -# -# def compare_with_baseline(self, baseline_path: str) -> pd.DataFrame: -# """Compare current results with baseline.""" -# baseline = pd.read_csv(baseline_path) -# -# # Merge on function name and size -# comparison = pd.merge( -# self.results, -# baseline, -# on=["function", "size"], -# suffixes=("_current", "_baseline"), -# ) -# -# # Calculate speedup -# comparison["speedup"] = ( -# comparison["mean_time_baseline"] / comparison["mean_time_current"] -# ) -# -# return comparison -# -# -# def benchmark_module(module_name: str, pattern: str = "test_*") -> BenchmarkSuite: -# """ -# Create a benchmark suite for all matching functions in a module. -# -# Parameters -# ---------- -# module_name : str -# Name of module to benchmark -# pattern : str -# Pattern to match function names -# -# Returns -# ------- -# BenchmarkSuite -# Suite containing all matching benchmarks -# """ -# import importlib -# import fnmatch -# -# module = importlib.import_module(module_name) -# suite = BenchmarkSuite(module_name) -# -# # Find all matching functions -# for name in dir(module): -# if fnmatch.fnmatch(name, pattern): -# func = getattr(module, name) -# if callable(func): -# # Create simple test data generator -# def data_gen(): -# return (), {} -# -# suite.add_benchmark(func, data_gen, name) -# -# return suite -# -# -# # Pre-defined benchmark suites for common SciTeX modules -# def create_io_benchmark_suite() -> BenchmarkSuite: -# """Create benchmark suite for I/O operations.""" -# import tempfile -# import numpy as np -# -# suite = BenchmarkSuite("IO Operations") -# -# # Benchmark numpy file loading -# def numpy_data_gen(): -# data = np.random.randn(1000, 1000) -# with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: -# np.save(f.name, data) -# return (f.name,), {} -# -# import scitex.io -# -# suite.add_benchmark( -# scitex.io.load, numpy_data_gen, "load_numpy", sizes=["1MB", "10MB", "100MB"] -# ) -# -# return suite -# -# -# def create_stats_benchmark_suite() -> BenchmarkSuite: -# """Create benchmark suite for statistics operations.""" -# import numpy as np -# -# suite = BenchmarkSuite("Statistics Operations") -# -# # Benchmark correlation -# def corr_data_gen(): -# x = np.random.randn(1000) -# y = x + np.random.randn(1000) * 0.5 -# return (x, y), {"n_perm": 1000} -# -# import scitex.stats -# -# suite.add_benchmark( -# scitex.stats.corr_test, -# corr_data_gen, -# "correlation_test", -# sizes=["1000_samples", "10000_samples"], -# ) -# -# return suite -# -# -# def run_all_benchmarks( -# output_dir: str = "./benchmark_results", -# ) -> Dict[str, pd.DataFrame]: -# """ -# Run all pre-defined benchmark suites. 
-# -# Parameters -# ---------- -# output_dir : str -# Directory to save results -# -# Returns -# ------- -# dict -# Dictionary mapping suite names to results -# """ -# output_path = Path(output_dir) -# output_path.mkdir(exist_ok=True) -# -# suites = { -# "io": create_io_benchmark_suite(), -# "stats": create_stats_benchmark_suite(), -# } -# -# results = {} -# for name, suite in suites.items(): -# print(f"\nRunning {name} benchmarks...") -# df = suite.run() -# -# # Save results -# suite.save_results(output_path / f"{name}_benchmark.csv") -# results[name] = df -# -# # Create summary -# summary = [] -# for name, df in results.items(): -# summary.append( -# { -# "suite": name, -# "functions": len(df["function"].unique()), -# "mean_time": df["mean_time"].mean(), -# "total_time": df["mean_time"].sum(), -# } -# ) -# -# summary_df = pd.DataFrame(summary) -# summary_df.to_csv(output_path / "benchmark_summary.csv", index=False) -# -# print(f"\nBenchmark results saved to {output_path}") -# return results - -# -------------------------------------------------------------------------------- -# End of Source Code from: /home/ywatanabe/proj/scitex-code/src/scitex/benchmark/benchmark.py -# -------------------------------------------------------------------------------- diff --git a/tests/scitex/benchmark/test_monitor.py b/tests/scitex/benchmark/test_monitor.py deleted file mode 100644 index dbcef4e05..000000000 --- a/tests/scitex/benchmark/test_monitor.py +++ /dev/null @@ -1,922 +0,0 @@ -#!/usr/bin/env python3 -# Time-stamp: "2025-01-05" -# File: test_monitor.py - -"""Tests for scitex.benchmark.monitor module.""" - -import json -import os -import tempfile -import threading -import time -import warnings - -import pytest - -from scitex.benchmark.monitor import ( - PerformanceMetric, - PerformanceMonitor, - add_performance_alert_handler, - get_performance_stats, - set_performance_alerts, - track_performance, -) - -# ============================================================================ -# Test Fixtures -# ============================================================================ - - -@pytest.fixture -def monitor(): - """Create a fresh PerformanceMonitor instance.""" - return PerformanceMonitor(max_history=100) - - -@pytest.fixture -def started_monitor(): - """Create a started PerformanceMonitor instance.""" - mon = PerformanceMonitor(max_history=100) - mon.start() - yield mon - mon.stop() - - -@pytest.fixture -def sample_metric(): - """Create a sample PerformanceMetric.""" - return PerformanceMetric( - timestamp=time.time(), - function="test_function", - duration=0.5, - memory_delta=10.0, - args_size=100, - result_size=50, - exception=None, - ) - - -@pytest.fixture -def temp_dir(): - """Create a temporary directory.""" - with tempfile.TemporaryDirectory() as tmpdir: - yield tmpdir - - -# ============================================================================ -# Test PerformanceMetric -# ============================================================================ - - -class TestPerformanceMetric: - """Tests for PerformanceMetric dataclass.""" - - def test_creation_with_all_fields(self, sample_metric): - """Test PerformanceMetric with all fields.""" - assert sample_metric.function == "test_function" - assert sample_metric.duration == 0.5 - assert sample_metric.memory_delta == 10.0 - assert sample_metric.args_size == 100 - assert sample_metric.result_size == 50 - assert sample_metric.exception is None - - def test_creation_with_required_fields_only(self): - """Test PerformanceMetric with only 
required fields.""" - metric = PerformanceMetric( - timestamp=1234567890.0, - function="my_func", - duration=0.1, - ) - - assert metric.timestamp == 1234567890.0 - assert metric.function == "my_func" - assert metric.duration == 0.1 - assert metric.memory_delta is None - assert metric.args_size is None - assert metric.result_size is None - assert metric.exception is None - - def test_creation_with_exception(self): - """Test PerformanceMetric with exception.""" - metric = PerformanceMetric( - timestamp=time.time(), - function="error_func", - duration=0.01, - exception="ValueError: test error", - ) - - assert metric.exception == "ValueError: test error" - - -# ============================================================================ -# Test PerformanceMonitor -# ============================================================================ - - -class TestPerformanceMonitor: - """Tests for PerformanceMonitor class.""" - - def test_monitor_creation(self, monitor): - """Test monitor initialization.""" - assert monitor.max_history == 100 - assert len(monitor.metrics) == 0 - assert monitor.is_monitoring is False - # New instances start with empty callbacks (default handler only on global) - assert isinstance(monitor.alert_callbacks, list) - - def test_start_stop(self, monitor): - """Test start and stop monitoring.""" - assert monitor.is_monitoring is False - - monitor.start() - assert monitor.is_monitoring is True - - monitor.stop() - assert monitor.is_monitoring is False - - def test_record_metric_when_monitoring(self, started_monitor, sample_metric): - """Test recording metrics when monitoring is active.""" - started_monitor.record_metric(sample_metric) - - assert len(started_monitor.metrics) == 1 - assert started_monitor.function_stats["test_function"]["count"] == 1 - - def test_record_metric_when_not_monitoring(self, monitor, sample_metric): - """Test that metrics are not recorded when monitoring is off.""" - monitor.record_metric(sample_metric) - - assert len(monitor.metrics) == 0 - - def test_function_stats_updated(self, started_monitor): - """Test that function stats are updated correctly.""" - for i in range(5): - metric = PerformanceMetric( - timestamp=time.time(), - function="my_func", - duration=0.1 * (i + 1), - ) - started_monitor.record_metric(metric) - - stats = started_monitor.function_stats["my_func"] - assert stats["count"] == 5 - assert stats["min_time"] == 0.1 - assert stats["max_time"] == 0.5 - assert abs(stats["total_time"] - 1.5) < 0.001 - - def test_error_tracking(self, started_monitor): - """Test that errors are tracked.""" - # Record normal metric - started_monitor.record_metric( - PerformanceMetric(timestamp=time.time(), function="my_func", duration=0.1) - ) - - # Record error metric - started_monitor.record_metric( - PerformanceMetric( - timestamp=time.time(), - function="my_func", - duration=0.1, - exception="Error!", - ) - ) - - stats = started_monitor.function_stats["my_func"] - assert stats["count"] == 2 - assert stats["errors"] == 1 - - def test_max_history_limit(self): - """Test that max_history limits stored metrics.""" - monitor = PerformanceMonitor(max_history=5) - monitor.start() - - for i in range(10): - monitor.record_metric( - PerformanceMetric(timestamp=time.time(), function="func", duration=0.01) - ) - - assert len(monitor.metrics) == 5 - monitor.stop() - - def test_get_stats_all(self, started_monitor): - """Test get_stats for all functions.""" - started_monitor.record_metric( - PerformanceMetric(timestamp=time.time(), function="func1", duration=0.1) - ) 
- started_monitor.record_metric( - PerformanceMetric(timestamp=time.time(), function="func2", duration=0.2) - ) - - stats = started_monitor.get_stats() - - assert "func1" in stats - assert "func2" in stats - assert stats["func1"]["avg_time"] == 0.1 - assert stats["func2"]["avg_time"] == 0.2 - - def test_get_stats_single_function(self, started_monitor): - """Test get_stats for single function.""" - started_monitor.record_metric( - PerformanceMetric(timestamp=time.time(), function="my_func", duration=0.1) - ) - started_monitor.record_metric( - PerformanceMetric(timestamp=time.time(), function="my_func", duration=0.3) - ) - - stats = started_monitor.get_stats("my_func") - - assert stats["function"] == "my_func" - assert stats["count"] == 2 - assert stats["avg_time"] == 0.2 - assert stats["min_time"] == 0.1 - assert stats["max_time"] == 0.3 - - def test_get_stats_unknown_function(self, started_monitor): - """Test get_stats for unknown function.""" - stats = started_monitor.get_stats("unknown_func") - assert stats == {} - - def test_get_recent_metrics(self, started_monitor): - """Test get_recent_metrics.""" - for i in range(10): - started_monitor.record_metric( - PerformanceMetric( - timestamp=time.time() + i, function=f"func_{i}", duration=0.01 - ) - ) - - recent = started_monitor.get_recent_metrics(5) - - assert len(recent) == 5 - # Should be the last 5 - assert recent[0].function == "func_5" - assert recent[-1].function == "func_9" - - def test_clear_metrics(self, started_monitor, sample_metric): - """Test clearing metrics.""" - started_monitor.record_metric(sample_metric) - assert len(started_monitor.metrics) == 1 - - started_monitor.clear() - - assert len(started_monitor.metrics) == 0 - assert len(started_monitor.function_stats) == 0 - - def test_save_metrics(self, started_monitor, sample_metric, temp_dir): - """Test saving metrics to file.""" - started_monitor.record_metric(sample_metric) - - output_path = os.path.join(temp_dir, "metrics.json") - started_monitor.save_metrics(output_path) - - assert os.path.exists(output_path) - - with open(output_path) as f: - data = json.load(f) - - assert "metrics" in data - assert "stats" in data - assert len(data["metrics"]) == 1 - assert data["metrics"][0]["function"] == "test_function" - - def test_load_metrics(self, monitor, temp_dir): - """Test loading metrics from file.""" - # Create test data file - data = { - "metrics": [ - { - "timestamp": 123456.0, - "function": "loaded_func", - "duration": 0.5, - "memory_delta": None, - "args_size": None, - "result_size": None, - "exception": None, - } - ], - "stats": {"loaded_func": {"count": 1, "total_time": 0.5}}, - } - - input_path = os.path.join(temp_dir, "metrics.json") - with open(input_path, "w") as f: - json.dump(data, f) - - monitor.load_metrics(input_path) - - assert len(monitor.metrics) == 1 - assert monitor.metrics[0].function == "loaded_func" - - def test_thread_safety(self, started_monitor): - """Test thread safety of recording metrics.""" - num_threads = 5 - metrics_per_thread = 100 - - def record_metrics(): - for i in range(metrics_per_thread): - started_monitor.record_metric( - PerformanceMetric( - timestamp=time.time(), - function="thread_func", - duration=0.001, - ) - ) - - threads = [threading.Thread(target=record_metrics) for _ in range(num_threads)] - for t in threads: - t.start() - for t in threads: - t.join() - - stats = started_monitor.function_stats["thread_func"] - assert stats["count"] == num_threads * metrics_per_thread - - -# 
============================================================================ -# Test Alerts -# ============================================================================ - - -class TestAlerts: - """Tests for performance alerts.""" - - def test_slow_function_alert(self, started_monitor): - """Test slow function alert.""" - alerts_received = [] - - def alert_handler(alert): - alerts_received.append(alert) - - started_monitor.alert_callbacks = [alert_handler] - started_monitor.alerts["slow_function"] = 0.1 - - # Record slow metric - started_monitor.record_metric( - PerformanceMetric(timestamp=time.time(), function="slow_func", duration=0.5) - ) - - assert len(alerts_received) == 1 - assert alerts_received[0]["type"] == "slow_function" - assert alerts_received[0]["function"] == "slow_func" - - def test_memory_spike_alert(self, started_monitor): - """Test memory spike alert.""" - alerts_received = [] - - def alert_handler(alert): - alerts_received.append(alert) - - started_monitor.alert_callbacks = [alert_handler] - started_monitor.alerts["memory_spike"] = 50 - - # Record metric with large memory delta - started_monitor.record_metric( - PerformanceMetric( - timestamp=time.time(), - function="mem_func", - duration=0.1, - memory_delta=100, - ) - ) - - assert len(alerts_received) == 1 - assert alerts_received[0]["type"] == "memory_spike" - - def test_no_alert_below_threshold(self, started_monitor): - """Test no alert when below threshold.""" - alerts_received = [] - - def alert_handler(alert): - alerts_received.append(alert) - - started_monitor.alert_callbacks = [alert_handler] - started_monitor.alerts["slow_function"] = 1.0 - - # Record fast metric - started_monitor.record_metric( - PerformanceMetric(timestamp=time.time(), function="fast_func", duration=0.1) - ) - - # No slow function alert expected - slow_alerts = [a for a in alerts_received if a["type"] == "slow_function"] - assert len(slow_alerts) == 0 - - def test_add_alert_callback(self, monitor): - """Test adding alert callbacks.""" - initial_count = len(monitor.alert_callbacks) - - def my_handler(alert): - pass - - monitor.add_alert_callback(my_handler) - - assert len(monitor.alert_callbacks) == initial_count + 1 - - -# ============================================================================ -# Test track_performance decorator -# ============================================================================ - - -class TestTrackPerformance: - """Tests for track_performance decorator.""" - - def test_track_performance_basic(self): - """Test basic track_performance usage.""" - - @track_performance - def my_func(x): - return x * 2 - - result = my_func(5) - assert result == 10 - - def test_track_performance_preserves_function(self): - """Test that decorator preserves function metadata.""" - - @track_performance - def original_name(x): - """Original docstring.""" - return x - - assert original_name.__name__ == "original_name" - assert original_name.__doc__ == "Original docstring." 
- - def test_track_performance_with_exception(self): - """Test track_performance with exception.""" - - @track_performance - def error_func(): - raise ValueError("Test error") - - with pytest.raises(ValueError): - error_func() - - -# ============================================================================ -# Test Module-Level Functions -# ============================================================================ - - -class TestModuleFunctions: - """Tests for module-level functions.""" - - def test_get_performance_stats(self): - """Test get_performance_stats function.""" - stats = get_performance_stats() - assert isinstance(stats, dict) - - def test_get_performance_stats_with_function(self): - """Test get_performance_stats with function name.""" - stats = get_performance_stats("unknown_func") - # Should return empty dict for unknown function - assert isinstance(stats, dict) - - def test_set_performance_alerts(self): - """Test set_performance_alerts function.""" - # Should not raise - set_performance_alerts(slow_function=2.0, memory_spike=200) - - def test_add_performance_alert_handler(self): - """Test add_performance_alert_handler function.""" - - def my_handler(alert): - pass - - # Should not raise - add_performance_alert_handler(my_handler) - - -# ============================================================================ -# Test Default Alert Handler -# ============================================================================ - - -class TestDefaultAlertHandler: - """Tests for default alert handler.""" - - def test_default_handler_slow_function_warning(self): - """Test default handler issues warning for slow function.""" - from scitex.benchmark.monitor import _default_alert_handler - - # Create monitor with default handler registered - monitor = PerformanceMonitor(max_history=100) - monitor.add_alert_callback(_default_alert_handler) - monitor.start() - monitor.alerts["slow_function"] = 0.01 - - try: - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - - monitor.record_metric( - PerformanceMetric( - timestamp=time.time(), function="slow_func", duration=0.1 - ) - ) - - # Check for warning - slow_warnings = [ - warning for warning in w if "Slow function" in str(warning.message) - ] - assert len(slow_warnings) >= 1 - finally: - monitor.stop() - - -# ============================================================================ -# Main -# ============================================================================ - -if __name__ == "__main__": - import os - - import pytest - - pytest.main([os.path.abspath(__file__)]) - -# -------------------------------------------------------------------------------- -# Start of Source Code from: /home/ywatanabe/proj/scitex-code/src/scitex/benchmark/monitor.py -# -------------------------------------------------------------------------------- -# #!/usr/bin/env python3 -# # -*- coding: utf-8 -*- -# # Time-stamp: "2025-07-25 05:40:00" -# # File: monitor.py -# -# """ -# Real-time performance monitoring for SciTeX. 
-# """ -# -# import time -# import threading -# from collections import deque, defaultdict -# from typing import Dict, List, Optional, Callable, Any -# from dataclasses import dataclass -# from datetime import datetime -# import json -# from pathlib import Path -# import warnings -# -# -# @dataclass -# class PerformanceMetric: -# """Single performance measurement.""" -# -# timestamp: float -# function: str -# duration: float -# memory_delta: Optional[float] = None -# args_size: Optional[int] = None -# result_size: Optional[int] = None -# exception: Optional[str] = None -# -# -# class PerformanceMonitor: -# """ -# Monitor performance metrics for SciTeX functions. -# -# Example -# ------- -# >>> monitor = PerformanceMonitor() -# >>> monitor.start() -# >>> # Your code here -# >>> stats = monitor.get_stats() -# """ -# -# def __init__(self, max_history: int = 1000): -# self.max_history = max_history -# self.metrics = deque(maxlen=max_history) -# self.function_stats = defaultdict( -# lambda: { -# "count": 0, -# "total_time": 0.0, -# "min_time": float("inf"), -# "max_time": 0.0, -# "errors": 0, -# } -# ) -# self.is_monitoring = False -# self._lock = threading.Lock() -# -# # Alerts configuration -# self.alerts = { -# "slow_function": 1.0, # Alert if function takes > 1s -# "memory_spike": 100, # Alert if memory increases > 100MB -# "error_rate": 0.1, # Alert if error rate > 10% -# } -# self.alert_callbacks = [] -# -# def start(self): -# """Start monitoring.""" -# self.is_monitoring = True -# -# def stop(self): -# """Stop monitoring.""" -# self.is_monitoring = False -# -# def record_metric(self, metric: PerformanceMetric): -# """Record a performance metric.""" -# if not self.is_monitoring: -# return -# -# with self._lock: -# self.metrics.append(metric) -# -# # Update function statistics -# stats = self.function_stats[metric.function] -# stats["count"] += 1 -# stats["total_time"] += metric.duration -# stats["min_time"] = min(stats["min_time"], metric.duration) -# stats["max_time"] = max(stats["max_time"], metric.duration) -# -# if metric.exception: -# stats["errors"] += 1 -# -# # Check alerts -# self._check_alerts(metric) -# -# def _check_alerts(self, metric: PerformanceMetric): -# """Check if metric triggers any alerts.""" -# alerts_triggered = [] -# -# # Slow function alert -# if metric.duration > self.alerts["slow_function"]: -# alerts_triggered.append( -# { -# "type": "slow_function", -# "function": metric.function, -# "duration": metric.duration, -# "threshold": self.alerts["slow_function"], -# } -# ) -# -# # Memory spike alert -# if metric.memory_delta and metric.memory_delta > self.alerts["memory_spike"]: -# alerts_triggered.append( -# { -# "type": "memory_spike", -# "function": metric.function, -# "delta": metric.memory_delta, -# "threshold": self.alerts["memory_spike"], -# } -# ) -# -# # Error rate alert -# stats = self.function_stats[metric.function] -# if stats["count"] > 10: # Only check after sufficient calls -# error_rate = stats["errors"] / stats["count"] -# if error_rate > self.alerts["error_rate"]: -# alerts_triggered.append( -# { -# "type": "high_error_rate", -# "function": metric.function, -# "rate": error_rate, -# "threshold": self.alerts["error_rate"], -# } -# ) -# -# # Trigger callbacks -# for alert in alerts_triggered: -# for callback in self.alert_callbacks: -# callback(alert) -# -# def add_alert_callback(self, callback: Callable[[Dict[str, Any]], None]): -# """Add a callback for performance alerts.""" -# self.alert_callbacks.append(callback) -# -# def get_stats(self, 
function: Optional[str] = None) -> Dict[str, Any]: -# """ -# Get performance statistics. -# -# Parameters -# ---------- -# function : str, optional -# Specific function to get stats for -# -# Returns -# ------- -# dict -# Performance statistics -# """ -# with self._lock: -# if function: -# stats = self.function_stats.get(function, {}) -# if stats and stats["count"] > 0: -# return { -# "function": function, -# "count": stats["count"], -# "total_time": stats["total_time"], -# "avg_time": stats["total_time"] / stats["count"], -# "min_time": stats["min_time"], -# "max_time": stats["max_time"], -# "error_rate": stats["errors"] / stats["count"], -# } -# return {} -# else: -# # Return all stats -# all_stats = {} -# for func, stats in self.function_stats.items(): -# if stats["count"] > 0: -# all_stats[func] = { -# "count": stats["count"], -# "avg_time": stats["total_time"] / stats["count"], -# "min_time": stats["min_time"], -# "max_time": stats["max_time"], -# "error_rate": stats["errors"] / stats["count"], -# } -# return all_stats -# -# def get_recent_metrics(self, n: int = 100) -> List[PerformanceMetric]: -# """Get n most recent metrics.""" -# with self._lock: -# return list(self.metrics)[-n:] -# -# def save_metrics(self, path: str): -# """Save metrics to file.""" -# with self._lock: -# data = { -# "metrics": [ -# { -# "timestamp": m.timestamp, -# "function": m.function, -# "duration": m.duration, -# "memory_delta": m.memory_delta, -# "args_size": m.args_size, -# "result_size": m.result_size, -# "exception": m.exception, -# } -# for m in self.metrics -# ], -# "stats": dict(self.function_stats), -# } -# -# Path(path).write_text(json.dumps(data, indent=2)) -# -# def load_metrics(self, path: str): -# """Load metrics from file.""" -# data = json.loads(Path(path).read_text()) -# -# with self._lock: -# self.metrics.clear() -# for m in data["metrics"]: -# self.metrics.append(PerformanceMetric(**m)) -# -# self.function_stats.clear() -# self.function_stats.update(data["stats"]) -# -# def clear(self): -# """Clear all metrics.""" -# with self._lock: -# self.metrics.clear() -# self.function_stats.clear() -# -# -# # Global monitor instance -# _global_monitor = PerformanceMonitor() -# -# -# def track_performance(func: Callable) -> Callable: -# """ -# Decorator to track function performance. -# -# Example -# ------- -# >>> @track_performance -# ... def my_function(x): -# ... 
return x ** 2 -# """ -# from functools import wraps -# import sys -# -# @wraps(func) -# def wrapper(*args, **kwargs): -# if not _global_monitor.is_monitoring: -# return func(*args, **kwargs) -# -# # Get memory before (if available) -# try: -# import psutil -# -# process = psutil.Process() -# mem_before = process.memory_info().rss / 1024 / 1024 -# except: -# mem_before = None -# -# # Time the function -# start_time = time.time() -# exception = None -# result = None -# -# try: -# result = func(*args, **kwargs) -# except Exception as e: -# exception = str(e) -# raise -# finally: -# duration = time.time() - start_time -# -# # Get memory after -# mem_delta = None -# if mem_before is not None: -# try: -# mem_after = process.memory_info().rss / 1024 / 1024 -# mem_delta = mem_after - mem_before -# except: -# pass -# -# # Estimate sizes -# args_size = None -# result_size = None -# try: -# args_size = sys.getsizeof(args) + sys.getsizeof(kwargs) -# if result is not None: -# result_size = sys.getsizeof(result) -# except: -# pass -# -# # Record metric -# metric = PerformanceMetric( -# timestamp=start_time, -# function=func.__name__, -# duration=duration, -# memory_delta=mem_delta, -# args_size=args_size, -# result_size=result_size, -# exception=exception, -# ) -# -# _global_monitor.record_metric(metric) -# -# return result -# -# return wrapper -# -# -# def start_monitoring(): -# """Start global performance monitoring.""" -# _global_monitor.start() -# -# -# def stop_monitoring(): -# """Stop global performance monitoring.""" -# _global_monitor.stop() -# -# -# def get_performance_stats(function: Optional[str] = None) -> Dict[str, Any]: -# """Get performance statistics from global monitor.""" -# return _global_monitor.get_stats(function) -# -# -# def set_performance_alerts(**thresholds): -# """ -# Set performance alert thresholds. -# -# Parameters -# ---------- -# slow_function : float -# Alert if function takes longer than this (seconds) -# memory_spike : float -# Alert if memory increases by more than this (MB) -# error_rate : float -# Alert if error rate exceeds this (0-1) -# """ -# _global_monitor.alerts.update(thresholds) -# -# -# def add_performance_alert_handler(handler: Callable[[Dict[str, Any]], None]): -# """ -# Add a handler for performance alerts. -# -# Example -# ------- -# >>> def alert_handler(alert): -# ... 
print(f"ALERT: {alert['type']} in {alert['function']}") -# >>> add_performance_alert_handler(alert_handler) -# """ -# _global_monitor.add_alert_callback(handler) -# -# -# # Default alert handler -# def _default_alert_handler(alert: Dict[str, Any]): -# """Default handler that prints warnings.""" -# if alert["type"] == "slow_function": -# warnings.warn( -# f"Slow function: {alert['function']} took {alert['duration']:.2f}s " -# f"(threshold: {alert['threshold']}s)" -# ) -# elif alert["type"] == "memory_spike": -# warnings.warn( -# f"Memory spike: {alert['function']} increased memory by {alert['delta']:.1f}MB " -# f"(threshold: {alert['threshold']}MB)" -# ) -# elif alert["type"] == "high_error_rate": -# warnings.warn( -# f"High error rate: {alert['function']} has {alert['rate']:.1%} error rate " -# f"(threshold: {alert['threshold']:.1%})" -# ) -# -# -# # Register default handler -# add_performance_alert_handler(_default_alert_handler) - -# -------------------------------------------------------------------------------- -# End of Source Code from: /home/ywatanabe/proj/scitex-code/src/scitex/benchmark/monitor.py -# -------------------------------------------------------------------------------- diff --git a/tests/scitex/benchmark/test_profiler.py b/tests/scitex/benchmark/test_profiler.py deleted file mode 100644 index 77d1b3292..000000000 --- a/tests/scitex/benchmark/test_profiler.py +++ /dev/null @@ -1,769 +0,0 @@ -#!/usr/bin/env python3 -# Time-stamp: "2025-01-05" -# File: test_profiler.py - -"""Tests for scitex.benchmark.profiler module.""" - -import io -import os -import sys -import time - -import pytest - -from scitex.benchmark.profiler import ( - FunctionProfiler, - LineProfiler, - get_memory_usage, - get_profile_report, - profile_block, - profile_function, - profile_module, - track_memory, -) - -# ============================================================================ -# Test Fixtures -# ============================================================================ - - -@pytest.fixture -def profiler(): - """Create a fresh FunctionProfiler instance.""" - return FunctionProfiler() - - -@pytest.fixture -def line_profiler(): - """Create a fresh LineProfiler instance.""" - return LineProfiler() - - -@pytest.fixture -def sample_function(): - """A simple function for profiling.""" - - def compute_sum(n): - return sum(range(n)) - - return compute_sum - - -@pytest.fixture -def slow_function(): - """A function that takes measurable time.""" - - def slow_compute(n): - time.sleep(0.01) - return n * 2 - - return slow_compute - - -# ============================================================================ -# Test FunctionProfiler -# ============================================================================ - - -class TestFunctionProfiler: - """Tests for FunctionProfiler class.""" - - def test_profiler_creation(self, profiler): - """Test profiler initialization.""" - assert profiler.profiles == {} - assert profiler.call_counts == {} - assert profiler.total_times == {} - - def test_profile_decorator(self, profiler): - """Test profiling with decorator.""" - - @profiler.profile - def my_func(x): - return x * 2 - - result = my_func(5) - - assert result == 10 - assert "my_func" in profiler.profiles - assert profiler.call_counts["my_func"] == 1 - assert profiler.total_times["my_func"] > 0 - - def test_profile_multiple_calls(self, profiler): - """Test profiling with multiple calls.""" - - @profiler.profile - def my_func(x): - return x + 1 - - for i in range(5): - my_func(i) - - assert 
profiler.call_counts["my_func"] == 5 - assert len(profiler.profiles["my_func"]) == 5 - - def test_profile_preserves_function_name(self, profiler): - """Test that decorator preserves function metadata.""" - - @profiler.profile - def original_name(x): - """Original docstring.""" - return x - - assert original_name.__name__ == "original_name" - assert original_name.__doc__ == "Original docstring." - - def test_profile_with_args_and_kwargs(self, profiler): - """Test profiling function with args and kwargs.""" - - @profiler.profile - def complex_func(a, b, c=10, d=20): - return a + b + c + d - - result = complex_func(1, 2, c=30, d=40) - - assert result == 73 - assert profiler.call_counts["complex_func"] == 1 - - def test_get_stats_returns_none_for_unknown(self, profiler): - """Test get_stats returns None for unknown function.""" - stats = profiler.get_stats("unknown_function") - assert stats is None - - def test_get_stats_returns_stats(self, profiler): - """Test get_stats returns Stats object.""" - - @profiler.profile - def my_func(): - return 42 - - my_func() - my_func() - - stats = profiler.get_stats("my_func") - assert stats is not None - - def test_print_stats_single_function(self, profiler, capsys): - """Test print_stats for single function.""" - - @profiler.profile - def my_func(): - return 42 - - my_func() - profiler.print_stats("my_func") - - captured = capsys.readouterr() - assert "Profile for my_func" in captured.out - assert "Total calls: 1" in captured.out - - def test_print_stats_all_functions(self, profiler, capsys): - """Test print_stats for all functions.""" - - @profiler.profile - def func1(): - return 1 - - @profiler.profile - def func2(): - return 2 - - func1() - func2() - - profiler.print_stats() - - captured = capsys.readouterr() - assert "func1" in captured.out - assert "func2" in captured.out - - def test_get_report(self, profiler): - """Test get_report returns comprehensive report.""" - - @profiler.profile - def my_func(): - return sum(range(100)) - - my_func() - my_func() - - report = profiler.get_report() - - assert "my_func" in report - assert report["my_func"]["call_count"] == 2 - assert "total_time" in report["my_func"] - assert "avg_time" in report["my_func"] - assert "profile" in report["my_func"] - - -# ============================================================================ -# Test profile_function (global profiler) -# ============================================================================ - - -class TestProfileFunction: - """Tests for profile_function decorator.""" - - def test_profile_function_decorator(self): - """Test global profile_function decorator.""" - - @profile_function - def test_func(x): - return x**2 - - result = test_func(5) - assert result == 25 - - def test_profile_function_preserves_return(self): - """Test that decorated function returns correctly.""" - - @profile_function - def compute(a, b): - return a * b - - assert compute(3, 4) == 12 - - -# ============================================================================ -# Test get_profile_report -# ============================================================================ - - -class TestGetProfileReport: - """Tests for get_profile_report function.""" - - def test_get_profile_report_returns_dict(self): - """Test get_profile_report returns dictionary.""" - report = get_profile_report() - assert isinstance(report, dict) - - -# ============================================================================ -# Test profile_block context manager -# 
============================================================================ - - -class TestProfileBlock: - """Tests for profile_block context manager.""" - - def test_profile_block_basic(self, capsys): - """Test basic profile_block usage.""" - with profile_block("test_block"): - result = sum(range(1000)) - - captured = capsys.readouterr() - assert "Profile for block 'test_block'" in captured.out - assert "Total time:" in captured.out - - def test_profile_block_with_slow_code(self, capsys): - """Test profile_block with slow code.""" - with profile_block("slow_block"): - time.sleep(0.02) - - captured = capsys.readouterr() - assert "slow_block" in captured.out - # Time should be at least 0.01s - assert "0.0" in captured.out # Time should be visible - - def test_profile_block_exception_handling(self, capsys): - """Test profile_block handles exceptions properly.""" - with pytest.raises(ValueError): - with profile_block("error_block"): - raise ValueError("Test error") - - # Profile output should still be printed - captured = capsys.readouterr() - assert "error_block" in captured.out - - -# ============================================================================ -# Test profile_module -# ============================================================================ - - -class TestProfileModule: - """Tests for profile_module function.""" - - def test_profile_module_returns_profiler(self, capsys): - """Test profile_module returns a profiler.""" - profiler = profile_module("math", pattern="sqrt") - - assert isinstance(profiler, FunctionProfiler) - - captured = capsys.readouterr() - assert "Profiling" in captured.out - - def test_profile_module_wraps_functions(self, capsys): - """Test profile_module wraps matching functions.""" - profiler = profile_module("os.path", pattern="exists") - - captured = capsys.readouterr() - # Should report profiling functions - assert "Profiling" in captured.out - - -# ============================================================================ -# Test LineProfiler -# ============================================================================ - - -class TestLineProfiler: - """Tests for LineProfiler class.""" - - def test_line_profiler_creation(self, line_profiler): - """Test LineProfiler initialization.""" - assert line_profiler.timings == {} - - def test_profile_lines_decorator(self, line_profiler): - """Test profile_lines decorator.""" - - @line_profiler.profile_lines - def my_func(n): - result = 0 - for i in range(n): - result += i - return result - - result = my_func(100) - - assert result == 4950 # sum of 0..99 - assert "my_func" in line_profiler.timings - assert len(line_profiler.timings["my_func"]) == 1 - - def test_profile_lines_stores_timing(self, line_profiler): - """Test that profile_lines stores timing info.""" - - @line_profiler.profile_lines - def my_func(): - time.sleep(0.01) - return 42 - - my_func() - - timing = line_profiler.timings["my_func"][0] - assert "total_time" in timing - assert timing["total_time"] >= 0.009 # At least 9ms - assert "source" in timing - - def test_profile_lines_stores_source(self, line_profiler): - """Test that profile_lines stores source code.""" - - @line_profiler.profile_lines - def my_func(): - x = 1 - y = 2 - return x + y - - my_func() - - timing = line_profiler.timings["my_func"][0] - source = timing["source"] - assert isinstance(source, list) - assert len(source) > 0 - # Source should contain the function code - source_text = "".join(source) - assert "return" in source_text - - def test_print_timings(self, 
line_profiler, capsys): - """Test print_timings output.""" - - @line_profiler.profile_lines - def my_func(): - return 42 - - my_func() - line_profiler.print_timings("my_func") - - captured = capsys.readouterr() - assert "Line timings for my_func" in captured.out - assert "Total time:" in captured.out - assert "Source code:" in captured.out - - def test_print_timings_unknown_function(self, line_profiler, capsys): - """Test print_timings for unknown function.""" - line_profiler.print_timings("unknown_func") - - captured = capsys.readouterr() - assert "No timings for unknown_func" in captured.out - - -# ============================================================================ -# Test Memory Utilities -# ============================================================================ - - -class TestGetMemoryUsage: - """Tests for get_memory_usage function.""" - - def test_get_memory_usage_returns_value_or_none(self): - """Test get_memory_usage returns float or None.""" - result = get_memory_usage() - - # Result should be float (if psutil available) or None - assert result is None or isinstance(result, float) - - def test_get_memory_usage_positive_value(self): - """Test get_memory_usage returns positive value if available.""" - result = get_memory_usage() - - if result is not None: - assert result > 0 # Memory usage should be positive - - -class TestTrackMemory: - """Tests for track_memory context manager.""" - - def test_track_memory_basic(self, capsys): - """Test basic track_memory usage.""" - with track_memory("test_allocation"): - # Allocate some memory - data = list(range(10000)) - - captured = capsys.readouterr() - # Output depends on whether psutil is available - if "Memory usage" in captured.out: - assert "test_allocation" in captured.out - assert "Start:" in captured.out - assert "End:" in captured.out - assert "Delta:" in captured.out - - def test_track_memory_exception_handling(self, capsys): - """Test track_memory handles exceptions.""" - with pytest.raises(ValueError): - with track_memory("error_block"): - raise ValueError("Test error") - - # Should still print memory info before exception - captured = capsys.readouterr() - # May or may not have output depending on psutil availability - - def test_track_memory_nested(self, capsys): - """Test nested track_memory blocks.""" - with track_memory("outer"): - data1 = list(range(1000)) - with track_memory("inner"): - data2 = list(range(1000)) - - captured = capsys.readouterr() - # Should have info for both if psutil available - if "Memory usage" in captured.out: - assert "outer" in captured.out or "inner" in captured.out - - -# ============================================================================ -# Main -# ============================================================================ - -if __name__ == "__main__": - import os - - import pytest - - pytest.main([os.path.abspath(__file__)]) - -# -------------------------------------------------------------------------------- -# Start of Source Code from: /home/ywatanabe/proj/scitex-code/src/scitex/benchmark/profiler.py -# -------------------------------------------------------------------------------- -# #!/usr/bin/env python3 -# # -*- coding: utf-8 -*- -# # Time-stamp: "2025-07-25 05:35:00" -# # File: profiler.py -# -# """ -# Profiling tools for SciTeX performance analysis. 
-# """ -# -# import cProfile -# import pstats -# import io -# from typing import Callable, Optional, Dict, Any -# from functools import wraps -# import time -# from contextlib import contextmanager -# -# -# class FunctionProfiler: -# """Profile individual function calls.""" -# -# def __init__(self): -# self.profiles = {} -# self.call_counts = {} -# self.total_times = {} -# -# def profile(self, func: Callable) -> Callable: -# """ -# Decorator to profile a function. -# -# Example -# ------- -# >>> profiler = FunctionProfiler() -# >>> @profiler.profile -# ... def my_function(x): -# ... return x ** 2 -# """ -# -# @wraps(func) -# def wrapper(*args, **kwargs): -# # Create profiler for this call -# pr = cProfile.Profile() -# pr.enable() -# -# # Call function -# start_time = time.time() -# result = func(*args, **kwargs) -# end_time = time.time() -# -# pr.disable() -# -# # Store results -# func_name = func.__name__ -# if func_name not in self.profiles: -# self.profiles[func_name] = [] -# self.call_counts[func_name] = 0 -# self.total_times[func_name] = 0.0 -# -# self.profiles[func_name].append(pr) -# self.call_counts[func_name] += 1 -# self.total_times[func_name] += end_time - start_time -# -# return result -# -# return wrapper -# -# def get_stats(self, func_name: str) -> Optional[pstats.Stats]: -# """Get profiling statistics for a function.""" -# if func_name not in self.profiles: -# return None -# -# # Combine all profiles for this function -# combined = pstats.Stats(self.profiles[func_name][0]) -# for pr in self.profiles[func_name][1:]: -# combined.add(pr) -# -# return combined -# -# def print_stats(self, func_name: Optional[str] = None, top_n: int = 10): -# """Print profiling statistics.""" -# if func_name: -# stats = self.get_stats(func_name) -# if stats: -# print(f"\nProfile for {func_name}:") -# print(f"Total calls: {self.call_counts[func_name]}") -# print(f"Total time: {self.total_times[func_name]:.3f}s") -# print( -# f"Avg time per call: {self.total_times[func_name] / self.call_counts[func_name]:.3f}s" -# ) -# print("\nDetailed stats:") -# stats.sort_stats("cumulative").print_stats(top_n) -# else: -# # Print all functions -# for name in self.profiles: -# self.print_stats(name, top_n) -# -# def get_report(self) -> Dict[str, Any]: -# """Get a summary report of all profiled functions.""" -# report = {} -# for func_name in self.profiles: -# stats = self.get_stats(func_name) -# -# # Get top time consumers -# s = io.StringIO() -# stats.sort_stats("cumulative").print_stats(10, s) -# -# report[func_name] = { -# "call_count": self.call_counts[func_name], -# "total_time": self.total_times[func_name], -# "avg_time": self.total_times[func_name] / self.call_counts[func_name], -# "profile": s.getvalue(), -# } -# -# return report -# -# -# # Global profiler instance -# _global_profiler = FunctionProfiler() -# -# -# def profile_function(func: Callable) -> Callable: -# """ -# Decorator to profile a function using the global profiler. -# -# Example -# ------- -# >>> @profile_function -# ... def my_function(x): -# ... return sum(range(x)) -# """ -# return _global_profiler.profile(func) -# -# -# def get_profile_report() -> Dict[str, Any]: -# """Get profiling report from global profiler.""" -# return _global_profiler.get_report() -# -# -# def print_profile_stats(func_name: Optional[str] = None): -# """Print profiling statistics from global profiler.""" -# _global_profiler.print_stats(func_name) -# -# -# @contextmanager -# def profile_block(name: str): -# """ -# Context manager for profiling a code block. 
-# -# Example -# ------- -# >>> with profile_block("data_processing"): -# ... # Some expensive operations -# ... data = process_data() -# """ -# pr = cProfile.Profile() -# pr.enable() -# start_time = time.time() -# -# try: -# yield -# finally: -# pr.disable() -# end_time = time.time() -# -# print(f"\nProfile for block '{name}':") -# print(f"Total time: {end_time - start_time:.3f}s") -# -# s = io.StringIO() -# ps = pstats.Stats(pr, stream=s).sort_stats("cumulative") -# ps.print_stats(10) -# print(s.getvalue()) -# -# -# def profile_module(module_name: str, pattern: str = "*") -> Dict[str, Any]: -# """ -# Profile all matching functions in a module. -# -# Parameters -# ---------- -# module_name : str -# Name of module to profile -# pattern : str -# Pattern to match function names -# -# Returns -# ------- -# dict -# Profiling results -# """ -# import importlib -# import fnmatch -# -# module = importlib.import_module(module_name) -# profiler = FunctionProfiler() -# -# # Wrap all matching functions -# wrapped_functions = [] -# for name in dir(module): -# if fnmatch.fnmatch(name, pattern): -# obj = getattr(module, name) -# if callable(obj) and not name.startswith("_"): -# # Replace with profiled version -# profiled = profiler.profile(obj) -# setattr(module, name, profiled) -# wrapped_functions.append(name) -# -# print(f"Profiling {len(wrapped_functions)} functions in {module_name}") -# print(f"Wrapped: {', '.join(wrapped_functions)}") -# print("\nRun your code now. Call get_profile_report() when done.") -# -# return profiler -# -# -# class LineProfiler: -# """ -# Line-by-line profiler for detailed analysis. -# -# Note: This is a simplified version. For production use, -# consider using the line_profiler package. -# """ -# -# def __init__(self): -# self.timings = {} -# -# def profile_lines(self, func: Callable) -> Callable: -# """Profile a function line by line.""" -# import inspect -# -# @wraps(func) -# def wrapper(*args, **kwargs): -# # Get source lines -# source_lines = inspect.getsourcelines(func)[0] -# line_times = {} -# -# # This is a simplified implementation -# # Real line profiling requires bytecode instrumentation -# start_time = time.time() -# result = func(*args, **kwargs) -# end_time = time.time() -# -# # Store timing -# func_name = func.__name__ -# if func_name not in self.timings: -# self.timings[func_name] = [] -# -# self.timings[func_name].append( -# {"total_time": end_time - start_time, "source": source_lines} -# ) -# -# return result -# -# return wrapper -# -# def print_timings(self, func_name: str): -# """Print line timings for a function.""" -# if func_name not in self.timings: -# print(f"No timings for {func_name}") -# return -# -# timing = self.timings[func_name][-1] # Most recent -# print(f"\nLine timings for {func_name}:") -# print(f"Total time: {timing['total_time']:.3f}s") -# print("\nSource code:") -# for i, line in enumerate(timing["source"]): -# print(f"{i + 1:4d}: {line.rstrip()}") -# -# -# # Memory profiling utilities -# def get_memory_usage(): -# """Get current memory usage in MB.""" -# try: -# import psutil -# -# process = psutil.Process() -# return process.memory_info().rss / 1024 / 1024 -# except ImportError: -# return None -# -# -# @contextmanager -# def track_memory(name: str): -# """ -# Track memory usage for a code block. -# -# Example -# ------- -# >>> with track_memory("data_loading"): -# ... 
data = load_large_dataset() -# """ -# start_mem = get_memory_usage() -# -# try: -# yield -# finally: -# end_mem = get_memory_usage() -# if start_mem and end_mem: -# print(f"\nMemory usage for '{name}':") -# print(f"Start: {start_mem:.1f} MB") -# print(f"End: {end_mem:.1f} MB") -# print(f"Delta: {end_mem - start_mem:+.1f} MB") - -# -------------------------------------------------------------------------------- -# End of Source Code from: /home/ywatanabe/proj/scitex-code/src/scitex/benchmark/profiler.py -# --------------------------------------------------------------------------------
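
For call sites that imported these in-tree modules directly, the sketch below exercises the API surface visible in the deleted sources above: BenchmarkSuite from benchmark.py, profile_block and track_memory from profiler.py, and the alert hooks from monitor.py. It assumes the standalone scitex-benchmark package keeps these names importable under the same scitex.benchmark.* paths; treat it as a migration aid under that assumption, not as a statement of the new package's API.

```python
# Sketch only: assumes scitex-benchmark re-exports the names defined in the
# deleted in-tree modules (BenchmarkSuite, profile_block, track_memory, the
# alert helpers) under the same scitex.benchmark.* import paths.
import numpy as np

from scitex.benchmark import BenchmarkSuite
from scitex.benchmark.monitor import (
    add_performance_alert_handler,
    get_performance_stats,
    set_performance_alerts,
    start_monitoring,
    track_performance,
)
from scitex.benchmark.profiler import profile_block, track_memory

# --- Suite-style benchmarking (cf. the removed benchmark.py) ----------------
def sort_data_gen():
    # Each call returns (args, kwargs) passed to the benchmarked function.
    return (np.random.randn(10_000),), {}

suite = BenchmarkSuite("sorting")
suite.add_benchmark(np.sort, sort_data_gen, name="numpy_sort")
results = suite.run(iterations=20)          # pandas DataFrame of timings
suite.save_results("sorting_benchmark.csv")

# --- cProfile / memory context managers (cf. the removed profiler.py) -------
with profile_block("matmul"):               # prints cumulative cProfile stats
    _ = np.random.randn(500, 500) @ np.random.randn(500, 500)

with track_memory("allocation"):            # prints RSS delta if psutil is installed
    buf = [0] * 1_000_000

# --- Alert thresholds and tracking (cf. the removed monitor.py) -------------
set_performance_alerts(slow_function=0.5, memory_spike=200)  # seconds, MB

def on_alert(alert):
    print(f"ALERT: {alert['type']} in {alert['function']}")

add_performance_alert_handler(on_alert)
start_monitoring()                           # decorator is a no-op until this runs

@track_performance
def fit(x):
    return x.mean()

fit(np.random.randn(1_000))
print(get_performance_stats("fit"))
```

Note that, per the removed monitor.py, track_performance records nothing until start_monitoring() has been called, and memory deltas are only reported when psutil is available.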